Skip to content

Commit

Permalink
vlinsert: added opentelemetry logs support
Browse files Browse the repository at this point in the history
  • Loading branch information
AndrewChubatiuk committed May 4, 2024
1 parent c0050be commit fa7ca96
Show file tree
Hide file tree
Showing 14 changed files with 807 additions and 303 deletions.
4 changes: 4 additions & 0 deletions app/vlinsert/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/app/vlinsert/elasticsearch"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vlinsert/jsonline"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vlinsert/loki"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vlinsert/opentelemetry"
)

// Init initializes vlinsert
Expand Down Expand Up @@ -37,6 +38,9 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool {
case strings.HasPrefix(path, "/loki/"):
path = strings.TrimPrefix(path, "/loki")
return loki.RequestHandler(path, w, r)
case strings.HasPrefix(path, "/opentelemetry/"):
path = strings.TrimPrefix(path, "/opentelemetry")
return opentelemetry.RequestHandler(path, w, r)
default:
return false
}
Expand Down
80 changes: 80 additions & 0 deletions app/vlinsert/opentelemetry/opentelemetry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package opentelemetry

import (
"fmt"
"net/http"
"time"

"github.com/VictoriaMetrics/VictoriaMetrics/app/vlinsert/insertutils"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vlstorage"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage"
opentelemetry "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentelemetry/stream"
"github.com/VictoriaMetrics/metrics"
)

// RequestHandler processes Opentelemetry insert requests
func RequestHandler(path string, w http.ResponseWriter, r *http.Request) bool {
switch path {
case "/api/v1/push":
return handleInsert(r, w)
default:
return false
}
}

func handleInsert(r *http.Request, w http.ResponseWriter) bool {
startTime := time.Now()
var m *otelMetrics
contentType := r.Header.Get("Content-Type")
if contentType == "application/json" {
m = jsonMetrics
} else {
m = protobufMetrics
}
isGzipped := r.Header.Get("Content-Encoding") == "gzip"
m.requestsTotal.Inc()
cp, err := insertutils.GetCommonParams(r)
if err != nil {
httpserver.Errorf(w, r, "cannot parse common params from request: %s", err)
return true
}
n, err := opentelemetry.ParseLogsStream(r.Body, contentType, isGzipped, func(streamFields []string) (func(int64, []logstorage.Field), func()) {
lr := logstorage.GetLogRows(streamFields, nil)
processLogFn := cp.GetProcessLogMessageFunc(lr)
pushFn := func() {
vlstorage.MustAddRows(lr)
logstorage.PutLogRows(lr)
}
return processLogFn, pushFn
})
if err != nil {
httpserver.Errorf(w, r, "cannot parse Opentelemetry request: %s", err)
return true
}

m.ingestedTotal.Add(n)

m.requestsDuration.UpdateDuration(startTime)

return true
}

type otelMetrics struct {
requestsTotal *metrics.Counter
ingestedTotal *metrics.Counter
requestsDuration *metrics.Histogram
}

func newMetrics(format string) *otelMetrics {
return &otelMetrics{
requestsTotal: metrics.NewCounter(fmt.Sprintf(`vl_http_requests_total{path="/insert/opentelemetry/api/v1/push",format="%s"}`, format)),
ingestedTotal: metrics.NewCounter(fmt.Sprintf(`vl_rows_ingested_total{type="opentelemetry",format="%s"}`, format)),
requestsDuration: metrics.NewHistogram(fmt.Sprintf(`vl_http_request_duration_seconds{path="/insert/opentelemetry/api/v1/push",format="%s"}`, format)),
}
}

var (
jsonMetrics = newMetrics("json")
protobufMetrics = newMetrics("protobuf")
)
9 changes: 4 additions & 5 deletions app/vmagent/opentelemetry/request_handler.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package opentelemetry

import (
"fmt"
"net/http"

"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/common"
Expand Down Expand Up @@ -29,14 +28,14 @@ func InsertHandler(at *auth.Token, req *http.Request) error {
}
isGzipped := req.Header.Get("Content-Encoding") == "gzip"
var processBody func([]byte) ([]byte, error)
if req.Header.Get("Content-Type") == "application/json" {
contentType := req.Header.Get("Content-Type")
if contentType == "application/json" {
if req.Header.Get("X-Amz-Firehose-Protocol-Version") != "" {
processBody = firehose.ProcessRequestBody
} else {
return fmt.Errorf("json encoding isn't supported for opentelemetry format. Use protobuf encoding")
contentType = "application/x-protobuf"
}
}
return stream.ParseStream(req.Body, isGzipped, processBody, func(tss []prompbmarshal.TimeSeries) error {
return stream.ParseMetricsStream(req.Body, contentType, isGzipped, processBody, func(tss []prompbmarshal.TimeSeries) error {
return insertRows(at, tss, extraLabels)
})
}
Expand Down
9 changes: 4 additions & 5 deletions app/vminsert/opentelemetry/request_handler.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package opentelemetry

import (
"fmt"
"net/http"

"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/common"
Expand All @@ -26,14 +25,14 @@ func InsertHandler(req *http.Request) error {
}
isGzipped := req.Header.Get("Content-Encoding") == "gzip"
var processBody func([]byte) ([]byte, error)
if req.Header.Get("Content-Type") == "application/json" {
contentType := req.Header.Get("Content-Type")
if contentType == "application/json" {
if req.Header.Get("X-Amz-Firehose-Protocol-Version") != "" {
processBody = firehose.ProcessRequestBody
} else {
return fmt.Errorf("json encoding isn't supported for opentelemetry format. Use protobuf encoding")
contentType = "application/x-protobuf"
}
}
return stream.ParseStream(req.Body, isGzipped, processBody, func(tss []prompbmarshal.TimeSeries) error {
return stream.ParseMetricsStream(req.Body, contentType, isGzipped, processBody, func(tss []prompbmarshal.TimeSeries) error {
return insertRows(tss, extraLabels)
})
}
Expand Down
1 change: 1 addition & 0 deletions docs/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ See also [LTS releases](https://docs.victoriametrics.com/lts-releases/).

## tip

* FEATURE: [vlinsert](https://docs.victoriametrics.com/victorialogs/): added OpenTelemetry logs ingestion support.
* FEATURE: [dashboards/single](https://grafana.com/grafana/dashboards/10229): support selecting of multiple instances on the dashboard. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5869) for details.
* FEATURE: [dashboards/single](https://grafana.com/grafana/dashboards/10229): properly display version in the Stats row for the custom builds of VictoriaMetrics.
* FEATURE: [dashboards/single](https://grafana.com/grafana/dashboards/10229): add `Network Usage` panel to `Resource Usage` row.
Expand Down
37 changes: 37 additions & 0 deletions docs/VictoriaLogs/data-ingestion/OpenTelemetry.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
---
weight: 4
title: OpenTelemetry setup
disableToc: true
menu:
docs:
parent: "victorialogs-data-ingestion"
weight: 4
aliases:
- /VictoriaLogs/data-ingestion/OpenTelemetry.html
---
# OpenTelemetry setup

Specify logs endpoint for [OTLP/HTTP exporter](https://github.com/open-telemetry/opentelemetry-collector/blob/main/exporter/otlphttpexporter/README.md) in configuration file
for sending the collected logs to [VictoriaLogs](https://docs.victoriametrics.com/VictoriaLogs/):

```yaml
exporters:
otlphttp:
logs_endpoint: http://localhost:9428/insert/opentelemetry/api/v1/push
```

Substitute `localhost:9428` address inside `exporters.oltphttp.logs_endpoint` with the real TCP address of VictoriaLogs.

VictoriaLogs divides all the ingested logs into a log streams [log stream](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#stream-fields) relying on resource attributes. In example below resource attributes are set for [filelog OpenTelemetry receiver](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/receiver/filelogreceiver):

```yaml
receivers:
filelog:
include: [/var/log/myservice/*.json]
resource:
region: us-east-1
```

The ingested log entries can be queried according to [these docs](https://docs.victoriametrics.com/VictoriaLogs/querying/).

See also [data ingestion troubleshooting](https://docs.victoriametrics.com/VictoriaLogs/data-ingestion/#troubleshooting) docs.
2 changes: 1 addition & 1 deletion lib/protoparser/opentelemetry/firehose/parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ func TestProcessRequestBody(t *testing.T) {
{__name__="amazonaws.com/AWS/EBS/VolumeReadOps",cloud.provider="aws",cloud.account.id="677435890598",cloud.region="us-east-1",aws.exporter.arn="arn:aws:cloudwatch:us-east-1:677435890598:metric-stream/custom_ebs_metric",quantile="1"} 0 1709217300000
`
var callbackCalls atomic.Uint64
err := stream.ParseStream(bytes.NewReader(data), false, ProcessRequestBody, func(tss []prompbmarshal.TimeSeries) error {
err := stream.ParseMetricsStream(bytes.NewReader(data), "application/x-protobuf", false, ProcessRequestBody, func(tss []prompbmarshal.TimeSeries) error {
callbackCalls.Add(1)
s := formatTimeseries(tss)
if s != sExpected {
Expand Down

0 comments on commit fa7ca96

Please sign in to comment.