Skip to content

Commit

Permalink
lib/protoparser: adds opentelemetry parser
Browse files Browse the repository at this point in the history
app/{vmagent,vminsert}: adds opentelemetry ingestion path

Adds the ability to ingest data with the OpenTelemetry protocol.
Both protobuf and JSON encodings are supported.
Data is converted into Prometheus protobuf timeseries.
Each data type has its own converter, which may produce multiple timeseries
from a single datapoint (for summary and histogram).
Only cumulative aggregation temporality is supported for sum (Prometheus
counter) and histogram.

Apply suggestions from code review

Co-authored-by: Roman Khavronenko <roman@victoriametrics.com>
  • Loading branch information
f41gh7 and hagen1778 committed Jun 2, 2022
1 parent a18914a commit 88c4b16
Show file tree
Hide file tree
Showing 162 changed files with 72,985 additions and 9 deletions.
11 changes: 11 additions & 0 deletions README.md
Expand Up @@ -59,6 +59,7 @@ VictoriaMetrics has the following prominent features:
* [JSON line format](#how-to-import-data-in-json-line-format).
* [Arbitrary CSV data](#how-to-import-csv-data).
* [Native binary format](#how-to-import-data-in-native-format).
* [OpenTelemetry format](#sending-data-via-opentelemetry-http).
* It supports metrics [relabeling](#relabeling).
* It can deal with [high cardinality issues](https://docs.victoriametrics.com/FAQ.html#what-is-high-cardinality) and [high churn rate](https://docs.victoriametrics.com/FAQ.html#what-is-high-churn-rate) issues via [series limiter](#cardinality-limiter).
* It ideally works with big amounts of time series data from APM, Kubernetes, IoT sensors, connected cars, industrial telemetry, financial data and various [Enterprise workloads](https://victoriametrics.com/products/enterprise/).
Expand Down Expand Up @@ -1051,6 +1052,16 @@ Note that it could be required to flush response cache after importing historica

VictoriaMetrics also may scrape Prometheus targets - see [these docs](#how-to-scrape-prometheus-exporters-such-as-node-exporter).

## Sending data via opentelemetry http

VictoriaMetrics supports data ingestion via the [opentelemetry protocol](https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/datamodel.md#opentelemetry-protocol-data-model) with `protobuf` and `json` encoding
via the `/opentelemetry/api/v1/push` path. For example, the following command ingests a single gauge metric:
```bash
curl -XPOST -H 'Content-Type: application/json' localhost:8428/opentelemetry/api/v1/push -g -d '{"resourceMetrics":[{"resource":{"attributes":[{"key":"job", "value":{"stringValue":"vm"}}]}, "scopeMetrics":[{"metrics":[{"name":"my-gauge", "gauge":{"dataPoints":[{"attributes":[{"key":"label1", "value":{"stringValue":"value1"}}], "timeUnixNano":"15000000000", "asInt":"15"}]}}]}]}]}'
```
By default, VictoriaMetrics expects `protobuf`-encoded requests. For sending `json`-encoded requests, set the HTTP header `Content-Type: application/json`.
VictoriaMetrics accepts data with gzip compression; set the HTTP header `Content-Encoding: gzip` for compressed data.

## Relabeling

VictoriaMetrics supports Prometheus-compatible relabeling for all the ingested metrics if `-relabelConfig` command-line flag points
Expand Down
31 changes: 24 additions & 7 deletions app/victoria-metrics/main_test.go
Expand Up @@ -43,6 +43,7 @@ const (
testOpenTSDBWriteHTTPPath = "http://127.0.0.1" + testOpenTSDBHTTPListenAddr + "/api/put"
testPromWriteHTTPPath = "http://127.0.0.1" + testHTTPListenAddr + "/api/v1/write"
testHealthHTTPPath = "http://127.0.0.1" + testHTTPListenAddr + "/health"
testOTLPWriteHTTPPath = "http://127.0.0.1" + testHTTPListenAddr + "/opentelemetry/api/v1/push"
)

const (
Expand Down Expand Up @@ -209,7 +210,7 @@ func testWrite(t *testing.T) {
t.Errorf("error compressing %v %s", r, err)
t.Fail()
}
httpWrite(t, testPromWriteHTTPPath, test.InsertQuery, bytes.NewBuffer(data))
httpWrite(t, testPromWriteHTTPPath, test.InsertQuery, bytes.NewBuffer(data), 204)
}
})

Expand All @@ -218,7 +219,7 @@ func testWrite(t *testing.T) {
test := x
t.Run(test.Name, func(t *testing.T) {
t.Parallel()
httpWrite(t, testWriteHTTPPath, test.InsertQuery, bytes.NewBufferString(strings.Join(test.Data, "\n")))
httpWrite(t, testWriteHTTPPath, test.InsertQuery, bytes.NewBufferString(strings.Join(test.Data, "\n")), 204)
})
}
})
Expand Down Expand Up @@ -246,14 +247,25 @@ func testWrite(t *testing.T) {
t.Run(test.Name, func(t *testing.T) {
t.Parallel()
logger.Infof("writing %s", test.Data)
httpWrite(t, testOpenTSDBWriteHTTPPath, test.InsertQuery, bytes.NewBufferString(strings.Join(test.Data, "\n")))
httpWrite(t, testOpenTSDBWriteHTTPPath, test.InsertQuery, bytes.NewBufferString(strings.Join(test.Data, "\n")), 204)
})
}
})
t.Run("opentelemetry", func(t *testing.T) {
for _, x := range readIn("opentelemetry", t, insertionTime) {
test := x
t.Run(test.Name, func(t *testing.T) {
t.Parallel()
httpWrite(t, testOTLPWriteHTTPPath, test.InsertQuery, bytes.NewBufferString(strings.Join(test.Data, "\n")), 200, func(r *http.Request) {
r.Header.Set("Content-Type", "application/json")
})
})
}
})
}

func testRead(t *testing.T) {
for _, engine := range []string{"prometheus", "graphite", "opentsdb", "influxdb", "opentsdbhttp"} {
for _, engine := range []string{"prometheus", "graphite", "opentsdb", "influxdb", "opentsdbhttp", "opentelemetry"} {
t.Run(engine, func(t *testing.T) {
for _, x := range readIn(engine, t, insertionTime) {
test := x
Expand Down Expand Up @@ -324,13 +336,18 @@ func readIn(readFor string, t *testing.T, insertTime time.Time) []test {
return tt
}

func httpWrite(t *testing.T, address, query string, r io.Reader) {
// httpWrite sends an HTTP POST request with the given body to address+query
// and verifies that the server responds with wantCode. Optional reqOptions
// callbacks may mutate the request (e.g. set headers) before it is sent.
func httpWrite(t *testing.T, address, query string, r io.Reader, wantCode int, reqOptions ...func(r *http.Request)) {
	t.Helper()
	suite := newSuite(t)
	request, err := http.NewRequest("POST", address+query, r)
	suite.noError(err)
	for _, applyOption := range reqOptions {
		applyOption(request)
	}
	response, err := http.DefaultClient.Do(request)
	suite.noError(err)
	suite.noError(response.Body.Close())
	suite.equalInt(response.StatusCode, wantCode)
}

func tcpWrite(t *testing.T, address string, data string) {
Expand Down
21 changes: 21 additions & 0 deletions app/victoria-metrics/testdata/opentelemetry/basic.json
@@ -0,0 +1,21 @@
{
"name": "ingest all types",
"query": ["/api/v1/export?match={__name__=~'(my-gauge|my-histogram|my-sum|my-summary).*'}"],
"result_metrics": [
{"metric":{"__name__":"my-gauge","job":"vm","label1":"value1"},"values":[15], "timestamps": ["{TIME_MS}"]},
{"metric":{"__name__":"my-sum","job":"vm","label5":"value5"},"values":[15.5], "timestamps": ["{TIME_MS}"]},
{"metric":{"__name__":"my-summary","job":"vm","label6":"value6","quantile": "0.1"},"values":[7.5], "timestamps": ["{TIME_MS}"]},
{"metric":{"__name__":"my-summary","job":"vm","label6":"value6","quantile": "1"},"values":[15], "timestamps": ["{TIME_MS}"]},
{"metric":{"__name__":"my-summary","job":"vm","label6":"value6","quantile": "0.5"},"values":[10], "timestamps": ["{TIME_MS}"]},
{"metric":{"__name__":"my-summary_count","job":"vm","label6":"value6"},"values":[5], "timestamps": ["{TIME_MS}"]},
{"metric":{"__name__":"my-summary_sum","job":"vm","label6":"value6"},"values":[32.5], "timestamps": ["{TIME_MS}"]},
{"metric":{"__name__":"my-histogram_sum","job":"vm","label2":"value2"},"values":[30], "timestamps": ["{TIME_MS}"]},
{"metric":{"__name__":"my-histogram_count","job":"vm","label2":"value2"},"values":[15], "timestamps": ["{TIME_MS}"]},
{"metric":{"__name__":"my-histogram_bucket","job":"vm","label2":"value2","le": "0.1"},"values":[0], "timestamps": ["{TIME_MS}"]},
{"metric":{"__name__":"my-histogram_bucket","job":"vm","label2":"value2","le": "0.5"},"values":[5], "timestamps": ["{TIME_MS}"]},
{"metric":{"__name__":"my-histogram_bucket","job":"vm","label2":"value2","le": "1"},"values":[15], "timestamps": ["{TIME_MS}"]},
{"metric":{"__name__":"my-histogram_bucket","job":"vm","label2":"value2","le": "5"},"values":[15], "timestamps": ["{TIME_MS}"]},
{"metric":{"__name__":"my-histogram_bucket","job":"vm","label2":"value2","le": "+Inf"},"values":[15], "timestamps": ["{TIME_MS}"]}
],
"data": ["{\"resourceMetrics\":[{\"resource\":{\"attributes\":[{\"key\":\"job\",\"value\":{\"stringValue\":\"vm\"}}]},\"scopeMetrics\":[{\"metrics\":[{\"name\":\"my-gauge\",\"gauge\":{\"dataPoints\":[{\"attributes\":[{\"key\":\"label1\",\"value\":{\"stringValue\":\"value1\"}}],\"timeUnixNano\":\"{TIME_NS}\",\"asInt\":\"15\"}]}},{\"name\":\"my-histogram\",\"histogram\":{\"dataPoints\":[{\"attributes\":[{\"key\":\"label2\",\"value\":{\"stringValue\":\"value2\"}}],\"timeUnixNano\":\"{TIME_NS}\",\"count\":\"15\",\"sum\":30,\"bucketCounts\":[\"0\",\"5\",\"10\",\"0\",\"0\"],\"explicitBounds\":[0.1,0.5,1,5]}],\"aggregationTemporality\":\"AGGREGATION_TEMPORALITY_CUMULATIVE\"}},{\"name\":\"my-sum\",\"sum\":{\"dataPoints\":[{\"attributes\":[{\"key\":\"label5\",\"value\":{\"stringValue\":\"value5\"}}],\"timeUnixNano\":\"{TIME_NS}\",\"asDouble\":15.5}],\"aggregationTemporality\":\"AGGREGATION_TEMPORALITY_CUMULATIVE\"}},{\"name\":\"my-summary\",\"summary\":{\"dataPoints\":[{\"attributes\":[{\"key\":\"label6\",\"value\":{\"stringValue\":\"value6\"}}],\"timeUnixNano\":\"{TIME_NS}\",\"count\":\"5\",\"sum\":32.5,\"quantileValues\":[{\"quantile\":0.1,\"value\":7.5},{\"quantile\":0.5,\"value\":10},{\"quantile\":1,\"value\":15}]}]}}]}]}]}"]
}
8 changes: 8 additions & 0 deletions app/vmagent/main.go
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/graphite"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/influx"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/native"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/opentelemetryhttp"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/opentsdb"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/opentsdbhttp"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/prometheusimport"
Expand Down Expand Up @@ -267,6 +268,13 @@ func requestHandler(w http.ResponseWriter, r *http.Request) bool {
w.Header().Set("Content-Type", "application/json")
fmt.Fprintf(w, `{}`)
return true
case "/opentelemetry/api/v1/push":
if err := opentelemetryhttp.InsertHandler(nil, r); err != nil {
httpserver.Errorf(w, r, "%s", err)
return true
}
w.WriteHeader(http.StatusOK)
return true
case "/targets":
promscrapeTargetsRequests.Inc()
promscrape.WriteHumanReadableTargetsStatus(w, r)
Expand Down
83 changes: 83 additions & 0 deletions app/vmagent/opentelemetryhttp/request_handler.go
@@ -0,0 +1,83 @@
package opentelemetryhttp

import (
"net/http"

"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/common"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/remotewrite"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentelemetry"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/tenantmetrics"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
"github.com/VictoriaMetrics/metrics"
)

var (
	// rowsInserted counts the total number of rows ingested via the opentelemetry protocol.
	rowsInserted = metrics.NewCounter(`vmagent_rows_inserted_total{type="opentelemetry"}`)
	// rowsTenantInserted counts ingested rows per tenant; updated only when an auth token is present.
	rowsTenantInserted = tenantmetrics.NewCounterMap(`vmagent_tenant_inserted_rows_total{type="opentelemetry"}`)
	// rowsPerInsert tracks the distribution of row counts across individual insert requests.
	rowsPerInsert = metrics.NewHistogram(`vmagent_rows_per_insert{type="opentelemetry"}`)
)

// InsertHandler processes opentelemetry metrics.
//
// The request body may be encoded as protobuf (the default) or as JSON
// (Content-Type: application/json), optionally gzip-compressed
// (Content-Encoding: gzip). Parsed timeseries are forwarded to insertRows
// together with any extra labels taken from the request.
func InsertHandler(at *auth.Token, req *http.Request) error {
	extraLabels, err := parserCommon.GetExtraLabels(req)
	if err != nil {
		return err
	}
	h := req.Header
	isJSON := h.Get("Content-Type") == "application/json"
	isGzipped := h.Get("Content-Encoding") == "gzip"
	// Push every parsed batch of timeseries into the remote-write path.
	callback := func(tss []prompb.TimeSeries) error {
		return insertRows(at, tss, extraLabels)
	}
	return writeconcurrencylimiter.Do(func() error {
		return parser.ParseStream(req.Body, isJSON, isGzipped, callback)
	})
}

// insertRows converts the given timeseries into the remote-write representation,
// appends extraLabels to every series and pushes the result to remote storage.
// Per-request ingestion metrics are updated on success.
func insertRows(at *auth.Token, timeseries []prompb.TimeSeries, extraLabels []prompbmarshal.Label) error {
	// Reuse a pooled push context to avoid per-request allocations.
	ctx := common.GetPushCtx()
	defer common.PutPushCtx(ctx)

	rowsTotal := 0
	// Reuse the backing arrays of the pooled context; the slices below alias them.
	tssDst := ctx.WriteRequest.Timeseries[:0]
	labels := ctx.Labels[:0]
	samples := ctx.Samples[:0]
	for i := range timeseries {
		ts := &timeseries[i]
		rowsTotal += len(ts.Samples)
		// Remember the start offset so this series' labels can be sliced out below.
		labelsLen := len(labels)
		for i := range ts.Labels {
			label := &ts.Labels[i]
			// Zero-copy string conversion: the label bytes must stay alive
			// until the push below completes.
			labels = append(labels, prompbmarshal.Label{
				Name:  bytesutil.ToUnsafeString(label.Name),
				Value: bytesutil.ToUnsafeString(label.Value),
			})
		}
		labels = append(labels, extraLabels...)
		// Remember the start offset for this series' samples.
		samplesLen := len(samples)
		for i := range ts.Samples {
			sample := &ts.Samples[i]
			samples = append(samples, prompbmarshal.Sample{
				Value:     sample.Value,
				Timestamp: sample.Timestamp,
			})
		}
		// The per-series views alias the shared labels/samples buffers.
		tssDst = append(tssDst, prompbmarshal.TimeSeries{
			Labels:  labels[labelsLen:],
			Samples: samples[samplesLen:],
		})
	}
	// Store the grown buffers back into the context so the pool keeps the capacity.
	ctx.WriteRequest.Timeseries = tssDst
	ctx.Labels = labels
	ctx.Samples = samples
	remotewrite.PushWithAuthToken(at, &ctx.WriteRequest)
	rowsInserted.Add(rowsTotal)
	if at != nil {
		// Track per-tenant ingestion only when a tenant token is available.
		rowsTenantInserted.Get(at).Add(rowsTotal)
	}
	rowsPerInsert.Update(float64(rowsTotal))
	return nil
}
8 changes: 8 additions & 0 deletions app/vminsert/main.go
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/graphite"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/influx"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/native"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/opentelemetryhttp"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/opentsdb"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/opentsdbhttp"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/prometheusimport"
Expand Down Expand Up @@ -192,6 +193,13 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool {
w.Header().Set("Content-Type", "application/json")
fmt.Fprintf(w, `{}`)
return true
case "/opentelemetry/api/v1/push":
if err := opentelemetryhttp.InsertHandler(r); err != nil {
httpserver.Errorf(w, r, "%s", err)
return true
}
w.WriteHeader(http.StatusOK)
return true
case "/prometheus/targets", "/targets":
promscrapeTargetsRequests.Inc()
promscrape.WriteHumanReadableTargetsStatus(w, r)
Expand Down
81 changes: 81 additions & 0 deletions app/vminsert/opentelemetryhttp/request_handler.go
@@ -0,0 +1,81 @@
package opentelemetryhttp

import (
"net/http"

"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/common"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/relabel"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentelemetry"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
"github.com/VictoriaMetrics/metrics"
)

var (
	// rowsInserted counts the total number of rows ingested via the opentelemetry protocol.
	rowsInserted = metrics.NewCounter(`vm_rows_inserted_total{type="opentelemetry"}`)
	// rowsPerInsert tracks the distribution of row counts across individual insert requests.
	rowsPerInsert = metrics.NewHistogram(`vm_rows_per_insert{type="opentelemetry"}`)
)

// InsertHandler processes opentelemetry metrics.
//
// The body encoding is selected from request headers: JSON when
// Content-Type is application/json (protobuf otherwise), gunzipped first
// when Content-Encoding is gzip. Each parsed batch of timeseries is stored
// via insertRows with the request's extra labels attached.
func InsertHandler(req *http.Request) error {
	extraLabels, err := parserCommon.GetExtraLabels(req)
	if err != nil {
		return err
	}
	jsonEncoded := req.Header.Get("Content-Type") == "application/json"
	gzipCompressed := req.Header.Get("Content-Encoding") == "gzip"
	return writeconcurrencylimiter.Do(func() error {
		return parser.ParseStream(req.Body, jsonEncoded, gzipCompressed, func(tss []prompb.TimeSeries) error {
			return insertRows(tss, extraLabels)
		})
	})
}

// insertRows writes the given timeseries into local storage, appending
// extraLabels to every series and applying the configured relabeling rules.
// Series that end up with no labels after relabeling are skipped.
func insertRows(timeseries []prompb.TimeSeries, extraLabels []prompbmarshal.Label) error {
	// Reuse a pooled insert context to avoid per-request allocations.
	ctx := common.GetInsertCtx()
	defer common.PutInsertCtx(ctx)

	// Pre-count the samples so the context buffers can be sized once.
	rowsLen := 0
	for i := range timeseries {
		rowsLen += len(timeseries[i].Samples)
	}
	ctx.Reset(rowsLen)
	rowsTotal := 0
	// Check once per request; relabeling config does not change mid-loop.
	hasRelabeling := relabel.HasRelabeling()
	for i := range timeseries {
		ts := &timeseries[i]
		rowsTotal += len(ts.Samples)
		// Rebuild the label set for each series in the shared context buffer.
		ctx.Labels = ctx.Labels[:0]
		srcLabels := ts.Labels
		for _, srcLabel := range srcLabels {
			ctx.AddLabelBytes(srcLabel.Name, srcLabel.Value)
		}
		for j := range extraLabels {
			label := &extraLabels[j]
			ctx.AddLabel(label.Name, label.Value)
		}
		if hasRelabeling {
			ctx.ApplyRelabeling()
		}
		if len(ctx.Labels) == 0 {
			// Skip metric without labels.
			continue
		}
		ctx.SortLabelsIfNeeded()
		// metricNameRaw is filled on the first datapoint and reused for the
		// remaining datapoints of the same series.
		var metricNameRaw []byte
		var err error
		samples := ts.Samples
		for i := range samples {
			r := &samples[i]
			metricNameRaw, err = ctx.WriteDataPointExt(metricNameRaw, ctx.Labels, r.Timestamp, r.Value)
			if err != nil {
				return err
			}
		}
	}
	rowsInserted.Add(rowsTotal)
	rowsPerInsert.Update(float64(rowsTotal))
	return ctx.FlushBufs()
}
1 change: 1 addition & 0 deletions docs/CHANGELOG.md
Expand Up @@ -15,6 +15,7 @@ The following tip changes can be tested by building VictoriaMetrics components f

## tip

* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): Allow data ingestion with the [opentelemetry](https://opentelemetry.io/docs/reference/specification/metrics/) protocol. Metrics can be pushed directly into VictoriaMetrics without an [OTLP collector](https://opentelemetry.io/docs/collector/). See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2424).
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): Allows using kubeconfig file within `kubernetes_sd_configs`. It may be useful for kubernetes cluster monitoring by `vmagent` outside kubernetes cluster. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1464).
* FEATURE: allow overriding default limits for in-memory cache `indexdb/tagFilters` via flag `-storage.cacheSizeIndexDBTagFilters`. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2663).
* FEATURE: add support of `lowercase` and `uppercase` relabeling actions in the same way as [Prometheus 2.36.0 does](https://github.com/prometheus/prometheus/releases/tag/v2.36.0). See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2664).
Expand Down
11 changes: 11 additions & 0 deletions docs/README.md
Expand Up @@ -59,6 +59,7 @@ VictoriaMetrics has the following prominent features:
* [JSON line format](#how-to-import-data-in-json-line-format).
* [Arbitrary CSV data](#how-to-import-csv-data).
* [Native binary format](#how-to-import-data-in-native-format).
* [OpenTelemetry format](#sending-data-via-opentelemetry-http).
* It supports metrics [relabeling](#relabeling).
* It can deal with [high cardinality issues](https://docs.victoriametrics.com/FAQ.html#what-is-high-cardinality) and [high churn rate](https://docs.victoriametrics.com/FAQ.html#what-is-high-churn-rate) issues via [series limiter](#cardinality-limiter).
* It ideally works with big amounts of time series data from APM, Kubernetes, IoT sensors, connected cars, industrial telemetry, financial data and various [Enterprise workloads](https://victoriametrics.com/products/enterprise/).
Expand Down Expand Up @@ -1051,6 +1052,16 @@ Note that it could be required to flush response cache after importing historica

VictoriaMetrics also may scrape Prometheus targets - see [these docs](#how-to-scrape-prometheus-exporters-such-as-node-exporter).

## Sending data via opentelemetry http

VictoriaMetrics supports data ingestion via the [opentelemetry protocol](https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/datamodel.md#opentelemetry-protocol-data-model) with `protobuf` and `json` encoding
via the `/opentelemetry/api/v1/push` path. For example, the following command ingests a single gauge metric:
```bash
curl -XPOST -H 'Content-Type: application/json' localhost:8428/opentelemetry/api/v1/push -g -d '{"resourceMetrics":[{"resource":{"attributes":[{"key":"job", "value":{"stringValue":"vm"}}]}, "scopeMetrics":[{"metrics":[{"name":"my-gauge", "gauge":{"dataPoints":[{"attributes":[{"key":"label1", "value":{"stringValue":"value1"}}], "timeUnixNano":"15000000000", "asInt":"15"}]}}]}]}]}'
```
By default, VictoriaMetrics expects `protobuf`-encoded requests. For sending `json`-encoded requests, set the HTTP header `Content-Type: application/json`.
VictoriaMetrics accepts data with gzip compression; set the HTTP header `Content-Encoding: gzip` for compressed data.

## Relabeling

VictoriaMetrics supports Prometheus-compatible relabeling for all the ingested metrics if `-relabelConfig` command-line flag points
Expand Down

0 comments on commit 88c4b16

Please sign in to comment.