From 1538764fa5bc1b9556289412536ad7d5bf6fb082 Mon Sep 17 00:00:00 2001
From: Aliaksandr Valialkin
Date: Mon, 16 Oct 2023 00:25:23 +0200
Subject: [PATCH] app/{vmagent,vminsert}: follow-up for NewRelic data ingestion protocol support

This is a follow-up for f60c08a7bdcc87570d1076a3f4520637d3a6ea0f

Changes:
- Make sure all the URLs related to the NewRelic protocol start with /newrelic.
  Previously some URLs started with /api/v1/newrelic
- Remove the /api/v1 part from NewRelic URLs, since it makes no sense
- Remove the automatic transformation from CamelCase to snake_case for NewRelic labels
  and metric names, since it may complicate the transition from NewRelic to VictoriaMetrics.
  Preserve all the metric and label names, so users can query metrics and labels
  by the same names as used in NewRelic. The automatic transformation from CamelCase
  to snake_case can be added later as a special relabeling action if needed.
- Properly update per-tenant data ingestion stats at app/vmagent/newrelic/request_handler.go.
  Previously it was always zero.
- Fix NewRelic URLs in vmagent when multitenant data ingestion is enabled.
  Previously they mistakenly started with `/`.
- Document the NewRelic data ingestion URL at docs/Cluster-VictoriaMetrics.md
- Remove superfluous memory allocations at lib/protoparser/newrelic
- Improve tests at lib/protoparser/newrelic/*

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3520
Updates https://github.com/VictoriaMetrics/VictoriaMetrics/pull/4712
---
 README.md                                     | 107 +++---
 app/vmagent/main.go                           |  20 +-
 app/vmagent/newrelic/request_handler.go       |  55 +--
 app/vminsert/main.go                          |  14 +-
 app/vminsert/newrelic/request_handler.go      |  64 ++--
 docs/Cluster-VictoriaMetrics.md               |   1 +
 docs/README.md                                | 107 +++---
 docs/Single-server-VictoriaMetrics.md         | 107 +++---
 .../datadog/stream/streamparser.go            |   4 +-
 lib/protoparser/newrelic/parser.go            | 322 ++++++++---------
 lib/protoparser/newrelic/parser_test.go       | 329 ++++++++++--------
 .../newrelic/parser_timing_test.go            |  27 +-
 .../newrelic/stream/push_context.go           |  80 -----
 .../newrelic/stream/streamparser.go           | 128 +++++--
 .../newrelic/stream/streamparser_test.go      | 106 ++++++
 .../promremotewrite/stream/streamparser.go    |   2 +-
 16 files changed, 805 insertions(+), 668 deletions(-)
 delete mode 100644 lib/protoparser/newrelic/stream/push_context.go
 create mode 100644 lib/protoparser/newrelic/stream/streamparser_test.go

diff --git a/README.md b/README.md
index 8610f19ddfc1e..aee7f29822966 100644
--- a/README.md
+++ b/README.md
@@ -854,70 +854,75 @@ For example, `/api/put?extra_label=foo=bar` would add `{foo="bar"}` label to all
 ## How to send data from NewRelic agent
 
 VictoriaMetrics accepts data from [NewRelic infrastructure agent](https://docs.newrelic.com/docs/infrastructure/install-infrastructure-agent)
-at `/api/v1/newrelic/infra/v2/metrics/events/bulk` path.
-NewRelic's infrastructure agent sends so-called [Events](https://docs.newrelic.com/docs/infrastructure/manage-your-data/data-instrumentation/default-infrastructure-monitoring-data/#infrastructure-events)
-which then transformed by VictoriaMetrics to the [Prometheus exposition format](https://github.com/prometheus/docs/blob/main/content/docs/instrumenting/exposition_formats.md#text-based-format).
+at `/newrelic/infra/v2/metrics/events/bulk` HTTP path.
+VictoriaMetrics receives [Events](https://docs.newrelic.com/docs/infrastructure/manage-your-data/data-instrumentation/default-infrastructure-monitoring-data/#infrastructure-events)
+from the NewRelic agent at the given path and transforms them to [raw samples](https://docs.victoriametrics.com/keyConcepts.html#raw-samples)
+according to [these docs](#newrelic-agent-data-mapping) before storing them in the database.
 
-NewRelic's infrastructure agent allows configuring destinations for metrics forwarding via ENV variable `COLLECTOR_URL`.
-It is also required to specify `NRIA_LICENSE_KEY`, which is available only after registration into account of the NewRelic cloud.
+You need to pass the `COLLECTOR_URL` and `NRIA_LICENSE_KEY` environment variables to the NewRelic infrastructure agent in order to send the collected metrics to VictoriaMetrics.
+`COLLECTOR_URL` must point to the `/newrelic` HTTP endpoint at VictoriaMetrics, while `NRIA_LICENSE_KEY` must contain a NewRelic license key,
+which can be obtained [here](https://newrelic.com/signup).
+For example, if VictoriaMetrics runs at `localhost:8428`, then the following command can be used for running the NewRelic infrastructure agent:
 
-To configure NewRelic infrastructure agent for forwarding metrics to VictoriaMetrics use the following example:
 ```console
-COLLECTOR_URL="http://localhost:8428/newrelic/api/v1" NRIA_LICENSE_KEY="YOUR_LICENSE_KEY" ./newrelic-infra
+COLLECTOR_URL="http://localhost:8428/newrelic" NRIA_LICENSE_KEY="NEWRELIC_LICENSE_KEY" ./newrelic-infra
 ```
 
-### NewRelic agent data mapping
+### NewRelic agent data mapping
+
+VictoriaMetrics maps [NewRelic Events](https://docs.newrelic.com/docs/infrastructure/manage-your-data/data-instrumentation/default-infrastructure-monitoring-data/#infrastructure-events)
+to [raw samples](https://docs.victoriametrics.com/keyConcepts.html#raw-samples) in the following way:
+
+1. Every numeric field is converted into a raw sample with the corresponding name.
+1. The `eventType` and all the other fields with `string` value type are attached to every raw sample as [metric labels](https://docs.victoriametrics.com/keyConcepts.html#labels).
+1. The `timestamp` field is used as the timestamp for the ingested [raw sample](https://docs.victoriametrics.com/keyConcepts.html#raw-samples).
+   The `timestamp` field may be specified either in seconds or in milliseconds since the [Unix Epoch](https://en.wikipedia.org/wiki/Unix_time).
+   If the `timestamp` field is missing, then the raw sample is stored with the current timestamp.
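+
+The following Go sketch illustrates these rules. It is for illustration only: the `mapEvent`
+helper is hypothetical and is not part of VictoriaMetrics (the actual implementation lives
+in `lib/protoparser/newrelic/parser.go`):
+
+```go
+package main
+
+import (
+	"fmt"
+	"time"
+)
+
+// sample is a single metric produced from a numeric event field.
+type sample struct {
+	name  string
+	value float64
+}
+
+// mapEvent applies the mapping rules to a single decoded NewRelic event:
+// string fields become labels, numeric fields become samples, and the
+// optional "timestamp" field is normalized to milliseconds.
+func mapEvent(event map[string]interface{}) ([]sample, map[string]string, int64) {
+	labels := make(map[string]string)
+	var samples []sample
+	// Default to the current time when the timestamp field is missing.
+	tsMillis := time.Now().UnixMilli()
+	for k, v := range event {
+		switch val := v.(type) {
+		case string:
+			labels[k] = val // eventType, entityKey, etc. become labels
+		case float64:
+			if k == "timestamp" {
+				if val < (1 << 32) {
+					// The timestamp is in seconds. Convert it to milliseconds.
+					val *= 1e3
+				}
+				tsMillis = int64(val)
+				continue
+			}
+			samples = append(samples, sample{name: k, value: val})
+		}
+	}
+	return samples, labels, tsMillis
+}
+
+func main() {
+	samples, labels, ts := mapEvent(map[string]interface{}{
+		"eventType": "SystemSample",
+		"entityKey": "macbook-pro.local",
+		"timestamp": float64(1690286061),
+		"uptime":    float64(762376),
+	})
+	fmt.Println(samples, labels, ts)
+}
+```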
+
+For example, let's import the following NewRelic Events request into VictoriaMetrics:
 
-As example, lets create `newrelic.json` file with the following content:
 ```json
 [
-  {
-    "Events":[
-      {
-        "eventType":"SystemSample",
-        "entityKey":"macbook-pro.local",
-        "cpuPercent":25.056660790748904,
-        "cpuUserPercent":8.687987912389374,
-        "cpuSystemPercent":16.36867287835953,
-        "cpuIOWaitPercent":0,
-        "cpuIdlePercent":74.94333920925109,
-        "cpuStealPercent":0,
-        "loadAverageOneMinute":5.42333984375,
-        "loadAverageFiveMinute":4.099609375,
-        "loadAverageFifteenMinute":3.58203125
-      }
-    ]
-  }
-  ]
-```
-
-Let's use cUrl to send `newrelic.json` to single-node VictoriaMetrics:
+  {
+    "Events":[
+      {
+        "eventType":"SystemSample",
+        "entityKey":"macbook-pro.local",
+        "cpuPercent":25.056660790748904,
+        "cpuUserPercent":8.687987912389374,
+        "cpuSystemPercent":16.36867287835953,
+        "cpuIOWaitPercent":0,
+        "cpuIdlePercent":74.94333920925109,
+        "cpuStealPercent":0,
+        "loadAverageOneMinute":5.42333984375,
+        "loadAverageFiveMinute":4.099609375,
+        "loadAverageFifteenMinute":3.58203125
+      }
+    ]
+  }
+]
+```
+
+Save this JSON into a `newrelic.json` file and then use the following command to import it into VictoriaMetrics:
 
 ```console
-curl -X POST -H 'Content-Type: application/json' --data-binary @newrelic.json http://localhost:8428/newrelic/api/v1/infra/v2/metrics/events/bulk
+curl -X POST -H 'Content-Type: application/json' --data-binary @newrelic.json http://localhost:8428/newrelic/infra/v2/metrics/events/bulk
 ```
 
-If data was successfully ingested, you'll get `{"status":"ok"}` response. Let's fetch ingested data from VictoriaMetrics
-in vmui via query `{__name__!=""}`:
-```console
-system_sample_cpu_io_wait_percent{entity_key="macbook-pro.local"} 0
-system_sample_cpu_idle_percent{entity_key="macbook-pro.local"} 74.9433392092
-system_sample_cpu_percent{entity_key="macbook-pro.local"} 25.056660790748
-system_sample_cpu_steal_percent{entity_key="macbook-pro.local"} 0
-system_sample_cpu_system_percent{entity_key="macbook-pro.local"} 16.368672878359
-system_sample_cpu_user_percent{entity_key="macbook-pro.local"} 8.687987912389
-system_sample_load_average_fifteen_minute{entity_key="macbook-pro.local"} 3.58203125
-system_sample_load_average_five_minute{entity_key="macbook-pro.local"} 4.099609375
-system_sample_load_average_one_minute{entity_key="macbook-pro.local"} 5.42333984375
-```
-
-The fields in `newrelic.json` are transformed in the following way:
-1. `eventType` filed is used as prefix for all metrics in the object;
-2. `entityKey` or any other field with `string` value type is used as label attached to all metrics in the object;
-3. the rest fields with numeric values will be used as metrics;
-4. the additional field `timestamp` can be added to the payload to set the timestamp for all metrics. If omitted,
-current time is used.
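+The same request can be sent from Go instead of `curl`. Below is a minimal illustrative sketch
+using only the Go standard library (it assumes single-node VictoriaMetrics at `localhost:8428`):
+
+```go
+package main
+
+import (
+	"bytes"
+	"fmt"
+	"net/http"
+	"os"
+)
+
+func main() {
+	// Read the newrelic.json file prepared above.
+	data, err := os.ReadFile("newrelic.json")
+	if err != nil {
+		panic(err)
+	}
+	// Send it to the NewRelic bulk ingestion endpoint of VictoriaMetrics.
+	resp, err := http.Post("http://localhost:8428/newrelic/infra/v2/metrics/events/bulk",
+		"application/json", bytes.NewReader(data))
+	if err != nil {
+		panic(err)
+	}
+	defer resp.Body.Close()
+	fmt.Println(resp.Status)
+}
+```
+
+On successful ingestion VictoriaMetrics responds with `{"status":"ok"}`.
+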
+Let's fetch the ingested data via [data export API](#how-to-export-data-in-json-line-format): +```console +curl http://localhost:8428/api/v1/export -d 'match={eventType="SystemSample"}' +{"metric":{"__name__":"cpuStealPercent","entityKey":"macbook-pro.local","eventType":"SystemSample"},"values":[0],"timestamps":[1697407970000]} +{"metric":{"__name__":"loadAverageFiveMinute","entityKey":"macbook-pro.local","eventType":"SystemSample"},"values":[4.099609375],"timestamps":[1697407970000]} +{"metric":{"__name__":"cpuIOWaitPercent","entityKey":"macbook-pro.local","eventType":"SystemSample"},"values":[0],"timestamps":[1697407970000]} +{"metric":{"__name__":"cpuSystemPercent","entityKey":"macbook-pro.local","eventType":"SystemSample"},"values":[16.368672878359],"timestamps":[1697407970000]} +{"metric":{"__name__":"loadAverageOneMinute","entityKey":"macbook-pro.local","eventType":"SystemSample"},"values":[5.42333984375],"timestamps":[1697407970000]} +{"metric":{"__name__":"cpuUserPercent","entityKey":"macbook-pro.local","eventType":"SystemSample"},"values":[8.687987912389],"timestamps":[1697407970000]} +{"metric":{"__name__":"cpuIdlePercent","entityKey":"macbook-pro.local","eventType":"SystemSample"},"values":[74.9433392092],"timestamps":[1697407970000]} +{"metric":{"__name__":"loadAverageFifteenMinute","entityKey":"macbook-pro.local","eventType":"SystemSample"},"values":[3.58203125],"timestamps":[1697407970000]} +{"metric":{"__name__":"cpuPercent","entityKey":"macbook-pro.local","eventType":"SystemSample"},"values":[25.056660790748],"timestamps":[1697407970000]} +``` ## Prometheus querying API usage diff --git a/app/vmagent/main.go b/app/vmagent/main.go index 5f81f881c52a5..7c40ac7c9504d 100644 --- a/app/vmagent/main.go +++ b/app/vmagent/main.go @@ -320,19 +320,19 @@ func requestHandler(w http.ResponseWriter, r *http.Request) bool { } w.WriteHeader(http.StatusOK) return true - case "/newrelic/api/v1": + case "/newrelic": newrelicCheckRequest.Inc() w.Header().Set("Content-Type", "application/json") w.WriteHeader(202) fmt.Fprintf(w, `{"status":"ok"}`) return true - case "/newrelic/api/v1/inventory/deltas": + case "/newrelic/inventory/deltas": newrelicInventoryRequests.Inc() w.Header().Set("Content-Type", "application/json") w.WriteHeader(202) fmt.Fprintf(w, `{"payload":{"version": 1, "state": {}, "reset": "false"}}`) return true - case "/newrelic/api/v1/infra/v2/metrics/events/bulk": + case "/newrelic/infra/v2/metrics/events/bulk": newrelicWriteRequests.Inc() if err := newrelic.InsertHandlerForHTTP(nil, r); err != nil { newrelicWriteErrors.Inc() @@ -543,19 +543,19 @@ func processMultitenantRequest(w http.ResponseWriter, r *http.Request, path stri } w.WriteHeader(http.StatusOK) return true - case "/newrelic/api/v1": + case "newrelic": newrelicCheckRequest.Inc() w.Header().Set("Content-Type", "application/json") w.WriteHeader(202) fmt.Fprintf(w, `{"status":"ok"}`) return true - case "/newrelic/api/v1/inventory/deltas": + case "newrelic/inventory/deltas": newrelicInventoryRequests.Inc() w.Header().Set("Content-Type", "application/json") w.WriteHeader(202) fmt.Fprintf(w, `{"payload":{"version": 1, "state": {}, "reset": "false"}}`) return true - case "/newrelic/api/v1/infra/v2/metrics/events/bulk": + case "newrelic/infra/v2/metrics/events/bulk": newrelicWriteRequests.Inc() if err := newrelic.InsertHandlerForHTTP(at, r); err != nil { newrelicWriteErrors.Inc() @@ -638,11 +638,11 @@ var ( opentelemetryPushRequests = metrics.NewCounter(`vmagent_http_requests_total{path="/opentelemetry/api/v1/push", 
protocol="opentelemetry"}`) opentelemetryPushErrors = metrics.NewCounter(`vmagent_http_request_errors_total{path="/opentelemetry/api/v1/push", protocol="opentelemetry"}`) - newrelicWriteRequests = metrics.NewCounter(`vm_http_requests_total{path="/newrelic/api/v1/infra/v2/metrics/events/bulk", protocol="newrelic"}`) - newrelicWriteErrors = metrics.NewCounter(`vm_http_request_errors_total{path="/newrelic/api/v1/infra/v2/metrics/events/bulk", protocol="newrelic"}`) + newrelicWriteRequests = metrics.NewCounter(`vm_http_requests_total{path="/newrelic/infra/v2/metrics/events/bulk", protocol="newrelic"}`) + newrelicWriteErrors = metrics.NewCounter(`vm_http_request_errors_total{path="/newrelic/infra/v2/metrics/events/bulk", protocol="newrelic"}`) - newrelicInventoryRequests = metrics.NewCounter(`vm_http_requests_total{path="/newrelic/api/v1/inventory/deltas", protocol="newrelic"}`) - newrelicCheckRequest = metrics.NewCounter(`vm_http_requests_total{path="/newrelic/api/v1", protocol="newrelic"}`) + newrelicInventoryRequests = metrics.NewCounter(`vm_http_requests_total{path="/newrelic/inventory/deltas", protocol="newrelic"}`) + newrelicCheckRequest = metrics.NewCounter(`vm_http_requests_total{path="/newrelic", protocol="newrelic"}`) promscrapeTargetsRequests = metrics.NewCounter(`vmagent_http_requests_total{path="/targets"}`) promscrapeServiceDiscoveryRequests = metrics.NewCounter(`vmagent_http_requests_total{path="/service-discovery"}`) diff --git a/app/vmagent/newrelic/request_handler.go b/app/vmagent/newrelic/request_handler.go index 378476405595e..b164fc9733d0b 100644 --- a/app/vmagent/newrelic/request_handler.go +++ b/app/vmagent/newrelic/request_handler.go @@ -8,6 +8,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/common" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/remotewrite" "github.com/VictoriaMetrics/VictoriaMetrics/lib/auth" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/newrelic" @@ -29,42 +30,48 @@ func InsertHandlerForHTTP(at *auth.Token, req *http.Request) error { } ce := req.Header.Get("Content-Encoding") isGzip := ce == "gzip" - return stream.Parse(req.Body, isGzip, func(series []newrelic.Metric) error { - return insertRows(at, series, extraLabels) + return stream.Parse(req.Body, isGzip, func(rows []newrelic.Row) error { + return insertRows(at, rows, extraLabels) }) } -func insertRows(at *auth.Token, rows []newrelic.Metric, extraLabels []prompbmarshal.Label) error { +func insertRows(at *auth.Token, rows []newrelic.Row, extraLabels []prompbmarshal.Label) error { ctx := common.GetPushCtx() defer common.PutPushCtx(ctx) - rowsTotal := 0 + samplesCount := 0 tssDst := ctx.WriteRequest.Timeseries[:0] labels := ctx.Labels[:0] samples := ctx.Samples[:0] for i := range rows { r := &rows[i] - labelsLen := len(labels) - labels = append(labels, prompbmarshal.Label{ - Name: "__name__", - Value: r.Metric, - }) - for j := range r.Tags { - tag := &r.Tags[j] + tags := r.Tags + srcSamples := r.Samples + for j := range srcSamples { + s := &srcSamples[j] + labelsLen := len(labels) labels = append(labels, prompbmarshal.Label{ - Name: tag.Key, - Value: tag.Value, + Name: "__name__", + Value: bytesutil.ToUnsafeString(s.Name), }) + for k := range tags { + t := &tags[k] + labels = append(labels, prompbmarshal.Label{ + Name: bytesutil.ToUnsafeString(t.Key), + 
Value: bytesutil.ToUnsafeString(t.Value), + }) + } + samples = append(samples, prompbmarshal.Sample{ + Value: s.Value, + Timestamp: r.Timestamp, + }) + tssDst = append(tssDst, prompbmarshal.TimeSeries{ + Labels: labels[labelsLen:], + Samples: samples[len(samples)-1:], + }) + labels = append(labels, extraLabels...) } - samples = append(samples, prompbmarshal.Sample{ - Value: r.Value, - Timestamp: r.Timestamp, - }) - tssDst = append(tssDst, prompbmarshal.TimeSeries{ - Labels: labels[labelsLen:], - Samples: samples[len(samples)-1:], - }) - labels = append(labels, extraLabels...) + samplesCount += len(srcSamples) } ctx.WriteRequest.Timeseries = tssDst ctx.Labels = labels @@ -72,8 +79,8 @@ func insertRows(at *auth.Token, rows []newrelic.Metric, extraLabels []prompbmars remotewrite.Push(at, &ctx.WriteRequest) rowsInserted.Add(len(rows)) if at != nil { - rowsTenantInserted.Get(at).Add(rowsTotal) + rowsTenantInserted.Get(at).Add(samplesCount) } - rowsPerInsert.Update(float64(len(rows))) + rowsPerInsert.Update(float64(samplesCount)) return nil } diff --git a/app/vminsert/main.go b/app/vminsert/main.go index f2c0afe061edc..f14b90b698b49 100644 --- a/app/vminsert/main.go +++ b/app/vminsert/main.go @@ -222,19 +222,19 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool { } w.WriteHeader(http.StatusOK) return true - case "/newrelic/api/v1": + case "/newrelic": newrelicCheckRequest.Inc() w.Header().Set("Content-Type", "application/json") w.WriteHeader(202) fmt.Fprintf(w, `{"status":"ok"}`) return true - case "/newrelic/api/v1/inventory/deltas": + case "/newrelic/inventory/deltas": newrelicInventoryRequests.Inc() w.Header().Set("Content-Type", "application/json") w.WriteHeader(202) fmt.Fprintf(w, `{"payload":{"version": 1, "state": {}, "reset": "false"}}`) return true - case "/newrelic/api/v1/infra/v2/metrics/events/bulk": + case "/newrelic/infra/v2/metrics/events/bulk": newrelicWriteRequests.Inc() if err := newrelic.InsertHandlerForHTTP(r); err != nil { newrelicWriteErrors.Inc() @@ -382,11 +382,11 @@ var ( opentelemetryPushRequests = metrics.NewCounter(`vm_http_requests_total{path="/opentelemetry/api/v1/push", protocol="opentelemetry"}`) opentelemetryPushErrors = metrics.NewCounter(`vm_http_request_errors_total{path="/opentelemetry/api/v1/push", protocol="opentelemetry"}`) - newrelicWriteRequests = metrics.NewCounter(`vm_http_requests_total{path="/newrelic/api/v1/infra/v2/metrics/events/bulk", protocol="newrelic"}`) - newrelicWriteErrors = metrics.NewCounter(`vm_http_request_errors_total{path="/newrelic/api/v1/infra/v2/metrics/events/bulk", protocol="newrelic"}`) + newrelicWriteRequests = metrics.NewCounter(`vm_http_requests_total{path="/newrelic/infra/v2/metrics/events/bulk", protocol="newrelic"}`) + newrelicWriteErrors = metrics.NewCounter(`vm_http_request_errors_total{path="/newrelic/infra/v2/metrics/events/bulk", protocol="newrelic"}`) - newrelicInventoryRequests = metrics.NewCounter(`vm_http_requests_total{path="/newrelic/api/v1/inventory/deltas", protocol="newrelic"}`) - newrelicCheckRequest = metrics.NewCounter(`vm_http_requests_total{path="/newrelic/api/v1", protocol="newrelic"}`) + newrelicInventoryRequests = metrics.NewCounter(`vm_http_requests_total{path="/newrelic/inventory/deltas", protocol="newrelic"}`) + newrelicCheckRequest = metrics.NewCounter(`vm_http_requests_total{path="/newrelic", protocol="newrelic"}`) promscrapeTargetsRequests = metrics.NewCounter(`vm_http_requests_total{path="/targets"}`) promscrapeServiceDiscoveryRequests = 
metrics.NewCounter(`vm_http_requests_total{path="/service-discovery"}`)
diff --git a/app/vminsert/newrelic/request_handler.go b/app/vminsert/newrelic/request_handler.go
index ae8530d56abcd..226e97075e53e 100644
--- a/app/vminsert/newrelic/request_handler.go
+++ b/app/vminsert/newrelic/request_handler.go
@@ -18,7 +18,7 @@ var (
 	rowsPerInsert = metrics.NewHistogram(`vm_rows_per_insert{type="newrelic"}`)
 )
 
-// InsertHandlerForHTTP processes remote write for NewRelic POST /infra/v2/metrics/events/bulk request.
+// InsertHandlerForHTTP processes remote write for requests to /newrelic/infra/v2/metrics/events/bulk.
 func InsertHandlerForHTTP(req *http.Request) error {
 	extraLabels, err := parserCommon.GetExtraLabels(req)
 	if err != nil {
@@ -26,42 +26,52 @@ func InsertHandlerForHTTP(req *http.Request) error {
 	}
 	ce := req.Header.Get("Content-Encoding")
 	isGzip := ce == "gzip"
-	return stream.Parse(req.Body, isGzip, func(series []newrelic.Metric) error {
-		return insertRows(series, extraLabels)
+	return stream.Parse(req.Body, isGzip, func(rows []newrelic.Row) error {
+		return insertRows(rows, extraLabels)
 	})
 }
 
-func insertRows(rows []newrelic.Metric, extraLabels []prompbmarshal.Label) error {
+func insertRows(rows []newrelic.Row, extraLabels []prompbmarshal.Label) error {
 	ctx := common.GetInsertCtx()
 	defer common.PutInsertCtx(ctx)
 
-	ctx.Reset(len(rows))
+	samplesCount := 0
+	for i := range rows {
+		samplesCount += len(rows[i].Samples)
+	}
+	ctx.Reset(samplesCount)
+
 	hasRelabeling := relabel.HasRelabeling()
 	for i := range rows {
 		r := &rows[i]
-		ctx.Labels = ctx.Labels[:0]
-		ctx.AddLabel("", r.Metric)
-		for j := range r.Tags {
-			tag := &r.Tags[j]
-			ctx.AddLabel(tag.Key, tag.Value)
-		}
-		for j := range extraLabels {
-			label := &extraLabels[j]
-			ctx.AddLabel(label.Name, label.Value)
-		}
-		if hasRelabeling {
-			ctx.ApplyRelabeling()
-		}
-		if len(ctx.Labels) == 0 {
-			// Skip metric without labels.
-			continue
-		}
-		ctx.SortLabelsIfNeeded()
-		if err := ctx.WriteDataPoint(nil, ctx.Labels, r.Timestamp, r.Value); err != nil {
-			return err
+		samples := r.Samples
+		for j := range samples {
+			s := &samples[j]
+
+			ctx.Labels = ctx.Labels[:0]
+			ctx.AddLabelBytes(nil, s.Name)
+			for k := range r.Tags {
+				t := &r.Tags[k]
+				ctx.AddLabelBytes(t.Key, t.Value)
+			}
+			for k := range extraLabels {
+				label := &extraLabels[k]
+				ctx.AddLabel(label.Name, label.Value)
+			}
+			if hasRelabeling {
+				ctx.ApplyRelabeling()
+			}
+			if len(ctx.Labels) == 0 {
+				// Skip metric without labels.
+				continue
+			}
+			ctx.SortLabelsIfNeeded()
+			if err := ctx.WriteDataPoint(nil, ctx.Labels, r.Timestamp, s.Value); err != nil {
+				return err
+			}
 		}
 	}
-	rowsInserted.Add(len(rows))
-	rowsPerInsert.Update(float64(len(rows)))
+	rowsInserted.Add(samplesCount)
+	rowsPerInsert.Update(float64(samplesCount))
 	return ctx.FlushBufs()
 }
diff --git a/docs/Cluster-VictoriaMetrics.md b/docs/Cluster-VictoriaMetrics.md
index 64cbcbd17b3a3..3e836f06db5d1 100644
--- a/docs/Cluster-VictoriaMetrics.md
+++ b/docs/Cluster-VictoriaMetrics.md
@@ -356,6 +356,7 @@ Check practical examples of VictoriaMetrics API [here](https://docs.victoriametr
 - `opentelemetry/api/v1/push` - for ingesting data via [OpenTelemetry protocol for metrics](https://github.com/open-telemetry/opentelemetry-specification/blob/ffddc289462dfe0c2041e3ca42a7b1df805706de/specification/metrics/data-model.md). See [these docs](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#sending-data-via-opentelemetry).
- `datadog/api/v1/series` - for ingesting data with [DataDog submit metrics API](https://docs.datadoghq.com/api/latest/metrics/#submit-metrics). See [these docs](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#how-to-send-data-from-datadog-agent) for details.
 - `influx/write` and `influx/api/v2/write` - for ingesting data with [InfluxDB line protocol](https://docs.influxdata.com/influxdb/v1.7/write_protocols/line_protocol_tutorial/). See [these docs](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#how-to-send-data-from-influxdb-compatible-agents-such-as-telegraf) for details.
+ - `newrelic/infra/v2/metrics/events/bulk` - for accepting data from [NewRelic infrastructure agent](https://docs.newrelic.com/docs/infrastructure/install-infrastructure-agent). See [these docs](https://docs.victoriametrics.com/#how-to-send-data-from-newrelic-agent) for details.
 - `opentsdb/api/put` - for accepting [OpenTSDB HTTP /api/put requests](http://opentsdb.net/docs/build/html/api_http/put.html). This handler is disabled by default. It is exposed on a distinct TCP address set via `-opentsdbHTTPListenAddr` command-line flag. See [these docs](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#sending-opentsdb-data-via-http-apiput-requests) for details.
 - URLs for [Prometheus querying API](https://prometheus.io/docs/prometheus/latest/querying/api/): `http://<vmselect>:8481/select/<accountID>/prometheus/<suffix>`, where:
diff --git a/docs/README.md b/docs/README.md
index 399d426ac965d..a889e61e48766 100644
--- a/docs/README.md
+++ b/docs/README.md
@@ -857,70 +857,75 @@ For example, `/api/put?extra_label=foo=bar` would add `{foo="bar"}` label to all
 ## How to send data from NewRelic agent
 
 VictoriaMetrics accepts data from [NewRelic infrastructure agent](https://docs.newrelic.com/docs/infrastructure/install-infrastructure-agent)
-at `/api/v1/newrelic/infra/v2/metrics/events/bulk` path.
-NewRelic's infrastructure agent sends so-called [Events](https://docs.newrelic.com/docs/infrastructure/manage-your-data/data-instrumentation/default-infrastructure-monitoring-data/#infrastructure-events)
-which then transformed by VictoriaMetrics to the [Prometheus exposition format](https://github.com/prometheus/docs/blob/main/content/docs/instrumenting/exposition_formats.md#text-based-format).
+at `/newrelic/infra/v2/metrics/events/bulk` HTTP path.
+VictoriaMetrics receives [Events](https://docs.newrelic.com/docs/infrastructure/manage-your-data/data-instrumentation/default-infrastructure-monitoring-data/#infrastructure-events)
+from the NewRelic agent at the given path and transforms them to [raw samples](https://docs.victoriametrics.com/keyConcepts.html#raw-samples)
+according to [these docs](#newrelic-agent-data-mapping) before storing them in the database.
 
-NewRelic's infrastructure agent allows configuring destinations for metrics forwarding via ENV variable `COLLECTOR_URL`.
-It is also required to specify `NRIA_LICENSE_KEY`, which is available only after registration into account of the NewRelic cloud.
+You need to pass the `COLLECTOR_URL` and `NRIA_LICENSE_KEY` environment variables to the NewRelic infrastructure agent in order to send the collected metrics to VictoriaMetrics.
+`COLLECTOR_URL` must point to the `/newrelic` HTTP endpoint at VictoriaMetrics, while `NRIA_LICENSE_KEY` must contain a NewRelic license key,
+which can be obtained [here](https://newrelic.com/signup).
+For example, if VictoriaMetrics runs at `localhost:8428`, then the following command can be used for running the NewRelic infrastructure agent:
 
-To configure NewRelic infrastructure agent for forwarding metrics to VictoriaMetrics use the following example:
 ```console
-COLLECTOR_URL="http://localhost:8428/newrelic/api/v1" NRIA_LICENSE_KEY="YOUR_LICENSE_KEY" ./newrelic-infra
+COLLECTOR_URL="http://localhost:8428/newrelic" NRIA_LICENSE_KEY="NEWRELIC_LICENSE_KEY" ./newrelic-infra
 ```
 
-### NewRelic agent data mapping
+### NewRelic agent data mapping
+
+VictoriaMetrics maps [NewRelic Events](https://docs.newrelic.com/docs/infrastructure/manage-your-data/data-instrumentation/default-infrastructure-monitoring-data/#infrastructure-events)
+to [raw samples](https://docs.victoriametrics.com/keyConcepts.html#raw-samples) in the following way:
+
+1. Every numeric field is converted into a raw sample with the corresponding name.
+1. The `eventType` and all the other fields with `string` value type are attached to every raw sample as [metric labels](https://docs.victoriametrics.com/keyConcepts.html#labels).
+1. The `timestamp` field is used as the timestamp for the ingested [raw sample](https://docs.victoriametrics.com/keyConcepts.html#raw-samples).
+   The `timestamp` field may be specified either in seconds or in milliseconds since the [Unix Epoch](https://en.wikipedia.org/wiki/Unix_time).
+   If the `timestamp` field is missing, then the raw sample is stored with the current timestamp.
+
+For example, let's import the following NewRelic Events request into VictoriaMetrics:
 
-As example, lets create `newrelic.json` file with the following content:
 ```json
 [
-  {
-    "Events":[
-      {
-        "eventType":"SystemSample",
-        "entityKey":"macbook-pro.local",
-        "cpuPercent":25.056660790748904,
-        "cpuUserPercent":8.687987912389374,
-        "cpuSystemPercent":16.36867287835953,
-        "cpuIOWaitPercent":0,
-        "cpuIdlePercent":74.94333920925109,
-        "cpuStealPercent":0,
-        "loadAverageOneMinute":5.42333984375,
-        "loadAverageFiveMinute":4.099609375,
-        "loadAverageFifteenMinute":3.58203125
-      }
-    ]
-  }
-  ]
-```
-
-Let's use cUrl to send `newrelic.json` to single-node VictoriaMetrics:
+  {
+    "Events":[
+      {
+        "eventType":"SystemSample",
+        "entityKey":"macbook-pro.local",
+        "cpuPercent":25.056660790748904,
+        "cpuUserPercent":8.687987912389374,
+        "cpuSystemPercent":16.36867287835953,
+        "cpuIOWaitPercent":0,
+        "cpuIdlePercent":74.94333920925109,
+        "cpuStealPercent":0,
+        "loadAverageOneMinute":5.42333984375,
+        "loadAverageFiveMinute":4.099609375,
+        "loadAverageFifteenMinute":3.58203125
+      }
+    ]
+  }
+]
+```
+
+Save this JSON into a `newrelic.json` file and then use the following command to import it into VictoriaMetrics:
 
 ```console
-curl -X POST -H 'Content-Type: application/json' --data-binary @newrelic.json http://localhost:8428/newrelic/api/v1/infra/v2/metrics/events/bulk
+curl -X POST -H 'Content-Type: application/json' --data-binary @newrelic.json http://localhost:8428/newrelic/infra/v2/metrics/events/bulk
 ```
 
-If data was successfully ingested, you'll get `{"status":"ok"}` response.
Let's fetch ingested data from VictoriaMetrics -in vmui via query `{__name__!=""}`: -```console -system_sample_cpu_io_wait_percent{entity_key="macbook-pro.local"} 0 -system_sample_cpu_idle_percent{entity_key="macbook-pro.local"} 74.9433392092 -system_sample_cpu_percent{entity_key="macbook-pro.local"} 25.056660790748 -system_sample_cpu_steal_percent{entity_key="macbook-pro.local"} 0 -system_sample_cpu_system_percent{entity_key="macbook-pro.local"} 16.368672878359 -system_sample_cpu_user_percent{entity_key="macbook-pro.local"} 8.687987912389 -system_sample_load_average_fifteen_minute{entity_key="macbook-pro.local"} 3.58203125 -system_sample_load_average_five_minute{entity_key="macbook-pro.local"} 4.099609375 -system_sample_load_average_one_minute{entity_key="macbook-pro.local"} 5.42333984375 -``` - -The fields in `newrelic.json` are transformed in the following way: -1. `eventType` filed is used as prefix for all metrics in the object; -2. `entityKey` or any other field with `string` value type is used as label attached to all metrics in the object; -3. the rest fields with numeric values will be used as metrics; -4. the additional field `timestamp` can be added to the payload to set the timestamp for all metrics. If omitted, -current time is used. +Let's fetch the ingested data via [data export API](#how-to-export-data-in-json-line-format): +```console +curl http://localhost:8428/api/v1/export -d 'match={eventType="SystemSample"}' +{"metric":{"__name__":"cpuStealPercent","entityKey":"macbook-pro.local","eventType":"SystemSample"},"values":[0],"timestamps":[1697407970000]} +{"metric":{"__name__":"loadAverageFiveMinute","entityKey":"macbook-pro.local","eventType":"SystemSample"},"values":[4.099609375],"timestamps":[1697407970000]} +{"metric":{"__name__":"cpuIOWaitPercent","entityKey":"macbook-pro.local","eventType":"SystemSample"},"values":[0],"timestamps":[1697407970000]} +{"metric":{"__name__":"cpuSystemPercent","entityKey":"macbook-pro.local","eventType":"SystemSample"},"values":[16.368672878359],"timestamps":[1697407970000]} +{"metric":{"__name__":"loadAverageOneMinute","entityKey":"macbook-pro.local","eventType":"SystemSample"},"values":[5.42333984375],"timestamps":[1697407970000]} +{"metric":{"__name__":"cpuUserPercent","entityKey":"macbook-pro.local","eventType":"SystemSample"},"values":[8.687987912389],"timestamps":[1697407970000]} +{"metric":{"__name__":"cpuIdlePercent","entityKey":"macbook-pro.local","eventType":"SystemSample"},"values":[74.9433392092],"timestamps":[1697407970000]} +{"metric":{"__name__":"loadAverageFifteenMinute","entityKey":"macbook-pro.local","eventType":"SystemSample"},"values":[3.58203125],"timestamps":[1697407970000]} +{"metric":{"__name__":"cpuPercent","entityKey":"macbook-pro.local","eventType":"SystemSample"},"values":[25.056660790748],"timestamps":[1697407970000]} +``` ## Prometheus querying API usage diff --git a/docs/Single-server-VictoriaMetrics.md b/docs/Single-server-VictoriaMetrics.md index d036f0ebc494d..d725de0a4a72d 100644 --- a/docs/Single-server-VictoriaMetrics.md +++ b/docs/Single-server-VictoriaMetrics.md @@ -865,70 +865,75 @@ For example, `/api/put?extra_label=foo=bar` would add `{foo="bar"}` label to all ## How to send data from NewRelic agent VictoriaMetrics accepts data from [NewRelic infrastructure agent](https://docs.newrelic.com/docs/infrastructure/install-infrastructure-agent) -at `/api/v1/newrelic/infra/v2/metrics/events/bulk` path. 
-NewRelic's infrastructure agent sends so-called [Events](https://docs.newrelic.com/docs/infrastructure/manage-your-data/data-instrumentation/default-infrastructure-monitoring-data/#infrastructure-events)
-which then transformed by VictoriaMetrics to the [Prometheus exposition format](https://github.com/prometheus/docs/blob/main/content/docs/instrumenting/exposition_formats.md#text-based-format).
+at `/newrelic/infra/v2/metrics/events/bulk` HTTP path.
+VictoriaMetrics receives [Events](https://docs.newrelic.com/docs/infrastructure/manage-your-data/data-instrumentation/default-infrastructure-monitoring-data/#infrastructure-events)
+from the NewRelic agent at the given path and transforms them to [raw samples](https://docs.victoriametrics.com/keyConcepts.html#raw-samples)
+according to [these docs](#newrelic-agent-data-mapping) before storing them in the database.
 
-NewRelic's infrastructure agent allows configuring destinations for metrics forwarding via ENV variable `COLLECTOR_URL`.
-It is also required to specify `NRIA_LICENSE_KEY`, which is available only after registration into account of the NewRelic cloud.
+You need to pass the `COLLECTOR_URL` and `NRIA_LICENSE_KEY` environment variables to the NewRelic infrastructure agent in order to send the collected metrics to VictoriaMetrics.
+`COLLECTOR_URL` must point to the `/newrelic` HTTP endpoint at VictoriaMetrics, while `NRIA_LICENSE_KEY` must contain a NewRelic license key,
+which can be obtained [here](https://newrelic.com/signup).
+For example, if VictoriaMetrics runs at `localhost:8428`, then the following command can be used for running the NewRelic infrastructure agent:
 
-To configure NewRelic infrastructure agent for forwarding metrics to VictoriaMetrics use the following example:
 ```console
-COLLECTOR_URL="http://localhost:8428/newrelic/api/v1" NRIA_LICENSE_KEY="YOUR_LICENSE_KEY" ./newrelic-infra
+COLLECTOR_URL="http://localhost:8428/newrelic" NRIA_LICENSE_KEY="NEWRELIC_LICENSE_KEY" ./newrelic-infra
 ```
 
-### NewRelic agent data mapping
+### NewRelic agent data mapping
+
+VictoriaMetrics maps [NewRelic Events](https://docs.newrelic.com/docs/infrastructure/manage-your-data/data-instrumentation/default-infrastructure-monitoring-data/#infrastructure-events)
+to [raw samples](https://docs.victoriametrics.com/keyConcepts.html#raw-samples) in the following way:
+
+1. Every numeric field is converted into a raw sample with the corresponding name.
+1. The `eventType` and all the other fields with `string` value type are attached to every raw sample as [metric labels](https://docs.victoriametrics.com/keyConcepts.html#labels).
+1. The `timestamp` field is used as the timestamp for the ingested [raw sample](https://docs.victoriametrics.com/keyConcepts.html#raw-samples).
+   The `timestamp` field may be specified either in seconds or in milliseconds since the [Unix Epoch](https://en.wikipedia.org/wiki/Unix_time).
+   If the `timestamp` field is missing, then the raw sample is stored with the current timestamp.
+
+For example, let's import the following NewRelic Events request into VictoriaMetrics:
 
-As example, lets create `newrelic.json` file with the following content:
 ```json
 [
-  {
-    "Events":[
-      {
-        "eventType":"SystemSample",
-        "entityKey":"macbook-pro.local",
-        "cpuPercent":25.056660790748904,
-        "cpuUserPercent":8.687987912389374,
-        "cpuSystemPercent":16.36867287835953,
-        "cpuIOWaitPercent":0,
-        "cpuIdlePercent":74.94333920925109,
-        "cpuStealPercent":0,
-        "loadAverageOneMinute":5.42333984375,
-        "loadAverageFiveMinute":4.099609375,
-        "loadAverageFifteenMinute":3.58203125
-      }
-    ]
-  }
-  ]
-```
-
-Let's use cUrl to send `newrelic.json` to single-node VictoriaMetrics:
+  {
+    "Events":[
+      {
+        "eventType":"SystemSample",
+        "entityKey":"macbook-pro.local",
+        "cpuPercent":25.056660790748904,
+        "cpuUserPercent":8.687987912389374,
+        "cpuSystemPercent":16.36867287835953,
+        "cpuIOWaitPercent":0,
+        "cpuIdlePercent":74.94333920925109,
+        "cpuStealPercent":0,
+        "loadAverageOneMinute":5.42333984375,
+        "loadAverageFiveMinute":4.099609375,
+        "loadAverageFifteenMinute":3.58203125
+      }
+    ]
+  }
+]
+```
+
+Save this JSON into a `newrelic.json` file and then use the following command to import it into VictoriaMetrics:
 
 ```console
-curl -X POST -H 'Content-Type: application/json' --data-binary @newrelic.json http://localhost:8428/newrelic/api/v1/infra/v2/metrics/events/bulk
+curl -X POST -H 'Content-Type: application/json' --data-binary @newrelic.json http://localhost:8428/newrelic/infra/v2/metrics/events/bulk
 ```
 
-If data was successfully ingested, you'll get `{"status":"ok"}` response. Let's fetch ingested data from VictoriaMetrics
-in vmui via query `{__name__!=""}`:
-```console
-system_sample_cpu_io_wait_percent{entity_key="macbook-pro.local"} 0
-system_sample_cpu_idle_percent{entity_key="macbook-pro.local"} 74.9433392092
-system_sample_cpu_percent{entity_key="macbook-pro.local"} 25.056660790748
-system_sample_cpu_steal_percent{entity_key="macbook-pro.local"} 0
-system_sample_cpu_system_percent{entity_key="macbook-pro.local"} 16.368672878359
-system_sample_cpu_user_percent{entity_key="macbook-pro.local"} 8.687987912389
-system_sample_load_average_fifteen_minute{entity_key="macbook-pro.local"} 3.58203125
-system_sample_load_average_five_minute{entity_key="macbook-pro.local"} 4.099609375
-system_sample_load_average_one_minute{entity_key="macbook-pro.local"} 5.42333984375
-```
-
-The fields in `newrelic.json` are transformed in the following way:
-1. `eventType` filed is used as prefix for all metrics in the object;
-2. `entityKey` or any other field with `string` value type is used as label attached to all metrics in the object;
-3. the rest fields with numeric values will be used as metrics;
-4. the additional field `timestamp` can be added to the payload to set the timestamp for all metrics. If omitted,
-current time is used.
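+If the data has been ingested successfully, the response is `{"status":"ok"}`.
+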
+Let's fetch the ingested data via [data export API](#how-to-export-data-in-json-line-format): +```console +curl http://localhost:8428/api/v1/export -d 'match={eventType="SystemSample"}' +{"metric":{"__name__":"cpuStealPercent","entityKey":"macbook-pro.local","eventType":"SystemSample"},"values":[0],"timestamps":[1697407970000]} +{"metric":{"__name__":"loadAverageFiveMinute","entityKey":"macbook-pro.local","eventType":"SystemSample"},"values":[4.099609375],"timestamps":[1697407970000]} +{"metric":{"__name__":"cpuIOWaitPercent","entityKey":"macbook-pro.local","eventType":"SystemSample"},"values":[0],"timestamps":[1697407970000]} +{"metric":{"__name__":"cpuSystemPercent","entityKey":"macbook-pro.local","eventType":"SystemSample"},"values":[16.368672878359],"timestamps":[1697407970000]} +{"metric":{"__name__":"loadAverageOneMinute","entityKey":"macbook-pro.local","eventType":"SystemSample"},"values":[5.42333984375],"timestamps":[1697407970000]} +{"metric":{"__name__":"cpuUserPercent","entityKey":"macbook-pro.local","eventType":"SystemSample"},"values":[8.687987912389],"timestamps":[1697407970000]} +{"metric":{"__name__":"cpuIdlePercent","entityKey":"macbook-pro.local","eventType":"SystemSample"},"values":[74.9433392092],"timestamps":[1697407970000]} +{"metric":{"__name__":"loadAverageFifteenMinute","entityKey":"macbook-pro.local","eventType":"SystemSample"},"values":[3.58203125],"timestamps":[1697407970000]} +{"metric":{"__name__":"cpuPercent","entityKey":"macbook-pro.local","eventType":"SystemSample"},"values":[25.056660790748],"timestamps":[1697407970000]} +``` ## Prometheus querying API usage diff --git a/lib/protoparser/datadog/stream/streamparser.go b/lib/protoparser/datadog/stream/streamparser.go index 4944396e16128..bf4693a2d71ef 100644 --- a/lib/protoparser/datadog/stream/streamparser.go +++ b/lib/protoparser/datadog/stream/streamparser.go @@ -138,11 +138,11 @@ func (ctx *pushCtx) Read() error { reqLen, err := ctx.reqBuf.ReadFrom(lr) if err != nil { readErrors.Inc() - return fmt.Errorf("cannot read compressed request in %d seconds: %w", fasttime.UnixTimestamp()-startTime, err) + return fmt.Errorf("cannot read request in %d seconds: %w", fasttime.UnixTimestamp()-startTime, err) } if reqLen > int64(maxInsertRequestSize.N) { readErrors.Inc() - return fmt.Errorf("too big packed request; mustn't exceed `-maxInsertRequestSize=%d` bytes", maxInsertRequestSize.N) + return fmt.Errorf("too big request; mustn't exceed -datadog.maxInsertRequestSize=%d bytes", maxInsertRequestSize.N) } return nil } diff --git a/lib/protoparser/newrelic/parser.go b/lib/protoparser/newrelic/parser.go index e6901949faa8a..41706880cae60 100644 --- a/lib/protoparser/newrelic/parser.go +++ b/lib/protoparser/newrelic/parser.go @@ -2,231 +2,191 @@ package newrelic import ( "fmt" - "sync" - "unicode" "github.com/valyala/fastjson" "github.com/valyala/fastjson/fastfloat" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel" ) -var baseEventKeys = map[string]struct{}{ - "timestamp": {}, "eventType": {}, +// Rows contains rows parsed from NewRelic Event request +// +// See https://docs.newrelic.com/docs/infrastructure/manage-your-data/data-instrumentation/default-infrastructure-monitoring-data/#infrastructure-events +type Rows struct { + Rows []Row } -type tagsBuffer struct { - tags []Tag +// Reset resets r, so it can be re-used +func (r *Rows) Reset() { + rows := r.Rows + for i := range rows { + 
rows[i].reset() + } + r.Rows = rows[:0] } -var tagsPool = sync.Pool{ - New: func() interface{} { - return &tagsBuffer{tags: make([]Tag, 0)} - }, -} +var jsonParserPool fastjson.ParserPool -// NewRelic agent sends next struct to the collector -// MetricPost entity item for the HTTP post to be sent to the ingest service. -// type MetricPost struct { -// ExternalKeys []string `json:"ExternalKeys,omitempty"` -// EntityID uint64 `json:"EntityID,omitempty"` -// IsAgent bool `json:"IsAgent"` -// Events []json.RawMessage `json:"Events"` -// // Entity ID of the reporting agent, which will = EntityID when IsAgent == true. -// // The field is required in the backend for host metadata matching of the remote entities -// ReportingAgentID uint64 `json:"ReportingAgentID,omitempty"` -// } -// We are using only Events field because it contains all needed metrics - -// Events represents Metrics collected from NewRelic MetricPost request -// https://docs.newrelic.com/docs/infrastructure/manage-your-data/data-instrumentation/default-infrastructure-monitoring-data/#infrastructure-events -type Events struct { - Metrics []Metric -} +// Unmarshal parses NewRelic Event request from b to r. +// +// b can be re-used after returning from r. +func (r *Rows) Unmarshal(b []byte) error { + p := jsonParserPool.Get() + defer jsonParserPool.Put(p) -// Unmarshal takes fastjson.Value and collects Metrics -func (e *Events) Unmarshal(v []*fastjson.Value) error { - for _, value := range v { - events := value.Get("Events") - if events == nil { - return fmt.Errorf("got empty Events array from request") - } - eventsArr, err := events.Array() + r.Reset() + v, err := p.ParseBytes(b) + if err != nil { + return err + } + metricPosts, err := v.Array() + if err != nil { + return fmt.Errorf("cannot find the top-level array of MetricPost objects: %w", err) + } + for _, mp := range metricPosts { + o, err := mp.Object() if err != nil { - return fmt.Errorf("error collect events: %s", err) + return fmt.Errorf("cannot find MetricPost object: %w", err) } - - for _, event := range eventsArr { - metricData, err := event.Object() + rows := r.Rows + o.Visit(func(k []byte, v *fastjson.Value) { if err != nil { - return fmt.Errorf("error get metric data: %s", err) + return } - var m Metric - metrics, err := m.unmarshal(metricData) - if err != nil { - return fmt.Errorf("error collect metrics from Newrelic json: %s", err) + switch string(k) { + case "Events": + events, errLocal := v.Array() + if errLocal != nil { + err = fmt.Errorf("cannot find Events array in MetricPost object: %w", errLocal) + return + } + for _, e := range events { + eventObject, errLocal := e.Object() + if errLocal != nil { + err = fmt.Errorf("cannot find EventObject: %w", errLocal) + return + } + if cap(rows) > len(rows) { + rows = rows[:len(rows)+1] + } else { + rows = append(rows, Row{}) + } + r := &rows[len(rows)-1] + if errLocal := r.unmarshal(eventObject); errLocal != nil { + err = fmt.Errorf("cannot unmarshal EventObject: %w", errLocal) + return + } + } } - e.Metrics = append(e.Metrics, metrics...) 
+ }) + r.Rows = rows + if err != nil { + return fmt.Errorf("cannot parse MetricPost object: %w", err) } } - return nil } -// Metric represents VictoriaMetrics metrics -type Metric struct { - Timestamp int64 +// Row represents parsed row +type Row struct { Tags []Tag - Metric string - Value float64 + Samples []Sample + Timestamp int64 } -func (m *Metric) unmarshal(o *fastjson.Object) ([]Metric, error) { - m.reset() +// Tag represents a key=value tag +type Tag struct { + Key []byte + Value []byte +} - tgsBuffer := tagsPool.Get().(*tagsBuffer) - defer func() { - tgsBuffer.tags = tgsBuffer.tags[:0] - tagsPool.Put(tgsBuffer) - }() +// Sample represents parsed sample +type Sample struct { + Name []byte + Value float64 +} - metrics := make([]Metric, 0, o.Len()) - rawTs := o.Get("timestamp") - if rawTs != nil { - ts, err := getFloat64(rawTs) - if err != nil { - return nil, fmt.Errorf("invalid `timestamp` in %s: %w", o, err) - } - m.Timestamp = int64(ts * 1e3) - } else { - // Allow missing timestamp. It should be automatically populated - // with the current time by the caller. - m.Timestamp = 0 +func (r *Row) reset() { + tags := r.Tags + for i := range tags { + tags[i].reset() } + r.Tags = tags[:0] - eventType := o.Get("eventType") - if eventType == nil { - return nil, fmt.Errorf("error get eventType from Events object: %s", o) + samples := r.Samples + for i := range samples { + samples[i].reset() } - prefix := bytesutil.ToUnsafeString(eventType.GetStringBytes()) - prefix = camelToSnakeCase(prefix) + r.Samples = samples[:0] - o.Visit(func(key []byte, v *fastjson.Value) { + r.Timestamp = 0 +} + +func (t *Tag) reset() { + t.Key = t.Key[:0] + t.Value = t.Value[:0] +} - k := bytesutil.ToUnsafeString(key) - // skip base event keys which should have been parsed before this - if _, ok := baseEventKeys[k]; ok { +func (s *Sample) reset() { + s.Name = s.Name[:0] + s.Value = 0 +} + +func (r *Row) unmarshal(o *fastjson.Object) (err error) { + r.reset() + tags := r.Tags[:0] + samples := r.Samples[:0] + o.Visit(func(k []byte, v *fastjson.Value) { + if err != nil { + return + } + if len(k) == 0 { return } - switch v.Type() { case fastjson.TypeString: - // this is label-value pair - value := v.Get() - if value == nil { - logger.Errorf("failed to get label value from NewRelic json: %s", v) + // Register new tag + valueBytes := v.GetStringBytes() + if len(valueBytes) == 0 { return } - name := camelToSnakeCase(k) - val := bytesutil.ToUnsafeString(value.GetStringBytes()) - tgsBuffer.tags = append(tgsBuffer.tags, Tag{Key: name, Value: val}) - case fastjson.TypeNumber: - // this is metric name with value - metricName := camelToSnakeCase(k) - if prefix != "" { - metricName = fmt.Sprintf("%s_%s", prefix, metricName) + if cap(tags) > len(tags) { + tags = tags[:len(tags)+1] + } else { + tags = append(tags, Tag{}) } - f, err := getFloat64(v) - if err != nil { - logger.Errorf("failed to get value for NewRelic metric %q: %w", k, err) + t := &tags[len(tags)-1] + t.Key = append(t.Key[:0], k...) + t.Value = append(t.Value[:0], valueBytes...) + case fastjson.TypeNumber: + if string(k) == "timestamp" { + // Parse timestamp + ts, errLocal := getFloat64(v) + if errLocal != nil { + err = fmt.Errorf("cannot parse `timestamp` field: %w", errLocal) + return + } + if ts < (1 << 32) { + // The timestamp is in seconds. Convert it to milliseconds. 
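+					// Note: (1 << 32) seconds corresponds to the year ~2106, while (1 << 32)
+					// milliseconds is only ~50 days after the Unix Epoch, so the two units
+					// cannot be confused for real-world timestamps.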
+ ts *= 1e3 + } + r.Timestamp = int64(ts) return } - metrics = append(metrics, Metric{Metric: metricName, Value: f}) - default: - // unknown type - logger.Errorf("got unsupported NewRelic json %s field type: %s", v, v.Type()) - return - } - }) - - for i := range metrics { - metrics[i].Timestamp = m.Timestamp - metrics[i].Tags = tgsBuffer.tags - } - - return metrics, nil -} - -func (m *Metric) reset() { - m.Timestamp = 0 - m.Tags = nil - m.Metric = "" - m.Value = 0 -} - -// Tag is an NewRelic tag. -type Tag struct { - Key string - Value string -} - -func camelToSnakeCase(str string) string { - str = promrelabel.SanitizeLabelName(str) - length := len(str) - snakeCase := make([]byte, 0, length*2) - tokens := make([]byte, 0, length) - var allTokensUpper bool - - flush := func(tokens []byte) { - for _, c := range tokens { - snakeCase = append(snakeCase, byte(unicode.ToLower(rune(c)))) - } - } - - for i := 0; i < length; i++ { - char := str[i] - if unicode.IsUpper(rune(char)) { - switch { - case len(tokens) == 0: - allTokensUpper = true - tokens = append(tokens, char) - case allTokensUpper: - tokens = append(tokens, char) - default: - flush(tokens) - snakeCase = append(snakeCase, '_') - tokens = tokens[:0] - tokens = append(tokens, char) - allTokensUpper = true + // Register new sample + if cap(samples) > len(samples) { + samples = samples[:len(samples)+1] + } else { + samples = append(samples, Sample{}) } - continue + s := &samples[len(samples)-1] + s.Name = append(s.Name[:0], k...) + s.Value = v.GetFloat64() } - - switch { - case len(tokens) == 1: - tokens = append(tokens, char) - allTokensUpper = false - case allTokensUpper: - tail := tokens[:len(tokens)-1] - last := tokens[len(tokens)-1:] - flush(tail) - snakeCase = append(snakeCase, '_') - tokens = tokens[:0] - tokens = append(tokens, last...) 
- tokens = append(tokens, char) - allTokensUpper = false - default: - tokens = append(tokens, char) - } - } - - if len(tokens) > 0 { - flush(tokens) - } - s := bytesutil.ToUnsafeString(snakeCase) - return s + }) + r.Tags = tags + r.Samples = samples + return err } func getFloat64(v *fastjson.Value) (float64, error) { diff --git a/lib/protoparser/newrelic/parser_test.go b/lib/protoparser/newrelic/parser_test.go index 2daded2dd823d..f3f3e499b490f 100644 --- a/lib/protoparser/newrelic/parser_test.go +++ b/lib/protoparser/newrelic/parser_test.go @@ -1,174 +1,229 @@ package newrelic import ( + "fmt" "reflect" "strings" "testing" - - "github.com/valyala/fastjson" ) -func TestEvents_Unmarshal(t *testing.T) { - tests := []struct { - name string - metrics []Metric - json string - wantErr bool - }{ - { - name: "empty json", - metrics: []Metric{}, - json: "", - wantErr: true, - }, - { - name: "json with correct data", - metrics: []Metric{ - { - Timestamp: 1690286061000, - Tags: []Tag{ - {Key: "entity_key", Value: "macbook-pro.local"}, - {Key: "dc", Value: "1"}, - }, - Metric: "system_sample_disk_writes_per_second", - Value: 0, - }, - { - Timestamp: 1690286061000, - Tags: []Tag{ - {Key: "entity_key", Value: "macbook-pro.local"}, - {Key: "dc", Value: "1"}, - }, - Metric: "system_sample_uptime", - Value: 762376, - }, - }, - json: `[ +func TestRowsUnmarshalFailure(t *testing.T) { + f := func(data string) { + t.Helper() + + var r Rows + if err := r.Unmarshal([]byte(data)); err == nil { + t.Fatalf("expecting non-nil error") + } + } + + // Empty JSON + f("") + + // Invalid JSON + f("123") + f("[foo]") + f(`{"foo":123}`) +} + +func TestRowsUnmarshalSuccess(t *testing.T) { + f := func(data string, expectedRows []Row) { + t.Helper() + + var r Rows + if err := r.Unmarshal([]byte(data)); err != nil { + t.Fatalf("unexpected error: %s", err) + } + if !reflect.DeepEqual(r.Rows, expectedRows) { + t.Fatalf("unexpected rows parsed\ngot\n%s\nwant\n%s", rowsToString(r.Rows), rowsToString(expectedRows)) + } + } + + // empty array + f(`[]`, nil) + + // zero events + f(`[ { "EntityID":28257883748326179, "IsAgent":true, + "Events":[], + "ReportingAgentID":28257883748326179 + }]`, nil) + + // A single event + f(`[{ + "EntityID":28257883748326179, + "IsAgent":true, "Events":[ { "eventType":"SystemSample", "timestamp":1690286061, "entityKey":"macbook-pro.local", - "dc": "1", - "diskWritesPerSecond":0, + "dc": "1", + "diskWritesPerSecond":-34.21, "uptime":762376 } ], "ReportingAgentID":28257883748326179 - } - ]`, - wantErr: false, - }, + }]`, []Row{ { - name: "empty array in json", - metrics: []Metric{}, - json: `[]`, - wantErr: false, + Tags: []Tag{ + { + Key: []byte("eventType"), + Value: []byte("SystemSample"), + }, + { + Key: []byte("entityKey"), + Value: []byte("macbook-pro.local"), + }, + { + Key: []byte("dc"), + Value: []byte("1"), + }, + }, + Samples: []Sample{ + { + Name: []byte("diskWritesPerSecond"), + Value: -34.21, + }, + { + Name: []byte("uptime"), + Value: 762376, + }, + }, + Timestamp: 1690286061000, }, - { - name: "empty events in json", - metrics: []Metric{}, - json: `[ + }) + + // Multiple events + f(`[ { "EntityID":28257883748326179, "IsAgent":true, - "Events":[], + "Events":[ + { + "eventType":"SystemSample", + "timestamp":1690286061, + "entityKey":"macbook-pro.local", + "dc": "1", + "diskWritesPerSecond":-34.21, + "uptime":762376 + } + ], "ReportingAgentID":28257883748326179 + }, + { + "EntityID":282579, + "IsAgent":true, + "Events":[ + { + "eventType":"SystemSample", + "timestamp":1690286061, + 
"entityKey":"macbook-pro.local", + "diskWritesPerSecond":234.34, + "timestamp":1690286061.433, + "uptime":762376 + }, + { + "eventType":"ProcessSample", + "timestamp":1690286061987, + "uptime":1236 + } + ], + "ReportingAgentID":2879 } - ]`, - wantErr: false, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - e := &Events{Metrics: []Metric{}} - - value, err := fastjson.Parse(tt.json) - if (err != nil) != tt.wantErr { - t.Errorf("cannot parse json error: %s", err) - } - - if value != nil { - v, err := value.Array() - if err != nil { - t.Errorf("cannot get array from json") - } - if err := e.Unmarshal(v); (err != nil) != tt.wantErr { - t.Errorf("Unmarshal() error = %v, wantErr %v", err, tt.wantErr) - } - if !reflect.DeepEqual(e.Metrics, tt.metrics) { - t.Errorf("got metrics => %v; expected = %v", e.Metrics, tt.metrics) - } - } - }) - } -} - -func Test_camelToSnakeCase(t *testing.T) { - tests := []struct { - name string - str string - want string - }{ - { - name: "empty string", - str: "", - want: "", - }, + ]`, []Row{ { - name: "lowercase all chars", - str: "somenewstring", - want: "somenewstring", - }, - { - name: "first letter uppercase", - str: "Teststring", - want: "teststring", - }, - { - name: "two uppercase letters", - str: "TestString", - want: "test_string", - }, - { - name: "first and last uppercase letters", - str: "TeststrinG", - want: "teststrin_g", - }, - { - name: "three letters uppercase", - str: "TestStrinG", - want: "test_strin_g", + Tags: []Tag{ + { + Key: []byte("eventType"), + Value: []byte("SystemSample"), + }, + { + Key: []byte("entityKey"), + Value: []byte("macbook-pro.local"), + }, + { + Key: []byte("dc"), + Value: []byte("1"), + }, + }, + Samples: []Sample{ + { + Name: []byte("diskWritesPerSecond"), + Value: -34.21, + }, + { + Name: []byte("uptime"), + Value: 762376, + }, + }, + Timestamp: 1690286061000, }, { - name: "has many upper case letters", - str: "ProgressIOTime", - want: "progress_io_time", + Tags: []Tag{ + { + Key: []byte("eventType"), + Value: []byte("SystemSample"), + }, + { + Key: []byte("entityKey"), + Value: []byte("macbook-pro.local"), + }, + }, + Samples: []Sample{ + { + Name: []byte("diskWritesPerSecond"), + Value: 234.34, + }, + { + Name: []byte("uptime"), + Value: 762376, + }, + }, + Timestamp: 1690286061433, }, { - name: "last all uppercase letters", - str: "ProgressTSDB", - want: "progress_tsdb", + Tags: []Tag{ + { + Key: []byte("eventType"), + Value: []byte("ProcessSample"), + }, + }, + Samples: []Sample{ + { + Name: []byte("uptime"), + Value: 1236, + }, + }, + Timestamp: 1690286061987, }, + }) + +} + +func rowsToString(rows []Row) string { + var a []string + for _, row := range rows { + s := row.String() + a = append(a, s) } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - if got := camelToSnakeCase(tt.str); got != tt.want { - t.Errorf("camelToSnakeCase() = %v, want %v", got, tt.want) - } - }) - } + return strings.Join(a, "\n") } -func BenchmarkCameToSnake(b *testing.B) { - b.ReportAllocs() - str := strings.Repeat("ProgressIOTime", 20) - b.RunParallel(func(pb *testing.PB) { - for pb.Next() { - camelToSnakeCase(str) - } - }) +func (r *Row) String() string { + var a []string + for _, t := range r.Tags { + s := fmt.Sprintf("%s=%q", t.Key, t.Value) + a = append(a, s) + } + tagsString := "{" + strings.Join(a, ",") + "}" + a = a[:0] + for _, sample := range r.Samples { + s := fmt.Sprintf("[%s %f]", sample.Name, sample.Value) + a = append(a, s) + } + samplesString := strings.Join(a, ",") + return 
fmt.Sprintf("tags=%s, samples=%s, timestamp=%d", tagsString, samplesString, r.Timestamp) } diff --git a/lib/protoparser/newrelic/parser_timing_test.go b/lib/protoparser/newrelic/parser_timing_test.go index 94e168a66d523..7a56412715112 100644 --- a/lib/protoparser/newrelic/parser_timing_test.go +++ b/lib/protoparser/newrelic/parser_timing_test.go @@ -1,13 +1,12 @@ package newrelic import ( + "fmt" "testing" - - "github.com/valyala/fastjson" ) -func BenchmarkRequestUnmarshal(b *testing.B) { - reqBody := `[ +func BenchmarkRowsUnmarshal(b *testing.B) { + reqBody := []byte(`[ { "EntityID":28257883748326179, "IsAgent":true, @@ -52,25 +51,17 @@ func BenchmarkRequestUnmarshal(b *testing.B) { ], "ReportingAgentID":28257883748326179 } - ]` + ]`) b.SetBytes(int64(len(reqBody))) b.ReportAllocs() b.RunParallel(func(pb *testing.PB) { - value, err := fastjson.Parse(reqBody) - if err != nil { - b.Errorf("cannot parse json error: %s", err) - } - v, err := value.Array() - if err != nil { - b.Errorf("cannot get array from json") - } + var r Rows for pb.Next() { - e := &Events{Metrics: []Metric{}} - if err := e.Unmarshal(v); err != nil { - b.Errorf("Unmarshal() error = %v", err) + if err := r.Unmarshal(reqBody); err != nil { + panic(fmt.Errorf("unmarshal error: %s", err)) } - if len(e.Metrics) == 0 { - b.Errorf("metrics should have at least one element") + if len(r.Rows) != 1 { + panic(fmt.Errorf("unexpected number of items unmarshaled; got %d; want %d", len(r.Rows), 1)) } } }) diff --git a/lib/protoparser/newrelic/stream/push_context.go b/lib/protoparser/newrelic/stream/push_context.go deleted file mode 100644 index 4dbb7707cf4a4..0000000000000 --- a/lib/protoparser/newrelic/stream/push_context.go +++ /dev/null @@ -1,80 +0,0 @@ -package stream - -import ( - "bufio" - "fmt" - "io" - "sync" - - "github.com/VictoriaMetrics/metrics" - - "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil" -) - -var ( - maxInsertRequestSize = flagutil.NewBytes("newrelic.maxInsertRequestSize", 64*1024*1024, "The maximum size in bytes of a single NewRelic POST request to /infra/v2/metrics/events/bulk") -) - -var ( - readCalls = metrics.NewCounter(`vm_protoparser_read_calls_total{type="newrelic"}`) - readErrors = metrics.NewCounter(`vm_protoparser_read_errors_total{type="newrelic"}`) - unmarshalErrors = metrics.NewCounter(`vm_protoparser_unmarshal_errors_total{type="newrelic"}`) -) - -var pushCtxPool sync.Pool -var pushCtxPoolCh = make(chan *pushCtx, cgroup.AvailableCPUs()) - -type pushCtx struct { - br *bufio.Reader - reqBuf bytesutil.ByteBuffer -} - -func (ctx *pushCtx) Read() error { - readCalls.Inc() - lr := io.LimitReader(ctx.br, maxInsertRequestSize.N+1) - startTime := fasttime.UnixTimestamp() - reqLen, err := ctx.reqBuf.ReadFrom(lr) - if err != nil { - readErrors.Inc() - return fmt.Errorf("cannot read compressed request in %d seconds: %w", fasttime.UnixTimestamp()-startTime, err) - } - if reqLen > maxInsertRequestSize.N { - readErrors.Inc() - return fmt.Errorf("too big packed request; mustn't exceed `-maxInsertRequestSize=%d` bytes", maxInsertRequestSize.N) - } - return nil -} - -func (ctx *pushCtx) reset() { - ctx.br.Reset(nil) - ctx.reqBuf.Reset() -} - -func getPushCtx(r io.Reader) *pushCtx { - select { - case ctx := <-pushCtxPoolCh: - ctx.br.Reset(r) - return ctx - default: - if v := pushCtxPool.Get(); v != nil { - ctx := v.(*pushCtx) - 
-			return ctx
-		}
-		return &pushCtx{
-			br: bufio.NewReaderSize(r, 64*1024),
-		}
-	}
-}
-
-func putPushCtx(ctx *pushCtx) {
-	ctx.reset()
-	select {
-	case pushCtxPoolCh <- ctx:
-	default:
-		pushCtxPool.Put(ctx)
-	}
-}
diff --git a/lib/protoparser/newrelic/stream/streamparser.go b/lib/protoparser/newrelic/stream/streamparser.go
index 8802124443496..ee28878295c6c 100644
--- a/lib/protoparser/newrelic/stream/streamparser.go
+++ b/lib/protoparser/newrelic/stream/streamparser.go
@@ -1,23 +1,31 @@
 package stream
 
 import (
+	"bufio"
 	"fmt"
 	"io"
+	"sync"
 
-	"github.com/valyala/fastjson"
+	"github.com/VictoriaMetrics/metrics"
 
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/newrelic"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
 )
 
-var parserPool fastjson.ParserPool
+var (
+	maxInsertRequestSize = flagutil.NewBytes("newrelic.maxInsertRequestSize", 64*1024*1024, "The maximum size in bytes of a single NewRelic request "+
+		"to /newrelic/infra/v2/metrics/events/bulk")
+)
 
-// Parse parses NewRelic POST request for newrelic/infra/v2/metrics/events/bulk from reader and calls callback for the parsed request.
+// Parse parses NewRelic POST request for /newrelic/infra/v2/metrics/events/bulk from r and calls callback for the parsed request.
 //
-// callback shouldn't hold series after returning.
-func Parse(r io.Reader, isGzip bool, callback func(series []newrelic.Metric) error) error {
+// callback shouldn't hold rows after returning.
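+// The rows are returned to an internal pool once callback completes,
+// so callback must copy any data it needs to retain before returning.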
+func Parse(r io.Reader, isGzip bool, callback func(rows []newrelic.Row) error) error {
 	wcr := writeconcurrencylimiter.GetReader(r)
 	defer writeconcurrencylimiter.PutReader(wcr)
 	r = wcr
@@ -25,7 +33,7 @@ func Parse(r io.Reader, isGzip bool, callback func(series []newrelic.Metric) err
 	if isGzip {
 		zr, err := common.GetGzipReader(r)
 		if err != nil {
-			return fmt.Errorf("cannot read gzipped Newrelic agent data: %w", err)
+			return fmt.Errorf("cannot read gzipped NewRelic agent data: %w", err)
 		}
 		defer common.PutGzipReader(zr)
 		r = zr
@@ -34,40 +42,104 @@ func Parse(r io.Reader, isGzip bool, callback func(series []newrelic.Metric) err
 	ctx := getPushCtx(r)
 	defer putPushCtx(ctx)
 	if err := ctx.Read(); err != nil {
-		return err
+		return fmt.Errorf("cannot read NewRelic request: %w", err)
 	}
 
-	p := parserPool.Get()
-	defer parserPool.Put(p)
+	rows := getRows()
+	defer putRows(rows)
 
-	v, err := p.ParseBytes(ctx.reqBuf.B)
-	if err != nil {
-		return fmt.Errorf("cannot parse NewRelic POST request with size %d bytes: %w", len(ctx.reqBuf.B), err)
-	}
-
-	metricsPost, err := v.Array()
-	if err != nil {
-		return fmt.Errorf("cannot fetch data from Newrelic POST request: %w", err)
-	}
-
-	var events newrelic.Events
-
-	if err := events.Unmarshal(metricsPost); err != nil {
+	if err := rows.Unmarshal(ctx.reqBuf.B); err != nil {
 		unmarshalErrors.Inc()
-		return fmt.Errorf("cannot unmarshal NewRelic POST request: %w", err)
+		return fmt.Errorf("cannot unmarshal NewRelic request: %w", err)
 	}
 
 	// Fill in missing timestamps
 	currentTimestamp := int64(fasttime.UnixTimestamp())
-	for i := range events.Metrics {
-		m := &events.Metrics[i]
-		if m.Timestamp == 0 {
-			m.Timestamp = currentTimestamp * 1e3
+	for i := range rows.Rows {
+		r := &rows.Rows[i]
+		if r.Timestamp == 0 {
+			r.Timestamp = currentTimestamp * 1e3
 		}
 	}
 
-	if err := callback(events.Metrics); err != nil {
+	if err := callback(rows.Rows); err != nil {
 		return fmt.Errorf("error when processing imported data: %w", err)
 	}
 	return nil
 }
+
+func getRows() *newrelic.Rows {
+	v := rowsPool.Get()
+	if v == nil {
+		return &newrelic.Rows{}
+	}
+	return v.(*newrelic.Rows)
+}
+
+func putRows(rows *newrelic.Rows) {
+	rows.Reset()
+	rowsPool.Put(rows)
+}
+
+var rowsPool sync.Pool
+
+var (
+	readCalls       = metrics.NewCounter(`vm_protoparser_read_calls_total{type="newrelic"}`)
+	readErrors      = metrics.NewCounter(`vm_protoparser_read_errors_total{type="newrelic"}`)
+	unmarshalErrors = metrics.NewCounter(`vm_protoparser_unmarshal_errors_total{type="newrelic"}`)
+)
+
+var pushCtxPool sync.Pool
+var pushCtxPoolCh = make(chan *pushCtx, cgroup.AvailableCPUs())
+
+type pushCtx struct {
+	br     *bufio.Reader
+	reqBuf bytesutil.ByteBuffer
+}
+
+func (ctx *pushCtx) Read() error {
+	readCalls.Inc()
+	lr := io.LimitReader(ctx.br, maxInsertRequestSize.N+1)
+	startTime := fasttime.UnixTimestamp()
+	reqLen, err := ctx.reqBuf.ReadFrom(lr)
+	if err != nil {
+		readErrors.Inc()
+		return fmt.Errorf("cannot read request in %d seconds: %w", fasttime.UnixTimestamp()-startTime, err)
+	}
+	if reqLen > maxInsertRequestSize.N {
+		readErrors.Inc()
+		return fmt.Errorf("too big request; mustn't exceed -newrelic.maxInsertRequestSize=%d bytes", maxInsertRequestSize.N)
+	}
+	return nil
+}
+
+func (ctx *pushCtx) reset() {
+	ctx.br.Reset(nil)
+	ctx.reqBuf.Reset()
+}
+
+func getPushCtx(r io.Reader) *pushCtx {
+	select {
+	case ctx := <-pushCtxPoolCh:
+		ctx.br.Reset(r)
+		return ctx
+	default:
+		if v := pushCtxPool.Get(); v != nil {
+			ctx := v.(*pushCtx)
+			ctx.br.Reset(r)
+			return ctx
+		}
+		return &pushCtx{
+			br: bufio.NewReaderSize(r, 64*1024),
+		}
+	}
+}
+
+func putPushCtx(ctx *pushCtx) {
+	ctx.reset()
+	select {
+	case pushCtxPoolCh <- ctx:
+	default:
+		pushCtxPool.Put(ctx)
+	}
+}
diff --git a/lib/protoparser/newrelic/stream/streamparser_test.go b/lib/protoparser/newrelic/stream/streamparser_test.go
new file mode 100644
index 0000000000000..4f9b3ab97c38e
--- /dev/null
+++ b/lib/protoparser/newrelic/stream/streamparser_test.go
@@ -0,0 +1,106 @@
+package stream
+
+import (
+	"bytes"
+	"compress/gzip"
+	"fmt"
+	"reflect"
+	"testing"
+
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/newrelic"
+)
+
+func TestParseFailure(t *testing.T) {
+	f := func(req string) {
+		t.Helper()
+
+		callback := func(rows []newrelic.Row) error {
+			panic(fmt.Errorf("unexpected call into callback"))
+		}
+		r := bytes.NewReader([]byte(req))
+		if err := Parse(r, false, callback); err == nil {
+			t.Fatalf("expecting non-nil error")
+		}
+	}
+	f("")
+	f("foo")
+	f("{}")
+	f("[1,2,3]")
+}
+
+func TestParseSuccess(t *testing.T) {
+	f := func(req string, expectedRows []newrelic.Row) {
+		t.Helper()
+
+		callback := func(rows []newrelic.Row) error {
+			if !reflect.DeepEqual(rows, expectedRows) {
+				return fmt.Errorf("unexpected rows\ngot\n%v\nwant\n%v", rows, expectedRows)
+			}
+			return nil
+		}
+
+		// Parse from uncompressed reader
+		r := bytes.NewReader([]byte(req))
+		if err := Parse(r, false, callback); err != nil {
+			t.Fatalf("unexpected error when parsing uncompressed request: %s", err)
+		}
+
+		// Parse from gzip-compressed reader
+		var bb bytes.Buffer
+		zw := gzip.NewWriter(&bb)
+		if _, err := zw.Write([]byte(req)); err != nil {
+			t.Fatalf("cannot compress request: %s", err)
+		}
+		if err := zw.Close(); err != nil {
+			t.Fatalf("cannot close compressed writer: %s", err)
+		}
+		if err := Parse(&bb, true, callback); err != nil {
+			t.Fatalf("unexpected error when parsing compressed request: %s", err)
+		}
+	}
+
+	f("[]", nil)
+	f(`[{"Events":[]}]`, nil)
+	f(`[{
+    "EntityID":28257883748326179,
+    "IsAgent":true,
+    "Events":[
+      {
+        "eventType":"SystemSample",
+        "timestamp":1690286061,
+        "entityKey":"macbook-pro.local",
+        "dc": "1",
+        "diskWritesPerSecond":-34.21,
+        "uptime":762376
+      }
+    ],
+    "ReportingAgentID":28257883748326179
+}]`, []newrelic.Row{
+		{
+			Tags: []newrelic.Tag{
+				{
+					Key:   []byte("eventType"),
+					Value: []byte("SystemSample"),
+				},
+				{
+					Key:   []byte("entityKey"),
+					Value: []byte("macbook-pro.local"),
+				},
+				{
+					Key:   []byte("dc"),
+					Value: []byte("1"),
+				},
+			},
+			Samples: []newrelic.Sample{
+				{
+					Name:  []byte("diskWritesPerSecond"),
+					Value: -34.21,
+				},
+				{
+					Name:  []byte("uptime"),
+					Value: 762376,
+				},
+			},
+			Timestamp: 1690286061000,
+		},
+	})
+}
diff --git a/lib/protoparser/promremotewrite/stream/streamparser.go b/lib/protoparser/promremotewrite/stream/streamparser.go
index ee01b76818c9c..4b751b1ad5cc9 100644
--- a/lib/protoparser/promremotewrite/stream/streamparser.go
+++ b/lib/protoparser/promremotewrite/stream/streamparser.go
@@ -96,7 +96,7 @@ func (ctx *pushCtx) Read() error {
 	}
 	if reqLen > int64(maxInsertRequestSize.N) {
 		readErrors.Inc()
-		return fmt.Errorf("too big packed request; mustn't exceed `-maxInsertRequestSize=%d` bytes", maxInsertRequestSize.N)
+		return fmt.Errorf("too big packed request; mustn't exceed -maxInsertRequestSize=%d bytes", maxInsertRequestSize.N)
	}
 	return nil
 }
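---

For reviewers who want to drive the reworked parser by hand, outside of the test suite, a minimal sketch follows. It is illustrative only and not part of this patch; the real callers are the request handlers in app/vmagent/newrelic and app/vminsert/newrelic. It uses only the API introduced above: stream.Parse, newrelic.Row, newrelic.Sample.

	package main

	import (
		"bytes"
		"fmt"
		"log"

		"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/newrelic"
		"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/newrelic/stream"
	)

	func main() {
		// A single NewRelic event: numeric fields become samples,
		// string fields become labels.
		body := []byte(`[{"Events":[{"eventType":"SystemSample","timestamp":1690286061,"uptime":762376}]}]`)

		callback := func(rows []newrelic.Row) error {
			// rows are returned to an internal pool after this callback
			// completes, so copy anything that must outlive the call.
			for i := range rows {
				r := &rows[i]
				for _, s := range r.Samples {
					fmt.Printf("%s %v %d\n", s.Name, s.Value, r.Timestamp)
				}
			}
			return nil
		}
		// isGzip=false, since body isn't compressed here.
		if err := stream.Parse(bytes.NewReader(body), false, callback); err != nil {
			log.Fatalf("cannot parse NewRelic request: %s", err)
		}
	}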