Skip to content

Commit

Permalink
lib/protoparser: adds opentelemetry parser
Browse files Browse the repository at this point in the history
app/{vmagent,vminsert}: adds opentelemetry ingestion path

Adds ability to ingest data with opentelemetry protocol
protobuf and json encoding is supported
data converted into prometheus protobuf timeseries
each data type has own converter and it may produce multiple timeseries
from single datapoint (for summary and histogram).
only cumulative aggregationFamily is supported for sum(prometheus
counter) and histogram.

Apply suggestions from code review

Co-authored-by: Roman Khavronenko <roman@victoriametrics.com>

updates deps

fixes tests

wip

wip

wip

wip

lib/protoparser/opentelemetry: moves to vtprotobuf generator

go mod vendor

lib/protoparse/opentelemetry: reduce memory allocations
  • Loading branch information
f41gh7 committed Jun 26, 2023
1 parent a97887a commit b3c955a
Show file tree
Hide file tree
Showing 31 changed files with 10,261 additions and 12 deletions.
64 changes: 64 additions & 0 deletions README.md
Expand Up @@ -86,6 +86,7 @@ VictoriaMetrics has the following prominent features:
* [Arbitrary CSV data](#how-to-import-csv-data).
* [Native binary format](#how-to-import-data-in-native-format).
* [DataDog agent or DogStatsD](#how-to-send-data-from-datadog-agent).
* [OpenTelemetry metrics format](#sending-data-via-opentelemetry).
* It supports powerful [stream aggregation](https://docs.victoriametrics.com/stream-aggregation.html), which can be used as a [statsd](https://github.com/statsd/statsd) alternative.
* It supports metrics [relabeling](#relabeling).
* It can deal with [high cardinality issues](https://docs.victoriametrics.com/FAQ.html#what-is-high-cardinality) and
Expand Down Expand Up @@ -1154,6 +1155,7 @@ Additionally, VictoriaMetrics can accept metrics via the following popular data
* DataDog `submit metrics` API. See [these docs](#how-to-send-data-from-datadog-agent) for details.
* InfluxDB line protocol. See [these docs](#how-to-send-data-from-influxdb-compatible-agents-such-as-telegraf) for details.
* Graphite plaintext protocol. See [these docs](#how-to-send-data-from-graphite-compatible-agents-such-as-statsd) for details.
* OpenTelemetry http API. See [these docs](#sending-data-via-opentelemetry) for details.
* OpenTSDB telnet put protocol. See [these docs](#sending-data-via-telnet-put-protocol) for details.
* OpenTSDB http `/api/put` protocol. See [these docs](#sending-opentsdb-data-via-http-apiput-requests) for details.
* `/api/v1/import` for importing data obtained from [/api/v1/export](#how-to-export-data-in-json-line-format).
Expand Down Expand Up @@ -1336,6 +1338,68 @@ Note that it could be required to flush response cache after importing historica

VictoriaMetrics also may scrape Prometheus targets - see [these docs](#how-to-scrape-prometheus-exporters-such-as-node-exporter).

## Sending data via OpenTelemetry

VictoriaMetrics supports data ingestion via [OpenTelemetry metrics protocol](https://github.com/open-telemetry/opentelemetry-specification/blob/ffddc289462dfe0c2041e3ca42a7b1df805706de/specification/metrics/data-model.md)
with `protobuf` and `json` encoding via `/opentemetry/api/v1/push` path. For example, the following command stores `temperature{job="vm",label1="value1"} 15`
[metric](https://docs.victoriametrics.com/keyConcepts.html#what-is-a-metric) to VictoriaMetrics:

```bash
echo '{
"resourceMetrics": [
{
"resource": {
"attributes": [
{
"key": "job",
"value": {
"stringValue": "vm"
}
}
]
},
"scopeMetrics": [
{
"metrics": [
{
"name": "temperature",
"gauge": {
"dataPoints": [
{
"attributes": [
{
"key": "label1",
"value": {
"stringValue": "value1"
}
}
],
"asInt": "15"
}
]
}
}
]
}
]
}
]
}
' | curl -X POST -H 'Content-Type: application/json' --data-binary @- http://localhost:8428/opentelemetry/api/v1/push
```

The saved data can be verified by querying it via [/api/v1/export](https://docs.victoriametrics.com/#how-to-export-data-in-json-line-format):

```bash
curl http://localhost:8428/api/v1/export -d 'match[]=temperature'
{"metric":{"__name__":"temperature","job":"vm","label1":"value1"},"values":[15],"timestamps":[1673390534000]}
```

By default VictoriaMetrics expects `protobuf`-encoded requests at `/opentelemetry/api/v1/push`. Set `Content-Type: application/json`
request header when sending `json`-encoded data.

Set HTTP request header `Content-Encoding: gzip` when sending gzip-compressed data to `/opentelemetry/api/v1/push`.

## Relabeling

VictoriaMetrics supports Prometheus-compatible relabeling for all the ingested metrics if `-relabelConfig` command-line flag points
Expand Down
31 changes: 24 additions & 7 deletions app/victoria-metrics/main_test.go
Expand Up @@ -42,6 +42,7 @@ const (
testOpenTSDBWriteHTTPPath = "http://127.0.0.1" + testOpenTSDBHTTPListenAddr + "/api/put"
testPromWriteHTTPPath = "http://127.0.0.1" + testHTTPListenAddr + "/api/v1/write"
testHealthHTTPPath = "http://127.0.0.1" + testHTTPListenAddr + "/health"
testOTLPWriteHTTPPath = "http://127.0.0.1" + testHTTPListenAddr + "/opentelemetry/api/v1/push"
)

const (
Expand Down Expand Up @@ -206,7 +207,7 @@ func testWrite(t *testing.T) {
t.Errorf("error compressing %v %s", r, err)
t.Fail()
}
httpWrite(t, testPromWriteHTTPPath, test.InsertQuery, bytes.NewBuffer(data))
httpWrite(t, testPromWriteHTTPPath, test.InsertQuery, bytes.NewBuffer(data), 204)
}
})

Expand All @@ -215,7 +216,7 @@ func testWrite(t *testing.T) {
test := x
t.Run(test.Name, func(t *testing.T) {
t.Parallel()
httpWrite(t, testWriteHTTPPath, test.InsertQuery, bytes.NewBufferString(strings.Join(test.Data, "\n")))
httpWrite(t, testWriteHTTPPath, test.InsertQuery, bytes.NewBufferString(strings.Join(test.Data, "\n")), 204)
})
}
})
Expand Down Expand Up @@ -243,14 +244,25 @@ func testWrite(t *testing.T) {
t.Run(test.Name, func(t *testing.T) {
t.Parallel()
logger.Infof("writing %s", test.Data)
httpWrite(t, testOpenTSDBWriteHTTPPath, test.InsertQuery, bytes.NewBufferString(strings.Join(test.Data, "\n")))
httpWrite(t, testOpenTSDBWriteHTTPPath, test.InsertQuery, bytes.NewBufferString(strings.Join(test.Data, "\n")), 204)
})
}
})
t.Run("opentelemetry", func(t *testing.T) {
for _, x := range readIn("opentelemetry", t, insertionTime) {
test := x
t.Run(test.Name, func(t *testing.T) {
t.Parallel()
httpWrite(t, testOTLPWriteHTTPPath, test.InsertQuery, bytes.NewBufferString(strings.Join(test.Data, "\n")), 200, func(r *http.Request) {
r.Header.Set("Content-Type", "application/json")
})
})
}
})
}

func testRead(t *testing.T) {
for _, engine := range []string{"prometheus", "graphite", "opentsdb", "influxdb", "opentsdbhttp"} {
for _, engine := range []string{"prometheus", "graphite", "opentsdb", "influxdb", "opentsdbhttp", "opentelemetry"} {
t.Run(engine, func(t *testing.T) {
for _, x := range readIn(engine, t, insertionTime) {
test := x
Expand Down Expand Up @@ -321,13 +333,18 @@ func readIn(readFor string, t *testing.T, insertTime time.Time) []test {
return tt
}

func httpWrite(t *testing.T, address, query string, r io.Reader) {
func httpWrite(t *testing.T, address, query string, r io.Reader, wantCode int, reqOptions ...func(r *http.Request)) {
t.Helper()
s := newSuite(t)
resp, err := http.Post(address+query, "", r)
req, err := http.NewRequest("POST", address+query, r)
s.noError(err)
for _, reqOption := range reqOptions {
reqOption(req)
}
resp, err := http.DefaultClient.Do(req)
s.noError(err)
s.noError(resp.Body.Close())
s.equalInt(resp.StatusCode, 204)
s.equalInt(resp.StatusCode, wantCode)
}

func tcpWrite(t *testing.T, address string, data string) {
Expand Down
21 changes: 21 additions & 0 deletions app/victoria-metrics/testdata/opentelemetry/basic.json
@@ -0,0 +1,21 @@
{
"name": "ingest all types",
"query": ["/api/v1/export?match={__name__=~'(my-gauge|my-histogram|my-sum|my-summary).*'}"],
"result_metrics": [
{"metric":{"__name__":"my-gauge","job":"vm","label1":"value1"},"values":[15], "timestamps": ["{TIME_MS}"]},
{"metric":{"__name__":"my-sum","job":"vm","label5":"value5"},"values":[15.5], "timestamps": ["{TIME_MS}"]},
{"metric":{"__name__":"my-summary","job":"vm","label6":"value6","quantile": "0.1"},"values":[7.5], "timestamps": ["{TIME_MS}"]},
{"metric":{"__name__":"my-summary","job":"vm","label6":"value6","quantile": "1"},"values":[15], "timestamps": ["{TIME_MS}"]},
{"metric":{"__name__":"my-summary","job":"vm","label6":"value6","quantile": "0.5"},"values":[10], "timestamps": ["{TIME_MS}"]},
{"metric":{"__name__":"my-summary_count","job":"vm","label6":"value6"},"values":[5], "timestamps": ["{TIME_MS}"]},
{"metric":{"__name__":"my-summary_sum","job":"vm","label6":"value6"},"values":[32.5], "timestamps": ["{TIME_MS}"]},
{"metric":{"__name__":"my-histogram_sum","job":"vm","label2":"value2"},"values":[30], "timestamps": ["{TIME_MS}"]},
{"metric":{"__name__":"my-histogram_count","job":"vm","label2":"value2"},"values":[15], "timestamps": ["{TIME_MS}"]},
{"metric":{"__name__":"my-histogram_bucket","job":"vm","label2":"value2","le": "0.1"},"values":[0], "timestamps": ["{TIME_MS}"]},
{"metric":{"__name__":"my-histogram_bucket","job":"vm","label2":"value2","le": "0.5"},"values":[5], "timestamps": ["{TIME_MS}"]},
{"metric":{"__name__":"my-histogram_bucket","job":"vm","label2":"value2","le": "1"},"values":[15], "timestamps": ["{TIME_MS}"]},
{"metric":{"__name__":"my-histogram_bucket","job":"vm","label2":"value2","le": "5"},"values":[15], "timestamps": ["{TIME_MS}"]},
{"metric":{"__name__":"my-histogram_bucket","job":"vm","label2":"value2","le": "+Inf"},"values":[15], "timestamps": ["{TIME_MS}"]}
],
"data": ["{\"resourceMetrics\":[{\"resource\":{\"attributes\":[{\"key\":\"job\",\"value\":{\"stringValue\":\"vm\"}}]},\"scopeMetrics\":[{\"metrics\":[{\"name\":\"my-gauge\",\"gauge\":{\"dataPoints\":[{\"attributes\":[{\"key\":\"label1\",\"value\":{\"stringValue\":\"value1\"}}],\"timeUnixNano\":\"{TIME_NS}\",\"asInt\":\"15\"}]}},{\"name\":\"my-histogram\",\"histogram\":{\"dataPoints\":[{\"attributes\":[{\"key\":\"label2\",\"value\":{\"stringValue\":\"value2\"}}],\"timeUnixNano\":\"{TIME_NS}\",\"count\":\"15\",\"sum\":30,\"bucketCounts\":[\"0\",\"5\",\"10\",\"0\",\"0\"],\"explicitBounds\":[0.1,0.5,1,5]}],\"aggregationTemporality\":\"AGGREGATION_TEMPORALITY_CUMULATIVE\"}},{\"name\":\"my-sum\",\"sum\":{\"dataPoints\":[{\"attributes\":[{\"key\":\"label5\",\"value\":{\"stringValue\":\"value5\"}}],\"timeUnixNano\":\"{TIME_NS}\",\"asDouble\":15.5}],\"aggregationTemporality\":\"AGGREGATION_TEMPORALITY_CUMULATIVE\"}},{\"name\":\"my-summary\",\"summary\":{\"dataPoints\":[{\"attributes\":[{\"key\":\"label6\",\"value\":{\"stringValue\":\"value6\"}}],\"timeUnixNano\":\"{TIME_NS}\",\"count\":\"5\",\"sum\":32.5,\"quantileValues\":[{\"quantile\":0.1,\"value\":7.5},{\"quantile\":0.5,\"value\":10},{\"quantile\":1,\"value\":15}]}]}}]}]}]}"]
}
1 change: 1 addition & 0 deletions app/vmagent/README.md
Expand Up @@ -93,6 +93,7 @@ additionally to pull-based Prometheus-compatible targets' scraping:
* DataDog "submit metrics" API. See [these docs](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#how-to-send-data-from-datadog-agent).
* InfluxDB line protocol via `http://<vmagent>:8429/write`. See [these docs](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#how-to-send-data-from-influxdb-compatible-agents-such-as-telegraf).
* Graphite plaintext protocol if `-graphiteListenAddr` command-line flag is set. See [these docs](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#how-to-send-data-from-graphite-compatible-agents-such-as-statsd).
* OpenTelemetry http API. See [these docs](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#sending-data-via-opentelemetry).
* OpenTSDB telnet and http protocols if `-opentsdbListenAddr` command-line flag is set. See [these docs](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#how-to-send-data-from-opentsdb-compatible-agents).
* Prometheus remote write protocol via `http://<vmagent>:8429/api/v1/write`.
* JSON lines import protocol via `http://<vmagent>:8429/api/v1/import`. See [these docs](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#how-to-import-data-in-json-line-format).
Expand Down
13 changes: 13 additions & 0 deletions app/vmagent/main.go
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/graphite"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/influx"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/native"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/opentelemetryhttp"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/opentsdb"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/opentsdbhttp"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/prometheusimport"
Expand Down Expand Up @@ -343,6 +344,15 @@ func requestHandler(w http.ResponseWriter, r *http.Request) bool {
w.Header().Set("Content-Type", "application/json")
fmt.Fprintf(w, `{}`)
return true
case "/opentelemetry/api/v1/push":
opentelemetryPushRequests.Inc()
if err := opentelemetryhttp.InsertHandler(nil, r); err != nil {
opentelemetryPushErrors.Inc()
httpserver.Errorf(w, r, "%s", err)
return true
}
w.WriteHeader(http.StatusOK)
return true
case "/prometheus/targets", "/targets":
promscrapeTargetsRequests.Inc()
promscrape.WriteHumanReadableTargetsStatus(w, r)
Expand Down Expand Up @@ -568,6 +578,9 @@ var (
datadogIntakeRequests = metrics.NewCounter(`vmagent_http_requests_total{path="/datadog/intake", protocol="datadog"}`)
datadogMetadataRequests = metrics.NewCounter(`vmagent_http_requests_total{path="/datadog/api/v1/metadata", protocol="datadog"}`)

opentelemetryPushRequests = metrics.NewCounter(`vmagent_http_requests_total{path="/opentelemetry/api/v1/push", protocol="opentelemetry"}`)
opentelemetryPushErrors = metrics.NewCounter(`vmagent_http_request_errors_total{path="/opentelemetry/api/v1/push", protocol="opentelemetry"}`)

promscrapeTargetsRequests = metrics.NewCounter(`vmagent_http_requests_total{path="/targets"}`)
promscrapeServiceDiscoveryRequests = metrics.NewCounter(`vmagent_http_requests_total{path="/service-discovery"}`)

Expand Down
80 changes: 80 additions & 0 deletions app/vmagent/opentelemetryhttp/request_handler.go
@@ -0,0 +1,80 @@
package opentelemetryhttp

import (
"net/http"

"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/common"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/remotewrite"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentelemetry"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/tenantmetrics"
"github.com/VictoriaMetrics/metrics"
)

var (
rowsInserted = metrics.NewCounter(`vmagent_rows_inserted_total{type="opentelemetry"}`)
rowsTenantInserted = tenantmetrics.NewCounterMap(`vmagent_tenant_inserted_rows_total{type="opentelemetry"}`)
rowsPerInsert = metrics.NewHistogram(`vmagent_rows_per_insert{type="opentelemetry"}`)
)

// InsertHandler processes opentelemetry metrics.
func InsertHandler(at *auth.Token, req *http.Request) error {
extraLabels, err := parserCommon.GetExtraLabels(req)
if err != nil {
return err
}
isJSON := req.Header.Get("Content-Type") == "application/json"
isGzipped := req.Header.Get("Content-Encoding") == "gzip"
return parser.ParseStream(req.Body, isJSON, isGzipped, func(tss []prompb.TimeSeries) error {
return insertRows(at, tss, extraLabels)
})
}

func insertRows(at *auth.Token, timeseries []prompb.TimeSeries, extraLabels []prompbmarshal.Label) error {
ctx := common.GetPushCtx()
defer common.PutPushCtx(ctx)

rowsTotal := 0
tssDst := ctx.WriteRequest.Timeseries[:0]
labels := ctx.Labels[:0]
samples := ctx.Samples[:0]
for i := range timeseries {
ts := &timeseries[i]
rowsTotal += len(ts.Samples)
labelsLen := len(labels)
for i := range ts.Labels {
label := &ts.Labels[i]
labels = append(labels, prompbmarshal.Label{
Name: bytesutil.ToUnsafeString(label.Name),
Value: bytesutil.ToUnsafeString(label.Value),
})
}
labels = append(labels, extraLabels...)
samplesLen := len(samples)
for i := range ts.Samples {
sample := &ts.Samples[i]
samples = append(samples, prompbmarshal.Sample{
Value: sample.Value,
Timestamp: sample.Timestamp,
})
}
tssDst = append(tssDst, prompbmarshal.TimeSeries{
Labels: labels[labelsLen:],
Samples: samples[samplesLen:],
})
}
ctx.WriteRequest.Timeseries = tssDst
ctx.Labels = labels
ctx.Samples = samples
remotewrite.Push(at, &ctx.WriteRequest)
rowsInserted.Add(rowsTotal)
if at != nil {
rowsTenantInserted.Get(at).Add(rowsTotal)
}
rowsPerInsert.Update(float64(rowsTotal))
return nil
}
13 changes: 13 additions & 0 deletions app/vminsert/main.go
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/graphite"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/influx"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/native"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/opentelemetryhttp"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/opentsdb"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/opentsdbhttp"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/prometheusimport"
Expand Down Expand Up @@ -245,6 +246,15 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool {
w.Header().Set("Content-Type", "application/json")
fmt.Fprintf(w, `{}`)
return true
case "/opentelemetry/api/v1/push":
opentelemetryPushRequests.Inc()
if err := opentelemetryhttp.InsertHandler(r); err != nil {
opentelemetryPushErrors.Inc()
httpserver.Errorf(w, r, "%s", err)
return true
}
w.WriteHeader(http.StatusOK)
return true
case "/prometheus/targets", "/targets":
promscrapeTargetsRequests.Inc()
promscrape.WriteHumanReadableTargetsStatus(w, r)
Expand Down Expand Up @@ -344,6 +354,9 @@ var (
datadogIntakeRequests = metrics.NewCounter(`vm_http_requests_total{path="/datadog/intake", protocol="datadog"}`)
datadogMetadataRequests = metrics.NewCounter(`vm_http_requests_total{path="/datadog/api/v1/metadata", protocol="datadog"}`)

opentelemetryPushRequests = metrics.NewCounter(`vm_http_requests_total{path="/opentelemetry/api/v1/push", protocol="opentelemetry"}`)
opentelemetryPushErrors = metrics.NewCounter(`vm_http_request_errors_total{path="/opentelemetry/api/v1/push", protocol="opentelemetry"}`)

promscrapeTargetsRequests = metrics.NewCounter(`vm_http_requests_total{path="/targets"}`)
promscrapeServiceDiscoveryRequests = metrics.NewCounter(`vm_http_requests_total{path="/service-discovery"}`)

Expand Down

0 comments on commit b3c955a

Please sign in to comment.