From fa7ca96c8d714dc8accea945065ca483b8026eb6 Mon Sep 17 00:00:00 2001 From: AndrewChubatiuk Date: Sat, 4 May 2024 23:57:11 +0300 Subject: [PATCH] vlinsert: added opentelemetry logs support --- app/vlinsert/main.go | 4 + app/vlinsert/opentelemetry/opentelemetry.go | 80 ++++++ app/vmagent/opentelemetry/request_handler.go | 9 +- app/vminsert/opentelemetry/request_handler.go | 9 +- docs/CHANGELOG.md | 1 + .../data-ingestion/OpenTelemetry.md | 37 +++ .../opentelemetry/firehose/parser_test.go | 2 +- lib/protoparser/opentelemetry/pb/common.go | 269 ++++++++++++++++++ lib/protoparser/opentelemetry/pb/logs.go | 220 ++++++++++++++ .../opentelemetry/pb/{pb.go => metrics.go} | 261 ----------------- lib/protoparser/opentelemetry/stream/logs.go | 147 ++++++++++ .../stream/{streamparser.go => metrics.go} | 55 ++-- .../{streamparser_test.go => metrics_test.go} | 11 +- ..._timing_test.go => metrics_timing_test.go} | 5 +- 14 files changed, 807 insertions(+), 303 deletions(-) create mode 100644 app/vlinsert/opentelemetry/opentelemetry.go create mode 100644 docs/VictoriaLogs/data-ingestion/OpenTelemetry.md create mode 100644 lib/protoparser/opentelemetry/pb/common.go create mode 100644 lib/protoparser/opentelemetry/pb/logs.go rename lib/protoparser/opentelemetry/pb/{pb.go => metrics.go} (74%) create mode 100644 lib/protoparser/opentelemetry/stream/logs.go rename lib/protoparser/opentelemetry/stream/{streamparser.go => metrics.go} (80%) rename lib/protoparser/opentelemetry/stream/{streamparser_test.go => metrics_test.go} (95%) rename lib/protoparser/opentelemetry/stream/{streamparser_timing_test.go => metrics_timing_test.go} (77%) diff --git a/app/vlinsert/main.go b/app/vlinsert/main.go index a03c0715ceb2..d3859b2a6e5e 100644 --- a/app/vlinsert/main.go +++ b/app/vlinsert/main.go @@ -7,6 +7,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/app/vlinsert/elasticsearch" "github.com/VictoriaMetrics/VictoriaMetrics/app/vlinsert/jsonline" 
"github.com/VictoriaMetrics/VictoriaMetrics/app/vlinsert/loki" + "github.com/VictoriaMetrics/VictoriaMetrics/app/vlinsert/opentelemetry" ) // Init initializes vlinsert @@ -37,6 +38,9 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool { case strings.HasPrefix(path, "/loki/"): path = strings.TrimPrefix(path, "/loki") return loki.RequestHandler(path, w, r) + case strings.HasPrefix(path, "/opentelemetry/"): + path = strings.TrimPrefix(path, "/opentelemetry") + return opentelemetry.RequestHandler(path, w, r) default: return false } diff --git a/app/vlinsert/opentelemetry/opentelemetry.go b/app/vlinsert/opentelemetry/opentelemetry.go new file mode 100644 index 000000000000..c87fc5621c6b --- /dev/null +++ b/app/vlinsert/opentelemetry/opentelemetry.go @@ -0,0 +1,80 @@ +package opentelemetry + +import ( + "fmt" + "net/http" + "time" + + "github.com/VictoriaMetrics/VictoriaMetrics/app/vlinsert/insertutils" + "github.com/VictoriaMetrics/VictoriaMetrics/app/vlstorage" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage" + opentelemetry "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentelemetry/stream" + "github.com/VictoriaMetrics/metrics" +) + +// RequestHandler processes Opentelemetry insert requests +func RequestHandler(path string, w http.ResponseWriter, r *http.Request) bool { + switch path { + case "/api/v1/push": + return handleInsert(r, w) + default: + return false + } +} + +func handleInsert(r *http.Request, w http.ResponseWriter) bool { + startTime := time.Now() + var m *otelMetrics + contentType := r.Header.Get("Content-Type") + if contentType == "application/json" { + m = jsonMetrics + } else { + m = protobufMetrics + } + isGzipped := r.Header.Get("Content-Encoding") == "gzip" + m.requestsTotal.Inc() + cp, err := insertutils.GetCommonParams(r) + if err != nil { + httpserver.Errorf(w, r, "cannot parse common params from request: %s", err) + return true + } + n, err 
:= opentelemetry.ParseLogsStream(r.Body, contentType, isGzipped, func(streamFields []string) (func(int64, []logstorage.Field), func()) { + lr := logstorage.GetLogRows(streamFields, nil) + processLogFn := cp.GetProcessLogMessageFunc(lr) + pushFn := func() { + vlstorage.MustAddRows(lr) + logstorage.PutLogRows(lr) + } + return processLogFn, pushFn + }) + if err != nil { + httpserver.Errorf(w, r, "cannot parse Opentelemetry request: %s", err) + return true + } + + m.ingestedTotal.Add(n) + + m.requestsDuration.UpdateDuration(startTime) + + return true +} + +type otelMetrics struct { + requestsTotal *metrics.Counter + ingestedTotal *metrics.Counter + requestsDuration *metrics.Histogram +} + +func newMetrics(format string) *otelMetrics { + return &otelMetrics{ + requestsTotal: metrics.NewCounter(fmt.Sprintf(`vl_http_requests_total{path="/insert/opentelemetry/api/v1/push",format="%s"}`, format)), + ingestedTotal: metrics.NewCounter(fmt.Sprintf(`vl_rows_ingested_total{type="opentelemetry",format="%s"}`, format)), + requestsDuration: metrics.NewHistogram(fmt.Sprintf(`vl_http_request_duration_seconds{path="/insert/opentelemetry/api/v1/push",format="%s"}`, format)), + } +} + +var ( + jsonMetrics = newMetrics("json") + protobufMetrics = newMetrics("protobuf") +) diff --git a/app/vmagent/opentelemetry/request_handler.go b/app/vmagent/opentelemetry/request_handler.go index 07b9e1f9e1d6..41cd113450bb 100644 --- a/app/vmagent/opentelemetry/request_handler.go +++ b/app/vmagent/opentelemetry/request_handler.go @@ -1,7 +1,6 @@ package opentelemetry import ( - "fmt" "net/http" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/common" @@ -29,14 +28,14 @@ func InsertHandler(at *auth.Token, req *http.Request) error { } isGzipped := req.Header.Get("Content-Encoding") == "gzip" var processBody func([]byte) ([]byte, error) - if req.Header.Get("Content-Type") == "application/json" { + contentType := req.Header.Get("Content-Type") + if contentType == "application/json" { if 
req.Header.Get("X-Amz-Firehose-Protocol-Version") != "" { processBody = firehose.ProcessRequestBody - } else { - return fmt.Errorf("json encoding isn't supported for opentelemetry format. Use protobuf encoding") + contentType = "application/x-protobuf" } } - return stream.ParseStream(req.Body, isGzipped, processBody, func(tss []prompbmarshal.TimeSeries) error { + return stream.ParseMetricsStream(req.Body, contentType, isGzipped, processBody, func(tss []prompbmarshal.TimeSeries) error { return insertRows(at, tss, extraLabels) }) } diff --git a/app/vminsert/opentelemetry/request_handler.go b/app/vminsert/opentelemetry/request_handler.go index 72560108e1b0..65399475978e 100644 --- a/app/vminsert/opentelemetry/request_handler.go +++ b/app/vminsert/opentelemetry/request_handler.go @@ -1,7 +1,6 @@ package opentelemetry import ( - "fmt" "net/http" "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/common" @@ -26,14 +25,14 @@ func InsertHandler(req *http.Request) error { } isGzipped := req.Header.Get("Content-Encoding") == "gzip" var processBody func([]byte) ([]byte, error) - if req.Header.Get("Content-Type") == "application/json" { + contentType := req.Header.Get("Content-Type") + if contentType == "application/json" { if req.Header.Get("X-Amz-Firehose-Protocol-Version") != "" { processBody = firehose.ProcessRequestBody - } else { - return fmt.Errorf("json encoding isn't supported for opentelemetry format. 
Use protobuf encoding") + contentType = "application/x-protobuf" } } - return stream.ParseStream(req.Body, isGzipped, processBody, func(tss []prompbmarshal.TimeSeries) error { + return stream.ParseMetricsStream(req.Body, contentType, isGzipped, processBody, func(tss []prompbmarshal.TimeSeries) error { return insertRows(tss, extraLabels) }) } diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index 365af547ac4e..32111b2f4788 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -30,6 +30,7 @@ See also [LTS releases](https://docs.victoriametrics.com/lts-releases/). ## tip +* FEATURE: [vlinsert](https://docs.victoriametrics.com/victorialogs/): added OpenTelemetry logs ingestion support. * FEATURE: [dashboards/single](https://grafana.com/grafana/dashboards/10229): support selecting of multiple instances on the dashboard. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5869) for details. * FEATURE: [dashboards/single](https://grafana.com/grafana/dashboards/10229): properly display version in the Stats row for the custom builds of VictoriaMetrics. * FEATURE: [dashboards/single](https://grafana.com/grafana/dashboards/10229): add `Network Usage` panel to `Resource Usage` row. 
diff --git a/docs/VictoriaLogs/data-ingestion/OpenTelemetry.md b/docs/VictoriaLogs/data-ingestion/OpenTelemetry.md new file mode 100644 index 000000000000..e290ec98d63f --- /dev/null +++ b/docs/VictoriaLogs/data-ingestion/OpenTelemetry.md @@ -0,0 +1,37 @@ +--- +weight: 4 +title: OpenTelemetry setup +disableToc: true +menu: + docs: + parent: "victorialogs-data-ingestion" + weight: 4 +aliases: + - /VictoriaLogs/data-ingestion/OpenTelemetry.html +--- +# OpenTelemetry setup +
+Specify logs endpoint for [OTLP/HTTP exporter](https://github.com/open-telemetry/opentelemetry-collector/blob/main/exporter/otlphttpexporter/README.md) in configuration file +for sending the collected logs to [VictoriaLogs](https://docs.victoriametrics.com/VictoriaLogs/): +
+```yaml
+exporters:
+  otlphttp:
+    logs_endpoint: http://localhost:9428/insert/opentelemetry/api/v1/push
+```
+
+Substitute `localhost:9428` address inside `exporters.otlphttp.logs_endpoint` with the real TCP address of VictoriaLogs.
+
+VictoriaLogs divides all the ingested logs into [log streams](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#stream-fields) relying on resource attributes. In the example below resource attributes are set for [filelog OpenTelemetry receiver](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/receiver/filelogreceiver):
+
+```yaml
+receivers:
+  filelog:
+    include: [/var/log/myservice/*.json]
+    resource:
+      region: us-east-1
+```
+
+The ingested log entries can be queried according to [these docs](https://docs.victoriametrics.com/VictoriaLogs/querying/).
+
+See also [data ingestion troubleshooting](https://docs.victoriametrics.com/VictoriaLogs/data-ingestion/#troubleshooting) docs. 
diff --git a/lib/protoparser/opentelemetry/firehose/parser_test.go b/lib/protoparser/opentelemetry/firehose/parser_test.go index 070ace641810..33191e5d1211 100644 --- a/lib/protoparser/opentelemetry/firehose/parser_test.go +++ b/lib/protoparser/opentelemetry/firehose/parser_test.go @@ -224,7 +224,7 @@ func TestProcessRequestBody(t *testing.T) { {__name__="amazonaws.com/AWS/EBS/VolumeReadOps",cloud.provider="aws",cloud.account.id="677435890598",cloud.region="us-east-1",aws.exporter.arn="arn:aws:cloudwatch:us-east-1:677435890598:metric-stream/custom_ebs_metric",quantile="1"} 0 1709217300000 ` var callbackCalls atomic.Uint64 - err := stream.ParseStream(bytes.NewReader(data), false, ProcessRequestBody, func(tss []prompbmarshal.TimeSeries) error { + err := stream.ParseMetricsStream(bytes.NewReader(data), "application/x-protobuf", false, ProcessRequestBody, func(tss []prompbmarshal.TimeSeries) error { callbackCalls.Add(1) s := formatTimeseries(tss) if s != sExpected { diff --git a/lib/protoparser/opentelemetry/pb/common.go b/lib/protoparser/opentelemetry/pb/common.go new file mode 100644 index 000000000000..4b1270c67c5e --- /dev/null +++ b/lib/protoparser/opentelemetry/pb/common.go @@ -0,0 +1,269 @@ +package pb + +import ( + "bytes" + "fmt" + "strings" + + "github.com/VictoriaMetrics/easyproto" +) + +// Resource represents the corresponding OTEL protobuf message +type Resource struct { + Attributes []*KeyValue +} + +func (r *Resource) marshalProtobuf(mm *easyproto.MessageMarshaler) { + for _, a := range r.Attributes { + a.marshalProtobuf(mm.AppendMessage(1)) + } +} + +func (r *Resource) unmarshalProtobuf(src []byte) (err error) { + // message Resource { + // repeated KeyValue attributes = 1; + // } + var fc easyproto.FieldContext + for len(src) > 0 { + src, err = fc.NextField(src) + if err != nil { + return fmt.Errorf("cannot read next field in Resource: %w", err) + } + switch fc.FieldNum { + case 1: + data, ok := fc.MessageData() + if !ok { + return fmt.Errorf("cannot 
read Attribute data") + } + r.Attributes = append(r.Attributes, &KeyValue{}) + a := r.Attributes[len(r.Attributes)-1] + if err := a.unmarshalProtobuf(data); err != nil { + return fmt.Errorf("cannot unmarshal Attribute: %w", err) + } + } + } + return nil +} + +// KeyValue represents the corresponding OTEL protobuf message +type KeyValue struct { + Key string + Value *AnyValue +} + +func (kv *KeyValue) marshalProtobuf(mm *easyproto.MessageMarshaler) { + mm.AppendString(1, kv.Key) + if kv.Value != nil { + kv.Value.marshalProtobuf(mm.AppendMessage(2)) + } +} + +func (kv *KeyValue) unmarshalProtobuf(src []byte) (err error) { + // message KeyValue { + // string key = 1; + // AnyValue value = 2; + // } + var fc easyproto.FieldContext + for len(src) > 0 { + src, err = fc.NextField(src) + if err != nil { + return fmt.Errorf("cannot read next field in KeyValue: %w", err) + } + switch fc.FieldNum { + case 1: + key, ok := fc.String() + if !ok { + return fmt.Errorf("cannot read Key") + } + kv.Key = strings.Clone(key) + case 2: + data, ok := fc.MessageData() + if !ok { + return fmt.Errorf("cannot read Value") + } + kv.Value = &AnyValue{} + if err := kv.Value.unmarshalProtobuf(data); err != nil { + return fmt.Errorf("cannot unmarshal Value: %w", err) + } + } + } + return nil +} + +// AnyValue represents the corresponding OTEL protobuf message +type AnyValue struct { + StringValue *string + BoolValue *bool + IntValue *int64 + DoubleValue *float64 + ArrayValue *ArrayValue + KeyValueList *KeyValueList + BytesValue *[]byte +} + +func (av *AnyValue) marshalProtobuf(mm *easyproto.MessageMarshaler) { + switch { + case av.StringValue != nil: + mm.AppendString(1, *av.StringValue) + case av.BoolValue != nil: + mm.AppendBool(2, *av.BoolValue) + case av.IntValue != nil: + mm.AppendInt64(3, *av.IntValue) + case av.DoubleValue != nil: + mm.AppendDouble(4, *av.DoubleValue) + case av.ArrayValue != nil: + av.ArrayValue.marshalProtobuf(mm.AppendMessage(5)) + case av.KeyValueList != nil: + 
av.KeyValueList.marshalProtobuf(mm.AppendMessage(6)) + case av.BytesValue != nil: + mm.AppendBytes(7, *av.BytesValue) + } +} + +func (av *AnyValue) unmarshalProtobuf(src []byte) (err error) { + // message AnyValue { + // oneof value { + // string string_value = 1; + // bool bool_value = 2; + // int64 int_value = 3; + // double double_value = 4; + // ArrayValue array_value = 5; + // KeyValueList kvlist_value = 6; + // bytes bytes_value = 7; + // } + // } + var fc easyproto.FieldContext + for len(src) > 0 { + src, err = fc.NextField(src) + if err != nil { + return fmt.Errorf("cannot read next field in AnyValue") + } + switch fc.FieldNum { + case 1: + stringValue, ok := fc.String() + if !ok { + return fmt.Errorf("cannot read StringValue") + } + stringValue = strings.Clone(stringValue) + av.StringValue = &stringValue + case 2: + boolValue, ok := fc.Bool() + if !ok { + return fmt.Errorf("cannot read BoolValue") + } + av.BoolValue = &boolValue + case 3: + intValue, ok := fc.Int64() + if !ok { + return fmt.Errorf("cannot read IntValue") + } + av.IntValue = &intValue + case 4: + doubleValue, ok := fc.Double() + if !ok { + return fmt.Errorf("cannot read DoubleValue") + } + av.DoubleValue = &doubleValue + case 5: + data, ok := fc.MessageData() + if !ok { + return fmt.Errorf("cannot read ArrayValue") + } + av.ArrayValue = &ArrayValue{} + if err := av.ArrayValue.unmarshalProtobuf(data); err != nil { + return fmt.Errorf("cannot unmarshal ArrayValue: %w", err) + } + case 6: + data, ok := fc.MessageData() + if !ok { + return fmt.Errorf("cannot read KeyValueList") + } + av.KeyValueList = &KeyValueList{} + if err := av.KeyValueList.unmarshalProtobuf(data); err != nil { + return fmt.Errorf("cannot unmarshal KeyValueList: %w", err) + } + case 7: + bytesValue, ok := fc.Bytes() + if !ok { + return fmt.Errorf("cannot read BytesValue") + } + bytesValue = bytes.Clone(bytesValue) + av.BytesValue = &bytesValue + } + } + return nil +} + +// ArrayValue represents the corresponding OTEL 
protobuf message +type ArrayValue struct { + Values []*AnyValue +} + +func (av *ArrayValue) marshalProtobuf(mm *easyproto.MessageMarshaler) { + for _, v := range av.Values { + v.marshalProtobuf(mm.AppendMessage(1)) + } +} + +func (av *ArrayValue) unmarshalProtobuf(src []byte) (err error) { + // message ArrayValue { + // repeated AnyValue values = 1; + // } + var fc easyproto.FieldContext + for len(src) > 0 { + src, err = fc.NextField(src) + if err != nil { + return fmt.Errorf("cannot read next field in ArrayValue") + } + switch fc.FieldNum { + case 1: + data, ok := fc.MessageData() + if !ok { + return fmt.Errorf("cannot read Value data") + } + av.Values = append(av.Values, &AnyValue{}) + v := av.Values[len(av.Values)-1] + if err := v.unmarshalProtobuf(data); err != nil { + return fmt.Errorf("cannot unmarshal Value: %w", err) + } + } + } + return nil +} + +// KeyValueList represents the corresponding OTEL protobuf message +type KeyValueList struct { + Values []*KeyValue +} + +func (kvl *KeyValueList) marshalProtobuf(mm *easyproto.MessageMarshaler) { + for _, v := range kvl.Values { + v.marshalProtobuf(mm.AppendMessage(1)) + } +} + +func (kvl *KeyValueList) unmarshalProtobuf(src []byte) (err error) { + // message KeyValueList { + // repeated KeyValue values = 1; + // } + var fc easyproto.FieldContext + for len(src) > 0 { + src, err = fc.NextField(src) + if err != nil { + return fmt.Errorf("cannot read next field in KeyValueList") + } + switch fc.FieldNum { + case 1: + data, ok := fc.MessageData() + if !ok { + return fmt.Errorf("cannot read Value data") + } + kvl.Values = append(kvl.Values, &KeyValue{}) + v := kvl.Values[len(kvl.Values)-1] + if err := v.unmarshalProtobuf(data); err != nil { + return fmt.Errorf("cannot unmarshal Value: %w", err) + } + } + } + return nil +} diff --git a/lib/protoparser/opentelemetry/pb/logs.go b/lib/protoparser/opentelemetry/pb/logs.go new file mode 100644 index 000000000000..db1b91a808a8 --- /dev/null +++ 
b/lib/protoparser/opentelemetry/pb/logs.go @@ -0,0 +1,220 @@ +package pb + +import ( + "fmt" + "strings" + + "github.com/VictoriaMetrics/easyproto" +) + +var ( + logSeverities = []string{ + "unspecified", + "trace", + "trace2", + "trace3", + "trace4", + "debug", + "debug2", + "debug3", + "debug4", + "info", + "info2", + "info3", + "info4", + "error", + "error2", + "error3", + "error4", + "fatal", + "fatal2", + "fatal3", + "fatal4", + } +) + +// ExportLogsServiceRequest represents the corresponding OTEL protobuf message +type ExportLogsServiceRequest struct { + ResourceLogs []*ResourceLogs +} + +// UnmarshalProtobuf unmarshals r from protobuf message at src. +func (r *ExportLogsServiceRequest) UnmarshalProtobuf(src []byte) error { + r.ResourceLogs = nil + return r.unmarshalProtobuf(src) +} + +func (r *ExportLogsServiceRequest) unmarshalProtobuf(src []byte) (err error) { + // message ExportLogsServiceRequest { + // repeated ResourceLogs resource_metrics = 1; + // } + var fc easyproto.FieldContext + for len(src) > 0 { + src, err = fc.NextField(src) + if err != nil { + return fmt.Errorf("cannot read next field in ExportLogsServiceRequest: %w", err) + } + switch fc.FieldNum { + case 1: + data, ok := fc.MessageData() + if !ok { + return fmt.Errorf("cannot read ResourceLogs data") + } + r.ResourceLogs = append(r.ResourceLogs, &ResourceLogs{}) + rm := r.ResourceLogs[len(r.ResourceLogs)-1] + if err := rm.unmarshalProtobuf(data); err != nil { + return fmt.Errorf("cannot unmarshal ResourceLogs: %w", err) + } + } + } + return nil +} + +// ResourceLogs represents the corresponding OTEL protobuf message +type ResourceLogs struct { + Resource *Resource + ScopeLogs []*ScopeLogs +} + +func (rl *ResourceLogs) unmarshalProtobuf(src []byte) (err error) { + // message ResourceLogs { + // Resource resource = 1; + // repeated ScopeLogs scope_logs = 2; + // } + var fc easyproto.FieldContext + for len(src) > 0 { + src, err = fc.NextField(src) + if err != nil { + return fmt.Errorf("cannot 
read next field in ResourceLogs: %w", err) + } + switch fc.FieldNum { + case 1: + data, ok := fc.MessageData() + if !ok { + return fmt.Errorf("cannot read Resource data") + } + rl.Resource = &Resource{} + if err := rl.Resource.unmarshalProtobuf(data); err != nil { + return fmt.Errorf("cannot umarshal Resource: %w", err) + } + case 2: + data, ok := fc.MessageData() + if !ok { + return fmt.Errorf("cannot read ScopeLogs data") + } + rl.ScopeLogs = append(rl.ScopeLogs, &ScopeLogs{}) + sl := rl.ScopeLogs[len(rl.ScopeLogs)-1] + if err := sl.unmarshalProtobuf(data); err != nil { + return fmt.Errorf("cannot unmarshal ScopeLogs: %w", err) + } + } + } + return nil +} + +// ScopeLogs represents the corresponding OTEL protobuf message +type ScopeLogs struct { + LogRecords []*LogRecord +} + +func (sl *ScopeLogs) unmarshalProtobuf(src []byte) (err error) { + // message ScopeLogs { + // repeated LogRecord log_records = 2; + // } + var fc easyproto.FieldContext + for len(src) > 0 { + src, err = fc.NextField(src) + if err != nil { + return fmt.Errorf("cannot read next field in ScopeLogs: %w", err) + } + switch fc.FieldNum { + case 2: + data, ok := fc.MessageData() + if !ok { + return fmt.Errorf("cannot read LogRecord data") + } + sl.LogRecords = append(sl.LogRecords, &LogRecord{}) + l := sl.LogRecords[len(sl.LogRecords)-1] + if err := l.unmarshalProtobuf(data); err != nil { + return fmt.Errorf("cannot unmarshal LogRecord: %w", err) + } + } + } + return nil +} + +// LogRecord represents the corresponding OTEL protobuf message +type LogRecord struct { + Timestamp uint64 + Severity string + Body *AnyValue + Attributes []*KeyValue +} + +func (r *LogRecord) unmarshalProtobuf(src []byte) (err error) { + // message LogRecord { + // fixed64 time_unix_nano = 1; + // fixed64 observed_time_unix_nano = 11; + // SeverityNumber severity_number = 2; + // string severity_text = 3; + // AnyValue body = 5; + // repeated KeyValue attributes = 6; + // } + var fc easyproto.FieldContext + for len(src) > 
0 { + src, err = fc.NextField(src) + if err != nil { + return fmt.Errorf("cannot read next field in LogRecord: %w", err) + } + switch fc.FieldNum { + case 1: + ts, ok := fc.Fixed64() + if !ok { + return fmt.Errorf("cannot read log record timestamp") + } + r.Timestamp = ts + case 11: + if r.Timestamp == 0 { + ts, ok := fc.Fixed64() + if !ok { + return fmt.Errorf("cannot read log record observed timestamp") + } + r.Timestamp = ts + } + case 2: + severity, ok := fc.Int32() + if !ok { + return fmt.Errorf("cannot read severity number") + } + r.Severity = logSeverities[severity] + case 3: + if r.Severity == "" { + severity, ok := fc.String() + if !ok { + return fmt.Errorf("cannot read severity string") + } + r.Severity = strings.Clone(severity) + } + case 5: + data, ok := fc.MessageData() + if !ok { + return fmt.Errorf("cannot read Body") + } + r.Body = &AnyValue{} + if err := r.Body.unmarshalProtobuf(data); err != nil { + return fmt.Errorf("cannot unmarshal Body: %w", err) + } + case 6: + data, ok := fc.MessageData() + if !ok { + return fmt.Errorf("cannot read attributes data") + } + r.Attributes = append(r.Attributes, &KeyValue{}) + a := r.Attributes[len(r.Attributes)-1] + if err := a.unmarshalProtobuf(data); err != nil { + return fmt.Errorf("cannot unmarshal Attribute: %w", err) + } + } + } + return nil +} diff --git a/lib/protoparser/opentelemetry/pb/pb.go b/lib/protoparser/opentelemetry/pb/metrics.go similarity index 74% rename from lib/protoparser/opentelemetry/pb/pb.go rename to lib/protoparser/opentelemetry/pb/metrics.go index 883950e9f11d..c6f525abba38 100644 --- a/lib/protoparser/opentelemetry/pb/pb.go +++ b/lib/protoparser/opentelemetry/pb/metrics.go @@ -1,7 +1,6 @@ package pb import ( - "bytes" "fmt" "strings" @@ -113,43 +112,6 @@ func (rm *ResourceMetrics) unmarshalProtobuf(src []byte) (err error) { return nil } -// Resource represents the corresponding OTEL protobuf message -type Resource struct { - Attributes []*KeyValue -} - -func (r *Resource) 
marshalProtobuf(mm *easyproto.MessageMarshaler) { - for _, a := range r.Attributes { - a.marshalProtobuf(mm.AppendMessage(1)) - } -} - -func (r *Resource) unmarshalProtobuf(src []byte) (err error) { - // message Resource { - // repeated KeyValue attributes = 1; - // } - var fc easyproto.FieldContext - for len(src) > 0 { - src, err = fc.NextField(src) - if err != nil { - return fmt.Errorf("cannot read next field in Resource: %w", err) - } - switch fc.FieldNum { - case 1: - data, ok := fc.MessageData() - if !ok { - return fmt.Errorf("cannot read Attribute data") - } - r.Attributes = append(r.Attributes, &KeyValue{}) - a := r.Attributes[len(r.Attributes)-1] - if err := a.unmarshalProtobuf(data); err != nil { - return fmt.Errorf("cannot unmarshal Attribute: %w", err) - } - } - } - return nil -} - // ScopeMetrics represents the corresponding OTEL protobuf message type ScopeMetrics struct { Metrics []*Metric @@ -283,229 +245,6 @@ func (m *Metric) unmarshalProtobuf(src []byte) (err error) { return nil } -// KeyValue represents the corresponding OTEL protobuf message -type KeyValue struct { - Key string - Value *AnyValue -} - -func (kv *KeyValue) marshalProtobuf(mm *easyproto.MessageMarshaler) { - mm.AppendString(1, kv.Key) - if kv.Value != nil { - kv.Value.marshalProtobuf(mm.AppendMessage(2)) - } -} - -func (kv *KeyValue) unmarshalProtobuf(src []byte) (err error) { - // message KeyValue { - // string key = 1; - // AnyValue value = 2; - // } - var fc easyproto.FieldContext - for len(src) > 0 { - src, err = fc.NextField(src) - if err != nil { - return fmt.Errorf("cannot read next field in KeyValue: %w", err) - } - switch fc.FieldNum { - case 1: - key, ok := fc.String() - if !ok { - return fmt.Errorf("cannot read Key") - } - kv.Key = strings.Clone(key) - case 2: - data, ok := fc.MessageData() - if !ok { - return fmt.Errorf("cannot read Value") - } - kv.Value = &AnyValue{} - if err := kv.Value.unmarshalProtobuf(data); err != nil { - return fmt.Errorf("cannot unmarshal Value: 
%w", err) - } - } - } - return nil -} - -// AnyValue represents the corresponding OTEL protobuf message -type AnyValue struct { - StringValue *string - BoolValue *bool - IntValue *int64 - DoubleValue *float64 - ArrayValue *ArrayValue - KeyValueList *KeyValueList - BytesValue *[]byte -} - -func (av *AnyValue) marshalProtobuf(mm *easyproto.MessageMarshaler) { - switch { - case av.StringValue != nil: - mm.AppendString(1, *av.StringValue) - case av.BoolValue != nil: - mm.AppendBool(2, *av.BoolValue) - case av.IntValue != nil: - mm.AppendInt64(3, *av.IntValue) - case av.DoubleValue != nil: - mm.AppendDouble(4, *av.DoubleValue) - case av.ArrayValue != nil: - av.ArrayValue.marshalProtobuf(mm.AppendMessage(5)) - case av.KeyValueList != nil: - av.KeyValueList.marshalProtobuf(mm.AppendMessage(6)) - case av.BytesValue != nil: - mm.AppendBytes(7, *av.BytesValue) - } -} - -func (av *AnyValue) unmarshalProtobuf(src []byte) (err error) { - // message AnyValue { - // oneof value { - // string string_value = 1; - // bool bool_value = 2; - // int64 int_value = 3; - // double double_value = 4; - // ArrayValue array_value = 5; - // KeyValueList kvlist_value = 6; - // bytes bytes_value = 7; - // } - // } - var fc easyproto.FieldContext - for len(src) > 0 { - src, err = fc.NextField(src) - if err != nil { - return fmt.Errorf("cannot read next field in AnyValue") - } - switch fc.FieldNum { - case 1: - stringValue, ok := fc.String() - if !ok { - return fmt.Errorf("cannot read StringValue") - } - stringValue = strings.Clone(stringValue) - av.StringValue = &stringValue - case 2: - boolValue, ok := fc.Bool() - if !ok { - return fmt.Errorf("cannot read BoolValue") - } - av.BoolValue = &boolValue - case 3: - intValue, ok := fc.Int64() - if !ok { - return fmt.Errorf("cannot read IntValue") - } - av.IntValue = &intValue - case 4: - doubleValue, ok := fc.Double() - if !ok { - return fmt.Errorf("cannot read DoubleValue") - } - av.DoubleValue = &doubleValue - case 5: - data, ok := fc.MessageData() 
- if !ok { - return fmt.Errorf("cannot read ArrayValue") - } - av.ArrayValue = &ArrayValue{} - if err := av.ArrayValue.unmarshalProtobuf(data); err != nil { - return fmt.Errorf("cannot unmarshal ArrayValue: %w", err) - } - case 6: - data, ok := fc.MessageData() - if !ok { - return fmt.Errorf("cannot read KeyValueList") - } - av.KeyValueList = &KeyValueList{} - if err := av.KeyValueList.unmarshalProtobuf(data); err != nil { - return fmt.Errorf("cannot unmarshal KeyValueList: %w", err) - } - case 7: - bytesValue, ok := fc.Bytes() - if !ok { - return fmt.Errorf("cannot read BytesValue") - } - bytesValue = bytes.Clone(bytesValue) - av.BytesValue = &bytesValue - } - } - return nil -} - -// ArrayValue represents the corresponding OTEL protobuf message -type ArrayValue struct { - Values []*AnyValue -} - -func (av *ArrayValue) marshalProtobuf(mm *easyproto.MessageMarshaler) { - for _, v := range av.Values { - v.marshalProtobuf(mm.AppendMessage(1)) - } -} - -func (av *ArrayValue) unmarshalProtobuf(src []byte) (err error) { - // message ArrayValue { - // repeated AnyValue values = 1; - // } - var fc easyproto.FieldContext - for len(src) > 0 { - src, err = fc.NextField(src) - if err != nil { - return fmt.Errorf("cannot read next field in ArrayValue") - } - switch fc.FieldNum { - case 1: - data, ok := fc.MessageData() - if !ok { - return fmt.Errorf("cannot read Value data") - } - av.Values = append(av.Values, &AnyValue{}) - v := av.Values[len(av.Values)-1] - if err := v.unmarshalProtobuf(data); err != nil { - return fmt.Errorf("cannot unmarshal Value: %w", err) - } - } - } - return nil -} - -// KeyValueList represents the corresponding OTEL protobuf message -type KeyValueList struct { - Values []*KeyValue -} - -func (kvl *KeyValueList) marshalProtobuf(mm *easyproto.MessageMarshaler) { - for _, v := range kvl.Values { - v.marshalProtobuf(mm.AppendMessage(1)) - } -} - -func (kvl *KeyValueList) unmarshalProtobuf(src []byte) (err error) { - // message KeyValueList { - // repeated 
KeyValue values = 1; - // } - var fc easyproto.FieldContext - for len(src) > 0 { - src, err = fc.NextField(src) - if err != nil { - return fmt.Errorf("cannot read next field in KeyValueList") - } - switch fc.FieldNum { - case 1: - data, ok := fc.MessageData() - if !ok { - return fmt.Errorf("cannot read Value data") - } - kvl.Values = append(kvl.Values, &KeyValue{}) - v := kvl.Values[len(kvl.Values)-1] - if err := v.unmarshalProtobuf(data); err != nil { - return fmt.Errorf("cannot unmarshal Value: %w", err) - } - } - } - return nil -} - // Gauge represents the corresponding OTEL protobuf message type Gauge struct { DataPoints []*NumberDataPoint diff --git a/lib/protoparser/opentelemetry/stream/logs.go b/lib/protoparser/opentelemetry/stream/logs.go new file mode 100644 index 000000000000..8b9b5ad8fd92 --- /dev/null +++ b/lib/protoparser/opentelemetry/stream/logs.go @@ -0,0 +1,147 @@ +package stream + +import ( + "encoding/json" + "fmt" + "io" + "sync" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentelemetry/pb" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter" +) + +// GetStreamFn is a function type, which returns functions for logs processing and submission for a given list of stream fields +type GetStreamFn func([]string) (func(int64, []logstorage.Field), func()) + +// ParseLogsStream parses OpenTelemetry protobuf or json data from r and calls callback for the parsed rows. 
+func ParseLogsStream(r io.Reader, contentType string, isGzipped bool, getStream GetStreamFn) (int, error) { + wcr := writeconcurrencylimiter.GetReader(r) + defer writeconcurrencylimiter.PutReader(wcr) + r = wcr + + if isGzipped { + zr, err := common.GetGzipReader(r) + if err != nil { + return 0, fmt.Errorf("cannot read gzip-compressed OpenTelemetry protocol data: %w", err) + } + defer common.PutGzipReader(zr) + r = zr + } + + wr := getWriteLogsContext() + defer putWriteLogsContext(wr) + req, err := wr.readAndUnpackLogsRequest(r, contentType) + if err != nil { + return 0, fmt.Errorf("cannot unpack OpenTelemetry logs: %w", err) + } + return wr.parseLogsRequest(req, getStream), nil +} + +func (wr *writeLogsContext) readAndUnpackLogsRequest(r io.Reader, contentType string) (*pb.ExportLogsServiceRequest, error) { + if _, err := wr.bb.ReadFrom(r); err != nil { + return nil, fmt.Errorf("cannot read request: %w", err) + } + var req pb.ExportLogsServiceRequest + if contentType == "application/json" { + if err := json.Unmarshal(wr.bb.B, &req); err != nil { + return nil, fmt.Errorf("cannot unmarshal request from %d bytes: %w", len(wr.bb.B), err) + } + } else { + if err := req.UnmarshalProtobuf(wr.bb.B); err != nil { + return nil, fmt.Errorf("cannot unmarshal request from %d bytes: %w", len(wr.bb.B), err) + } + } + return &req, nil +} + +func (wr *writeLogsContext) parseLogsRequest(req *pb.ExportLogsServiceRequest, getStream GetStreamFn) int { + var count int + for _, rl := range req.ResourceLogs { + var attributes []*pb.KeyValue + if rl.Resource != nil { + attributes = rl.Resource.Attributes + } + var streamFields []string + wr.baseFields, streamFields = appendAttributesToFields(wr.baseFields[:0], attributes, true) + processFn, pushFn := getStream(streamFields) + for _, sc := range rl.ScopeLogs { + count += wr.pushFieldsFromScopeLogs(sc, processFn) + } + pushFn() + } + return count +} + +// appendAttributesToFields appends attributes to dst and returns the result. 
+func appendAttributesToFields(dst []logstorage.Field, attributes []*pb.KeyValue, useForStream bool) ([]logstorage.Field, []string) {
+ var streamFields []string
+ if useForStream {
+ streamFields = make([]string, len(attributes))
+ }
+ for i, at := range attributes {
+ dst = append(dst, logstorage.Field{
+ Name: at.Key,
+ Value: at.Value.FormatString(),
+ })
+ if useForStream {
+ streamFields[i] = at.Key
+ }
+ }
+ return dst, streamFields
+}
+
+func (wr *writeLogsContext) pushFieldsFromScopeLogs(sc *pb.ScopeLogs, processFn func(int64, []logstorage.Field)) int {
+ for _, lr := range sc.LogRecords {
+ wr.fields, _ = appendAttributesToFields(wr.fields[:0], lr.Attributes, false) // truncate per record, so fields from previous records don't leak into this one
+ if lr.Severity != "" {
+ wr.fields = append(wr.fields, logstorage.Field{
+ Name: "severity",
+ Value: lr.Severity,
+ })
+ }
+ wr.fields = append(wr.fields, logstorage.Field{
+ Name: "_msg",
+ Value: lr.Body.FormatString(),
+ })
+ processFn(int64(lr.Timestamp), append(wr.fields, wr.baseFields...))
+ }
+ return len(sc.LogRecords)
+}
+
+func (wr *writeLogsContext) reset() {
+ wr.bb.Reset()
+
+ clear(wr.fields)
+ wr.fields = wr.fields[:0]
+ clear(wr.baseFields)
+ wr.baseFields = wr.baseFields[:0]
+}
+
+type writeLogsContext struct {
+ // bb holds the original data (json or protobuf), which must be parsed.
+ bb bytesutil.ByteBuffer + + // fields holds parsed logs fields + fields []logstorage.Field + + // baseFields are fields, which must be added to all the ingested samples + baseFields []logstorage.Field +} + +var wrLogsPool sync.Pool + +func getWriteLogsContext() *writeLogsContext { + v := wrLogsPool.Get() + if v == nil { + return &writeLogsContext{} + } + return v.(*writeLogsContext) +} + +func putWriteLogsContext(wr *writeLogsContext) { + wr.reset() + wrLogsPool.Put(wr) +} diff --git a/lib/protoparser/opentelemetry/stream/streamparser.go b/lib/protoparser/opentelemetry/stream/metrics.go similarity index 80% rename from lib/protoparser/opentelemetry/stream/streamparser.go rename to lib/protoparser/opentelemetry/stream/metrics.go index f2e60a820392..7a779238b784 100644 --- a/lib/protoparser/opentelemetry/stream/streamparser.go +++ b/lib/protoparser/opentelemetry/stream/metrics.go @@ -1,6 +1,7 @@ package stream import ( + "encoding/json" "fmt" "io" "strconv" @@ -18,12 +19,12 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter" ) -// ParseStream parses OpenTelemetry protobuf or json data from r and calls callback for the parsed rows. +// ParseMetricsStream parses OpenTelemetry protobuf or json data from r and calls callback for the parsed rows. // // callback shouldn't hold tss items after returning. // // optional processBody can be used for pre-processing the read request body from r before parsing it in OpenTelemetry format. 
-func ParseStream(r io.Reader, isGzipped bool, processBody func([]byte) ([]byte, error), callback func(tss []prompbmarshal.TimeSeries) error) error { +func ParseMetricsStream(r io.Reader, contentType string, isGzipped bool, processBody func([]byte) ([]byte, error), callback func(tss []prompbmarshal.TimeSeries) error) error { wcr := writeconcurrencylimiter.GetReader(r) defer writeconcurrencylimiter.PutReader(wcr) r = wcr @@ -37,9 +38,9 @@ func ParseStream(r io.Reader, isGzipped bool, processBody func([]byte) ([]byte, r = zr } - wr := getWriteContext() - defer putWriteContext(wr) - req, err := wr.readAndUnpackRequest(r, processBody) + wr := getWriteMetricsContext() + defer putWriteMetricsContext(wr) + req, err := wr.readAndUnpackMetricsRequest(r, contentType, processBody) if err != nil { return fmt.Errorf("cannot unpack OpenTelemetry metrics: %w", err) } @@ -52,7 +53,7 @@ func ParseStream(r io.Reader, isGzipped bool, processBody func([]byte) ([]byte, return nil } -func (wr *writeContext) appendSamplesFromScopeMetrics(sc *pb.ScopeMetrics) { +func (wr *writeMetricsContext) appendSamplesFromScopeMetrics(sc *pb.ScopeMetrics) { for _, m := range sc.Metrics { if len(m.Name) == 0 { // skip metrics without names @@ -92,7 +93,7 @@ func (wr *writeContext) appendSamplesFromScopeMetrics(sc *pb.ScopeMetrics) { } // appendSampleFromNumericPoint appends p to wr.tss -func (wr *writeContext) appendSampleFromNumericPoint(metricName string, p *pb.NumberDataPoint) { +func (wr *writeMetricsContext) appendSampleFromNumericPoint(metricName string, p *pb.NumberDataPoint) { var v float64 switch { case p.IntValue != nil: @@ -109,7 +110,7 @@ func (wr *writeContext) appendSampleFromNumericPoint(metricName string, p *pb.Nu } // appendSamplesFromSummary appends summary p to wr.tss -func (wr *writeContext) appendSamplesFromSummary(metricName string, p *pb.SummaryDataPoint) { +func (wr *writeMetricsContext) appendSamplesFromSummary(metricName string, p *pb.SummaryDataPoint) { t := 
int64(p.TimeUnixNano / 1e6) isStale := (p.Flags)&uint32(1) != 0 wr.pointLabels = appendAttributesToPromLabels(wr.pointLabels[:0], p.Attributes) @@ -123,7 +124,7 @@ func (wr *writeContext) appendSamplesFromSummary(metricName string, p *pb.Summar } // appendSamplesFromHistogram appends histogram p to wr.tss -func (wr *writeContext) appendSamplesFromHistogram(metricName string, p *pb.HistogramDataPoint) { +func (wr *writeMetricsContext) appendSamplesFromHistogram(metricName string, p *pb.HistogramDataPoint) { if len(p.BucketCounts) == 0 { // nothing to append return @@ -159,12 +160,12 @@ func (wr *writeContext) appendSamplesFromHistogram(metricName string, p *pb.Hist } // appendSample appends sample with the given metricName to wr.tss -func (wr *writeContext) appendSample(metricName string, t int64, v float64, isStale bool) { +func (wr *writeMetricsContext) appendSample(metricName string, t int64, v float64, isStale bool) { wr.appendSampleWithExtraLabel(metricName, "", "", t, v, isStale) } // appendSampleWithExtraLabel appends sample with the given metricName and the given (labelName=labelValue) extra label to wr.tss -func (wr *writeContext) appendSampleWithExtraLabel(metricName, labelName, labelValue string, t int64, v float64, isStale bool) { +func (wr *writeMetricsContext) appendSampleWithExtraLabel(metricName, labelName, labelValue string, t int64, v float64, isStale bool) { if isStale { v = decimal.StaleNaN } @@ -217,7 +218,7 @@ func appendAttributesToPromLabels(dst []prompbmarshal.Label, attributes []*pb.Ke return dst } -type writeContext struct { +type writeMetricsContext struct { // bb holds the original data (json or protobuf), which must be parsed. 
bb bytesutil.ByteBuffer @@ -235,7 +236,7 @@ type writeContext struct { samplesPool []prompbmarshal.Sample } -func (wr *writeContext) reset() { +func (wr *writeMetricsContext) reset() { wr.bb.Reset() clear(wr.tss) @@ -253,7 +254,7 @@ func resetLabels(labels []prompbmarshal.Label) []prompbmarshal.Label { return labels[:0] } -func (wr *writeContext) readAndUnpackRequest(r io.Reader, processBody func([]byte) ([]byte, error)) (*pb.ExportMetricsServiceRequest, error) { +func (wr *writeMetricsContext) readAndUnpackMetricsRequest(r io.Reader, contentType string, processBody func([]byte) ([]byte, error)) (*pb.ExportMetricsServiceRequest, error) { if _, err := wr.bb.ReadFrom(r); err != nil { return nil, fmt.Errorf("cannot read request: %w", err) } @@ -265,13 +266,19 @@ func (wr *writeContext) readAndUnpackRequest(r io.Reader, processBody func([]byt } wr.bb.B = append(wr.bb.B[:0], data...) } - if err := req.UnmarshalProtobuf(wr.bb.B); err != nil { - return nil, fmt.Errorf("cannot unmarshal request from %d bytes: %w", len(wr.bb.B), err) + if contentType == "application/json" { + if err := json.Unmarshal(wr.bb.B, &req); err != nil { + return nil, fmt.Errorf("cannot unmarshal request from %d bytes: %w", len(wr.bb.B), err) + } + } else { + if err := req.UnmarshalProtobuf(wr.bb.B); err != nil { + return nil, fmt.Errorf("cannot unmarshal request from %d bytes: %w", len(wr.bb.B), err) + } } return &req, nil } -func (wr *writeContext) parseRequestToTss(req *pb.ExportMetricsServiceRequest) { +func (wr *writeMetricsContext) parseRequestToTss(req *pb.ExportMetricsServiceRequest) { for _, rm := range req.ResourceMetrics { var attributes []*pb.KeyValue if rm.Resource != nil { @@ -284,19 +291,19 @@ func (wr *writeContext) parseRequestToTss(req *pb.ExportMetricsServiceRequest) { } } -var wrPool sync.Pool +var wrMetricsPool sync.Pool -func getWriteContext() *writeContext { - v := wrPool.Get() +func getWriteMetricsContext() *writeMetricsContext { + v := wrMetricsPool.Get() if v == nil { - 
return &writeContext{} + return &writeMetricsContext{} } - return v.(*writeContext) + return v.(*writeMetricsContext) } -func putWriteContext(wr *writeContext) { +func putWriteMetricsContext(wr *writeMetricsContext) { wr.reset() - wrPool.Put(wr) + wrMetricsPool.Put(wr) } var ( diff --git a/lib/protoparser/opentelemetry/stream/streamparser_test.go b/lib/protoparser/opentelemetry/stream/metrics_test.go similarity index 95% rename from lib/protoparser/opentelemetry/stream/streamparser_test.go rename to lib/protoparser/opentelemetry/stream/metrics_test.go index f363e5dfa0ef..3abb649f9ac7 100644 --- a/lib/protoparser/opentelemetry/stream/streamparser_test.go +++ b/lib/protoparser/opentelemetry/stream/metrics_test.go @@ -14,7 +14,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentelemetry/pb" ) -func TestParseStream(t *testing.T) { +func TestParseMetricsStream(t *testing.T) { f := func(samples []*pb.Metric, tssExpected []prompbmarshal.TimeSeries, usePromNaming bool) { t.Helper() @@ -67,7 +67,8 @@ func TestParseStream(t *testing.T) { // Verify protobuf parsing pbData := req.MarshalProtobuf(nil) - if err := checkParseStream(pbData, checkSeries); err != nil { + contentType := "application/x-protobuf" + if err := checkParseMetricsStream(pbData, contentType, checkSeries); err != nil { t.Fatalf("cannot parse protobuf: %s", err) } } @@ -194,9 +195,9 @@ func TestParseStream(t *testing.T) { ) } -func checkParseStream(data []byte, checkSeries func(tss []prompbmarshal.TimeSeries) error) error { +func checkParseMetricsStream(data []byte, contentType string, checkSeries func(tss []prompbmarshal.TimeSeries) error) error { // Verify parsing without compression - if err := ParseStream(bytes.NewBuffer(data), false, nil, checkSeries); err != nil { + if err := ParseMetricsStream(bytes.NewBuffer(data), contentType, false, nil, checkSeries); err != nil { return fmt.Errorf("error when parsing data: %w", err) } @@ -209,7 +210,7 @@ func checkParseStream(data []byte, 
checkSeries func(tss []prompbmarshal.TimeSeri if err := zw.Close(); err != nil { return fmt.Errorf("cannot close gzip writer: %w", err) } - if err := ParseStream(&bb, true, nil, checkSeries); err != nil { + if err := ParseMetricsStream(&bb, contentType, true, nil, checkSeries); err != nil { return fmt.Errorf("error when parsing compressed data: %w", err) } diff --git a/lib/protoparser/opentelemetry/stream/streamparser_timing_test.go b/lib/protoparser/opentelemetry/stream/metrics_timing_test.go similarity index 77% rename from lib/protoparser/opentelemetry/stream/streamparser_timing_test.go rename to lib/protoparser/opentelemetry/stream/metrics_timing_test.go index b0df4135fac8..7a4d7afffcd0 100644 --- a/lib/protoparser/opentelemetry/stream/streamparser_timing_test.go +++ b/lib/protoparser/opentelemetry/stream/metrics_timing_test.go @@ -8,7 +8,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentelemetry/pb" ) -func BenchmarkParseStream(b *testing.B) { +func BenchmarkParseMetricsStream(b *testing.B) { samples := []*pb.Metric{ generateGauge("my-gauge", ""), generateHistogram("my-histogram", ""), @@ -22,9 +22,10 @@ func BenchmarkParseStream(b *testing.B) { ResourceMetrics: []*pb.ResourceMetrics{generateOTLPSamples(samples)}, } data := pbRequest.MarshalProtobuf(nil) + contentType := "application/x-protobuf" for p.Next() { - err := ParseStream(bytes.NewBuffer(data), false, nil, func(_ []prompbmarshal.TimeSeries) error { + err := ParseMetricsStream(bytes.NewBuffer(data), contentType, false, nil, func(_ []prompbmarshal.TimeSeries) error { return nil }) if err != nil {