Merge branch 'master' into vmui/query-history-storage
# Conflicts:
#	docs/CHANGELOG.md
Loori-R committed Oct 2, 2023
2 parents 8591aa6 + da9ef90 commit 144f2d7
Showing 84 changed files with 1,469 additions and 795 deletions.
46 changes: 37 additions & 9 deletions README.md
@@ -110,6 +110,7 @@ Case studies:
* [Brandwatch](https://docs.victoriametrics.com/CaseStudies.html#brandwatch)
* [CERN](https://docs.victoriametrics.com/CaseStudies.html#cern)
* [COLOPL](https://docs.victoriametrics.com/CaseStudies.html#colopl)
* [Criteo](https://docs.victoriametrics.com/CaseStudies.html#criteo)
* [Dig Security](https://docs.victoriametrics.com/CaseStudies.html#dig-security)
* [Fly.io](https://docs.victoriametrics.com/CaseStudies.html#flyio)
* [German Research Center for Artificial Intelligence](https://docs.victoriametrics.com/CaseStudies.html#german-research-center-for-artificial-intelligence)
@@ -364,6 +365,8 @@ See the [example VMUI at VictoriaMetrics playground](https://play.victoriametric
* queries with the biggest average execution duration;
* queries that took the most total execution time.

This information is obtained from the `/api/v1/status/top_queries` HTTP endpoint.
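
For a quick look at these stats from the command line, the endpoint can be queried directly (assuming VictoriaMetrics listens on the default `localhost:8428`):

```console
curl http://localhost:8428/api/v1/status/top_queries
```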

## Active queries

[VMUI](#vmui) provides the `active queries` tab, which shows currently executing queries.
@@ -373,6 +376,8 @@ It provides the following information for each query:
- The duration of the query execution.
- The client address that initiated the query execution.

This information is obtained from the `/api/v1/status/active_queries` HTTP endpoint.
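
The endpoint can be queried directly as well (same default listen address assumption as above):

```console
curl http://localhost:8428/api/v1/status/active_queries
```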

## Metrics explorer

[VMUI](#vmui) provides the ability to explore metrics exported by a particular `job` / `instance` in the following way:
@@ -404,14 +409,16 @@ matching the specified [series selector](https://prometheus.io/docs/prometheus/l

Cardinality explorer is built on top of [/api/v1/status/tsdb](#tsdb-stats).

See [cardinality explorer playground](https://play.victoriametrics.com/select/accounting/1/6a716b0f-38bc-4856-90ce-448fd713e3fe/prometheus/graph/#/cardinality).
See the example of using the cardinality explorer [here](https://victoriametrics.com/blog/cardinality-explorer/).
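
The raw stats behind the cardinality explorer can also be fetched without the UI (a sketch, assuming the default listen address):

```console
curl http://localhost:8428/api/v1/status/tsdb
```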

## Cardinality explorer statistic inaccuracy

In [cluster version of VictoriaMetrics](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html) each vmstorage tracks the stored time series individually.
vmselect requests stats via [/api/v1/status/tsdb](#tsdb-stats) API from each vmstorage node and merges the results by summing per-series stats.
This may lead to inflated values when samples for the same time series are spread across multiple vmstorage nodes
due to [replication](#replication) or [rerouting](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html?highlight=re-routes#cluster-availability).

## How to apply new config to VictoriaMetrics

VictoriaMetrics is configured via command-line flags, so it must be restarted in order to apply new command-line flags:
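
For example, a typical restart sequence might look like the following sketch (the binary path and the flag are placeholders; stopping via SIGINT is an assumption here):

```console
kill -INT `pidof victoria-metrics-prod`
/path/to/victoria-metrics-prod -someFlag=someValue
```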
@@ -616,6 +623,28 @@ Some plugins for Telegraf such as [fluentd](https://github.com/fangli/fluent-plu
or [Juniper/jtimon](https://github.com/Juniper/jtimon) send `SHOW DATABASES` query to `/query` and expect a particular database name in the response.
A comma-separated list of expected databases can be passed to VictoriaMetrics via the `-influx.databaseNames` command-line flag.
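
For example (a sketch; the database names are placeholders):

```console
/path/to/victoria-metrics-prod -influx.databaseNames=telegraf,test
```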

### How to send data in InfluxDB v2 format

VictoriaMetrics exposes endpoints for the InfluxDB v2 HTTP API at `/influx/api/v2/write` and `/api/v2/write`.

In order to write data in InfluxDB line-protocol format to a local VictoriaMetrics instance using `curl`:

<div class="with-copy" markdown="1">

```console
curl -d 'measurement,tag1=value1,tag2=value2 field1=123,field2=1.23' -X POST 'http://localhost:8428/api/v2/write'
```

</div>

The `/api/v1/export` endpoint should return the following response:

```json
{"metric":{"__name__":"measurement_field1","tag1":"value1","tag2":"value2"},"values":[123],"timestamps":[1695902762311]}
{"metric":{"__name__":"measurement_field2","tag1":"value1","tag2":"value2"},"values":[1.23],"timestamps":[1695902762311]}
```
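
The response above can be fetched with a query like the following (the series selector is an assumption matching the example measurement):

```console
curl http://localhost:8428/api/v1/export -d 'match[]={__name__=~"measurement_.*"}'
```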

## How to send data from Graphite-compatible agents such as [StatsD](https://github.com/etsy/statsd)

Enable Graphite receiver in VictoriaMetrics by setting `-graphiteListenAddr` command line flag. For instance,
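the following sketch enables the Graphite receiver on TCP and UDP port 2003 (port 2003 is the conventional Graphite plaintext port and is an assumption here):

```console
/path/to/victoria-metrics-prod -graphiteListenAddr=:2003
```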
@@ -830,7 +859,7 @@ Additionally, VictoriaMetrics provides the following handlers:
* `/api/v1/series/count` - returns the total number of time series in the database. Some notes:
* the handler scans the whole inverted index, so it can be slow if the database contains tens of millions of time series;
* the handler may count [deleted time series](#how-to-delete-time-series) in addition to normal time series due to internal implementation restrictions;
* `/api/v1/status/active_queries` - returns the list of currently running queries. This list is also available at the [`active queries` page in VMUI](#active-queries).
* `/api/v1/status/top_queries` - returns the following query lists:
* the most frequently executed queries - `topByCount`
* queries with the biggest average execution duration - `topByAvgDuration`
@@ -840,6 +869,8 @@ Additionally, VictoriaMetrics provides the following handlers:
For example, a request to `/api/v1/status/top_queries?topN=5&maxLifetime=30s` would return up to 5 queries per list, selected from the queries executed during the last 30 seconds.
VictoriaMetrics tracks the last `-search.queryStats.lastQueriesCount` queries with durations of at least `-search.queryStats.minQueryDuration`.

See also the [`top queries` page in VMUI](#top-queries).
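
A hands-on example combining both query args (assuming the default listen address):

```console
curl 'http://localhost:8428/api/v1/status/top_queries?topN=5&maxLifetime=30s'
```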

### Timestamp formats

VictoriaMetrics accepts the following formats for `time`, `start` and `end` query args
@@ -1790,9 +1821,9 @@ Graphs on the dashboards contain useful hints - hover the `i` icon in the top le
We recommend setting up [alerts](https://github.com/VictoriaMetrics/VictoriaMetrics/tree/master/deployment/docker#alerts)
via [vmalert](https://docs.victoriametrics.com/vmalert.html) or via Prometheus.

VictoriaMetrics exposes currently running queries and their execution times at the [`active queries` page](#active-queries).

VictoriaMetrics exposes the queries that take the most time to execute at the [`top queries` page](#top-queries).

See also [VictoriaMetrics Monitoring](https://victoriametrics.com/blog/victoriametrics-monitoring/)
and [troubleshooting docs](https://docs.victoriametrics.com/Troubleshooting.html).
@@ -1937,9 +1968,6 @@ and [cardinality explorer docs](#cardinality-explorer).
has at least 20% of free space. The remaining amount of free space
can be [monitored](#monitoring) via the `vm_free_disk_space_bytes` metric. The total size of data
stored on disk can be monitored via the sum of `vm_data_size_bytes` metrics.
See also `vm_merge_need_free_disk_space` metrics, which are set to values higher than 0
if background merge cannot be initiated due to free disk space shortage. The value shows the number of per-month partitions,
which would start background merge if they had more free disk space.

* VictoriaMetrics buffers incoming data in memory for up to a few seconds before flushing it to persistent storage.
This may lead to the following "issues":
18 changes: 13 additions & 5 deletions app/vlinsert/elasticsearch/elasticsearch.go
@@ -12,6 +12,8 @@ import (
"strings"
"time"

"github.com/VictoriaMetrics/metrics"

"github.com/VictoriaMetrics/VictoriaMetrics/app/vlinsert/insertutils"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vlstorage"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bufferedwriter"
@@ -22,7 +24,6 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
"github.com/VictoriaMetrics/metrics"
)

var (
@@ -101,8 +102,11 @@ func RequestHandler(path string, w http.ResponseWriter, r *http.Request) bool {
logger.Warnf("cannot decode log message #%d in /_bulk request: %s", n, err)
return true
}
vlstorage.MustAddRows(lr)
err = vlstorage.AddRows(lr)
logstorage.PutLogRows(lr)
if err != nil {
httpserver.Errorf(w, r, "cannot insert rows: %s", err)
}

tookMs := time.Since(startTime).Milliseconds()
bw := bufferedwriter.Get(w)
@@ -128,7 +132,7 @@ var (
)

func readBulkRequest(r io.Reader, isGzip bool, timeField, msgField string,
processLogMessage func(timestamp int64, fields []logstorage.Field),
processLogMessage func(timestamp int64, fields []logstorage.Field) error,
) (int, error) {
// See https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html

@@ -171,7 +175,7 @@ func readBulkRequest(r io.Reader, isGzip bool, timeField, msgField string,
var lineBufferPool bytesutil.ByteBufferPool

func readBulkLine(sc *bufio.Scanner, timeField, msgField string,
processLogMessage func(timestamp int64, fields []logstorage.Field),
processLogMessage func(timestamp int64, fields []logstorage.Field) error,
) (bool, error) {
var line []byte

@@ -218,8 +222,12 @@ func readBulkLine(sc *bufio.Scanner, timeField, msgField string,
ts = time.Now().UnixNano()
}
p.RenameField(msgField, "_msg")
processLogMessage(ts, p.Fields)
err = processLogMessage(ts, p.Fields)
logjson.PutParser(p)
if err != nil {
return false, err
}

return true, nil
}

6 changes: 4 additions & 2 deletions app/vlinsert/elasticsearch/elasticsearch_test.go
@@ -15,8 +15,9 @@ func TestReadBulkRequestFailure(t *testing.T) {
f := func(data string) {
t.Helper()

processLogMessage := func(timestamp int64, fields []logstorage.Field) {
processLogMessage := func(timestamp int64, fields []logstorage.Field) error {
t.Fatalf("unexpected call to processLogMessage with timestamp=%d, fields=%s", timestamp, fields)
return nil
}

r := bytes.NewBufferString(data)
@@ -43,7 +44,7 @@ func TestReadBulkRequestSuccess(t *testing.T) {

var timestamps []int64
var result string
processLogMessage := func(timestamp int64, fields []logstorage.Field) {
processLogMessage := func(timestamp int64, fields []logstorage.Field) error {
timestamps = append(timestamps, timestamp)

a := make([]string, len(fields))
@@ -52,6 +53,7 @@ }
}
s := "{" + strings.Join(a, ",") + "}\n"
result += s
return nil
}

// Read the request without compression
2 changes: 1 addition & 1 deletion app/vlinsert/elasticsearch/elasticsearch_timing_test.go
@@ -33,7 +33,7 @@ func benchmarkReadBulkRequest(b *testing.B, isGzip bool) {

timeField := "@timestamp"
msgField := "message"
processLogMessage := func(timestamp int64, fields []logstorage.Field) {}
processLogMessage := func(timestamp int64, fields []logstorage.Field) error { return nil }

b.ReportAllocs()
b.SetBytes(int64(len(data)))
12 changes: 7 additions & 5 deletions app/vlinsert/insertutils/common_params.go
@@ -72,13 +72,13 @@ func GetCommonParams(r *http.Request) (*CommonParams, error) {
}

// GetProcessLogMessageFunc returns a function, which adds parsed log messages to lr.
func (cp *CommonParams) GetProcessLogMessageFunc(lr *logstorage.LogRows) func(timestamp int64, fields []logstorage.Field) {
return func(timestamp int64, fields []logstorage.Field) {
func (cp *CommonParams) GetProcessLogMessageFunc(lr *logstorage.LogRows) func(timestamp int64, fields []logstorage.Field) error {
return func(timestamp int64, fields []logstorage.Field) error {
if len(fields) > *MaxFieldsPerLine {
rf := logstorage.RowFormatter(fields)
logger.Warnf("dropping log line with %d fields; it exceeds -insert.maxFieldsPerLine=%d; %s", len(fields), *MaxFieldsPerLine, rf)
rowsDroppedTotalTooManyFields.Inc()
return
return nil
}

lr.MustAdd(cp.TenantID, timestamp, fields)
@@ -87,12 +87,14 @@ func (cp *CommonParams) GetProcessLogMessageFunc(lr *logstorage.LogRows) func(ti
lr.ResetKeepSettings()
logger.Infof("remoteAddr=%s; requestURI=%s; ignoring log entry because of `debug` query arg: %s", cp.DebugRemoteAddr, cp.DebugRequestURI, s)
rowsDroppedTotalDebug.Inc()
return
return nil
}
if lr.NeedFlush() {
vlstorage.MustAddRows(lr)
err := vlstorage.AddRows(lr)
lr.ResetKeepSettings()
return err
}
return nil
}
}

14 changes: 11 additions & 3 deletions app/vlinsert/jsonline/jsonline.go
@@ -75,8 +75,12 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool {
rowsIngestedTotal.Inc()
}

vlstorage.MustAddRows(lr)
err = vlstorage.AddRows(lr)
logstorage.PutLogRows(lr)
if err != nil {
httpserver.Errorf(w, r, "cannot insert rows: %s", err)
return true
}

// update jsonlineRequestDuration only for successfully parsed requests.
// There is no need to update jsonlineRequestDuration for request errors,
@@ -86,7 +90,7 @@
return true
}

func readLine(sc *bufio.Scanner, timeField, msgField string, processLogMessage func(timestamp int64, fields []logstorage.Field)) (bool, error) {
func readLine(sc *bufio.Scanner, timeField, msgField string, processLogMessage func(timestamp int64, fields []logstorage.Field) error) (bool, error) {
var line []byte
for len(line) == 0 {
if !sc.Scan() {
Expand All @@ -113,8 +117,12 @@ func readLine(sc *bufio.Scanner, timeField, msgField string, processLogMessage f
ts = time.Now().UnixNano()
}
p.RenameField(msgField, "_msg")
processLogMessage(ts, p.Fields)
err = processLogMessage(ts, p.Fields)
logjson.PutParser(p)
if err != nil {
return false, err
}

return true, nil
}

4 changes: 3 additions & 1 deletion app/vlinsert/jsonline/jsonline_test.go
@@ -16,7 +16,7 @@ func TestReadBulkRequestSuccess(t *testing.T) {

var timestamps []int64
var result string
processLogMessage := func(timestamp int64, fields []logstorage.Field) {
processLogMessage := func(timestamp int64, fields []logstorage.Field) error {
timestamps = append(timestamps, timestamp)

a := make([]string, len(fields))
Expand All @@ -25,6 +25,8 @@ func TestReadBulkRequestSuccess(t *testing.T) {
}
s := "{" + strings.Join(a, ",") + "}\n"
result += s

return nil
}

// Read the request without compression
17 changes: 13 additions & 4 deletions app/vlinsert/loki/loki_json.go
@@ -50,12 +50,18 @@ func handleJSON(r *http.Request, w http.ResponseWriter) bool {
lr := logstorage.GetLogRows(cp.StreamFields, cp.IgnoreFields)
processLogMessage := cp.GetProcessLogMessageFunc(lr)
n, err := parseJSONRequest(data, processLogMessage)
vlstorage.MustAddRows(lr)
logstorage.PutLogRows(lr)
if err != nil {
logstorage.PutLogRows(lr)
httpserver.Errorf(w, r, "cannot parse Loki request: %s", err)
return true
}

err = vlstorage.AddRows(lr)
logstorage.PutLogRows(lr)
if err != nil {
httpserver.Errorf(w, r, "cannot insert rows: %s", err)
return true
}
rowsIngestedJSONTotal.Add(n)

// update lokiRequestJSONDuration only for successfully parsed requests
@@ -72,7 +78,7 @@ var (
lokiRequestJSONDuration = metrics.NewHistogram(`vl_http_request_duration_seconds{path="/insert/loki/api/v1/push",format="json"}`)
)

func parseJSONRequest(data []byte, processLogMessage func(timestamp int64, fields []logstorage.Field)) (int, error) {
func parseJSONRequest(data []byte, processLogMessage func(timestamp int64, fields []logstorage.Field) error) (int, error) {
p := parserPool.Get()
defer parserPool.Put(p)
v, err := p.ParseBytes(data)
Expand Down Expand Up @@ -165,7 +171,10 @@ func parseJSONRequest(data []byte, processLogMessage func(timestamp int64, field
Name: "_msg",
Value: bytesutil.ToUnsafeString(msg),
})
processLogMessage(ts, fields)
err = processLogMessage(ts, fields)
if err != nil {
return rowsIngested, err
}

}
rowsIngested += len(lines)
Expand Down
6 changes: 4 additions & 2 deletions app/vlinsert/loki/loki_json_test.go
@@ -11,8 +11,9 @@ import (
func TestParseJSONRequestFailure(t *testing.T) {
f := func(s string) {
t.Helper()
n, err := parseJSONRequest([]byte(s), func(timestamp int64, fields []logstorage.Field) {
n, err := parseJSONRequest([]byte(s), func(timestamp int64, fields []logstorage.Field) error {
t.Fatalf("unexpected call to parseJSONRequest callback!")
return nil
})
if err == nil {
t.Fatalf("expecting non-nil error")
@@ -60,13 +61,14 @@ func TestParseJSONRequestSuccess(t *testing.T) {
f := func(s string, resultExpected string) {
t.Helper()
var lines []string
n, err := parseJSONRequest([]byte(s), func(timestamp int64, fields []logstorage.Field) {
n, err := parseJSONRequest([]byte(s), func(timestamp int64, fields []logstorage.Field) error {
var a []string
for _, f := range fields {
a = append(a, f.String())
}
line := fmt.Sprintf("_time:%d %s", timestamp, strings.Join(a, " "))
lines = append(lines, line)
return nil
})
if err != nil {
t.Fatalf("unexpected error: %s", err)
2 changes: 1 addition & 1 deletion app/vlinsert/loki/loki_json_timing_test.go
@@ -27,7 +27,7 @@ func benchmarkParseJSONRequest(b *testing.B, streams, rows, labels int) {
b.RunParallel(func(pb *testing.PB) {
data := getJSONBody(streams, rows, labels)
for pb.Next() {
_, err := parseJSONRequest(data, func(timestamp int64, fields []logstorage.Field) {})
_, err := parseJSONRequest(data, func(timestamp int64, fields []logstorage.Field) error { return nil })
if err != nil {
panic(fmt.Errorf("unexpected error: %s", err))
}
