app/{vmagent,vminsert}: follow-up for NewRelic data ingestion protocol support

This is a follow-up for f60c08a

Changes:

- Make sure all the URLs related to the NewRelic protocol start with /newrelic. Previously some URLs started with /api/v1/newrelic

- Remove the /api/v1 part from NewRelic URLs, since it makes no sense

- Remove the automatic transformation from CamelCase to snake_case for NewRelic labels and metric names,
  since it may complicate the transition from NewRelic to VictoriaMetrics. Preserve all the metric names and label names,
  so users can query metrics and labels by the same names as used in NewRelic.
  The automatic transformation from CamelCase to snake_case can be added later as a special action for relabeling rules if needed.

- Properly update per-tenant data ingestion stats at app/vmagent/newrelic/request_handler.go. Previously they were always zero.

- Fix NewRelic URLs in vmagent when multitenant data ingestion is enabled. Previously they mistakenly started with `/`.

- Document the NewRelic data ingestion URL at docs/Cluster-VictoriaMetrics.md (see the example below)

- Remove superfluous memory allocations at lib/protoparser/newrelic

- Improve tests at lib/protoparser/newrelic/*

Updates VictoriaMetrics#3520
Updates VictoriaMetrics#4712
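For reference, the cluster ingestion endpoint documented in docs/Cluster-VictoriaMetrics.md presumably follows the usual `/insert/<accountID>/<suffix>` scheme used by the cluster version for other ingestion protocols; the exact URL below is an assumption, not quoted from those docs:

```console
# Assumed cluster ingestion URL: vminsert on its default port 8480, tenant (accountID) 0.
curl -X POST -H 'Content-Type: application/json' --data-binary @newrelic.json \
  http://<vminsert>:8480/insert/0/newrelic/infra/v2/metrics/events/bulk
```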
valyala authored and Andrew Chubatiuk committed Nov 15, 2023
1 parent 54cf6bf commit 1538764
Showing 16 changed files with 805 additions and 668 deletions.
107 changes: 56 additions & 51 deletions README.md
@@ -854,70 +854,75 @@ For example, `/api/put?extra_label=foo=bar` would add `{foo="bar"}` label to all
## How to send data from NewRelic agent

VictoriaMetrics accepts data from [NewRelic infrastructure agent](https://docs.newrelic.com/docs/infrastructure/install-infrastructure-agent)
-at `/api/v1/newrelic/infra/v2/metrics/events/bulk` path.
-NewRelic's infrastructure agent sends so-called [Events](https://docs.newrelic.com/docs/infrastructure/manage-your-data/data-instrumentation/default-infrastructure-monitoring-data/#infrastructure-events)
-which then transformed by VictoriaMetrics to the [Prometheus exposition format](https://github.com/prometheus/docs/blob/main/content/docs/instrumenting/exposition_formats.md#text-based-format).
+at the `/newrelic/infra/v2/metrics/events/bulk` HTTP path.
+VictoriaMetrics receives [Events](https://docs.newrelic.com/docs/infrastructure/manage-your-data/data-instrumentation/default-infrastructure-monitoring-data/#infrastructure-events)
+from the NewRelic agent at this path, transforms them into [raw samples](https://docs.victoriametrics.com/keyConcepts.html#raw-samples)
+according to [these docs](#newrelic-agent-data-mapping), and stores the raw samples in the database.

-NewRelic's infrastructure agent allows configuring destinations for metrics forwarding via ENV variable `COLLECTOR_URL`.
-It is also required to specify `NRIA_LICENSE_KEY`, which is available only after registration into account of the NewRelic cloud.
+You need to pass the `COLLECTOR_URL` and `NRIA_LICENSE_KEY` environment variables to the NewRelic infrastructure agent in order to send the collected metrics to VictoriaMetrics.
+The `COLLECTOR_URL` must point to the `/newrelic` HTTP endpoint at VictoriaMetrics, while `NRIA_LICENSE_KEY` must contain a NewRelic license key,
+which can be obtained [here](https://newrelic.com/signup).
+For example, if VictoriaMetrics runs at `localhost:8428`, then the following command can be used to run the NewRelic infrastructure agent:

-To configure NewRelic infrastructure agent for forwarding metrics to VictoriaMetrics use the following example:
```console
-COLLECTOR_URL="http://localhost:8428/newrelic/api/v1" NRIA_LICENSE_KEY="YOUR_LICENSE_KEY" ./newrelic-infra
+COLLECTOR_URL="http://localhost:8428/newrelic" NRIA_LICENSE_KEY="NEWRELIC_LICENSE_KEY" ./newrelic-infra
```
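If the agent runs in Docker rather than directly on the host, the same two environment variables apply. A minimal sketch, assuming the official `newrelic/infrastructure` image and that the container can reach VictoriaMetrics via `host.docker.internal` (both the image name and the host address are assumptions, not part of this change):

```console
docker run -d --name newrelic-infra \
  -e COLLECTOR_URL="http://host.docker.internal:8428/newrelic" \
  -e NRIA_LICENSE_KEY="NEWRELIC_LICENSE_KEY" \
  newrelic/infrastructure:latest
```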

### NewRelic agent data mapping

+VictoriaMetrics maps [NewRelic Events](https://docs.newrelic.com/docs/infrastructure/manage-your-data/data-instrumentation/default-infrastructure-monitoring-data/#infrastructure-events)
+to [raw samples](https://docs.victoriametrics.com/keyConcepts.html#raw-samples) in the following way:
+
+1. Every numeric field is converted into a raw sample with the corresponding name.
+1. The `eventType` and all the other fields with `string` value type are attached to every raw sample as [metric labels](https://docs.victoriametrics.com/keyConcepts.html#labels).
+1. The `timestamp` field is used as the timestamp for the ingested [raw sample](https://docs.victoriametrics.com/keyConcepts.html#raw-samples).
+   The `timestamp` field may be specified either in seconds or in milliseconds since the [Unix Epoch](https://en.wikipedia.org/wiki/Unix_time).
+   If the `timestamp` field is missing, then the raw sample is stored with the current timestamp.

-As example, lets create `newrelic.json` file with the following content:
+For example, let's import the following NewRelic Events request to VictoriaMetrics:

```json
[
  {
    "Events":[
      {
        "eventType":"SystemSample",
        "entityKey":"macbook-pro.local",
        "cpuPercent":25.056660790748904,
        "cpuUserPercent":8.687987912389374,
        "cpuSystemPercent":16.36867287835953,
        "cpuIOWaitPercent":0,
        "cpuIdlePercent":74.94333920925109,
        "cpuStealPercent":0,
        "loadAverageOneMinute":5.42333984375,
        "loadAverageFiveMinute":4.099609375,
        "loadAverageFifteenMinute":3.58203125
      }
    ]
  }
]
```

-Let's use cUrl to send `newrelic.json` to single-node VictoriaMetrics:
+Save this JSON into the `newrelic.json` file and then use the following command to import it into VictoriaMetrics:

```console
-curl -X POST -H 'Content-Type: application/json' --data-binary @newrelic.json http://localhost:8428/newrelic/api/v1/infra/v2/metrics/events/bulk
+curl -X POST -H 'Content-Type: application/json' --data-binary @newrelic.json http://localhost:8428/newrelic/infra/v2/metrics/events/bulk
```
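The ingestion handlers also accept gzip-compressed request bodies (the vmagent handler shown later in this commit checks the `Content-Encoding: gzip` header before parsing), so larger payloads can be sent compressed. A sketch, assuming the single-node endpoint behaves the same way:

```console
gzip -c newrelic.json | curl -X POST \
  -H 'Content-Type: application/json' \
  -H 'Content-Encoding: gzip' \
  --data-binary @- \
  http://localhost:8428/newrelic/infra/v2/metrics/events/bulk
```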

-If data was successfully ingested, you'll get `{"status":"ok"}` response. Let's fetch ingested data from VictoriaMetrics
-in vmui via query `{__name__!=""}`:
-```console
-system_sample_cpu_io_wait_percent{entity_key="macbook-pro.local"} 0
-system_sample_cpu_idle_percent{entity_key="macbook-pro.local"} 74.9433392092
-system_sample_cpu_percent{entity_key="macbook-pro.local"} 25.056660790748
-system_sample_cpu_steal_percent{entity_key="macbook-pro.local"} 0
-system_sample_cpu_system_percent{entity_key="macbook-pro.local"} 16.368672878359
-system_sample_cpu_user_percent{entity_key="macbook-pro.local"} 8.687987912389
-system_sample_load_average_fifteen_minute{entity_key="macbook-pro.local"} 3.58203125
-system_sample_load_average_five_minute{entity_key="macbook-pro.local"} 4.099609375
-system_sample_load_average_one_minute{entity_key="macbook-pro.local"} 5.42333984375
-```
-
-The fields in `newrelic.json` are transformed in the following way:
-1. `eventType` filed is used as prefix for all metrics in the object;
-2. `entityKey` or any other field with `string` value type is used as label attached to all metrics in the object;
-3. the rest fields with numeric values will be used as metrics;
-4. the additional field `timestamp` can be added to the payload to set the timestamp for all metrics. If omitted,
-current time is used.
+Let's fetch the ingested data via [data export API](#how-to-export-data-in-json-line-format):

```console
+curl http://localhost:8428/api/v1/export -d 'match={eventType="SystemSample"}'
+{"metric":{"__name__":"cpuStealPercent","entityKey":"macbook-pro.local","eventType":"SystemSample"},"values":[0],"timestamps":[1697407970000]}
+{"metric":{"__name__":"loadAverageFiveMinute","entityKey":"macbook-pro.local","eventType":"SystemSample"},"values":[4.099609375],"timestamps":[1697407970000]}
+{"metric":{"__name__":"cpuIOWaitPercent","entityKey":"macbook-pro.local","eventType":"SystemSample"},"values":[0],"timestamps":[1697407970000]}
+{"metric":{"__name__":"cpuSystemPercent","entityKey":"macbook-pro.local","eventType":"SystemSample"},"values":[16.368672878359],"timestamps":[1697407970000]}
+{"metric":{"__name__":"loadAverageOneMinute","entityKey":"macbook-pro.local","eventType":"SystemSample"},"values":[5.42333984375],"timestamps":[1697407970000]}
+{"metric":{"__name__":"cpuUserPercent","entityKey":"macbook-pro.local","eventType":"SystemSample"},"values":[8.687987912389],"timestamps":[1697407970000]}
+{"metric":{"__name__":"cpuIdlePercent","entityKey":"macbook-pro.local","eventType":"SystemSample"},"values":[74.9433392092],"timestamps":[1697407970000]}
+{"metric":{"__name__":"loadAverageFifteenMinute","entityKey":"macbook-pro.local","eventType":"SystemSample"},"values":[3.58203125],"timestamps":[1697407970000]}
+{"metric":{"__name__":"cpuPercent","entityKey":"macbook-pro.local","eventType":"SystemSample"},"values":[25.056660790748],"timestamps":[1697407970000]}
```
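Since metric and label names are preserved as-is, the same CamelCase names can be used in queries via the [Prometheus querying API](#prometheus-querying-api-usage) described below; for example (response output omitted):

```console
curl http://localhost:8428/api/v1/query -d 'query=cpuPercent{entityKey="macbook-pro.local"}'
```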

## Prometheus querying API usage

20 changes: 10 additions & 10 deletions app/vmagent/main.go
@@ -320,19 +320,19 @@ func requestHandler(w http.ResponseWriter, r *http.Request) bool {
}
w.WriteHeader(http.StatusOK)
return true
case "/newrelic/api/v1":
case "/newrelic":
newrelicCheckRequest.Inc()
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(202)
fmt.Fprintf(w, `{"status":"ok"}`)
return true
case "/newrelic/api/v1/inventory/deltas":
case "/newrelic/inventory/deltas":
newrelicInventoryRequests.Inc()
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(202)
fmt.Fprintf(w, `{"payload":{"version": 1, "state": {}, "reset": "false"}}`)
return true
case "/newrelic/api/v1/infra/v2/metrics/events/bulk":
case "/newrelic/infra/v2/metrics/events/bulk":
newrelicWriteRequests.Inc()
if err := newrelic.InsertHandlerForHTTP(nil, r); err != nil {
newrelicWriteErrors.Inc()
@@ -543,19 +543,19 @@ func processMultitenantRequest(w http.ResponseWriter, r *http.Request, path stri
}
w.WriteHeader(http.StatusOK)
return true
case "/newrelic/api/v1":
case "newrelic":
newrelicCheckRequest.Inc()
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(202)
fmt.Fprintf(w, `{"status":"ok"}`)
return true
case "/newrelic/api/v1/inventory/deltas":
case "newrelic/inventory/deltas":
newrelicInventoryRequests.Inc()
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(202)
fmt.Fprintf(w, `{"payload":{"version": 1, "state": {}, "reset": "false"}}`)
return true
case "/newrelic/api/v1/infra/v2/metrics/events/bulk":
case "newrelic/infra/v2/metrics/events/bulk":
newrelicWriteRequests.Inc()
if err := newrelic.InsertHandlerForHTTP(at, r); err != nil {
newrelicWriteErrors.Inc()
@@ -638,11 +638,11 @@ var (
opentelemetryPushRequests = metrics.NewCounter(`vmagent_http_requests_total{path="/opentelemetry/api/v1/push", protocol="opentelemetry"}`)
opentelemetryPushErrors = metrics.NewCounter(`vmagent_http_request_errors_total{path="/opentelemetry/api/v1/push", protocol="opentelemetry"}`)

-newrelicWriteRequests = metrics.NewCounter(`vm_http_requests_total{path="/newrelic/api/v1/infra/v2/metrics/events/bulk", protocol="newrelic"}`)
-newrelicWriteErrors = metrics.NewCounter(`vm_http_request_errors_total{path="/newrelic/api/v1/infra/v2/metrics/events/bulk", protocol="newrelic"}`)
+newrelicWriteRequests = metrics.NewCounter(`vm_http_requests_total{path="/newrelic/infra/v2/metrics/events/bulk", protocol="newrelic"}`)
+newrelicWriteErrors = metrics.NewCounter(`vm_http_request_errors_total{path="/newrelic/infra/v2/metrics/events/bulk", protocol="newrelic"}`)

-newrelicInventoryRequests = metrics.NewCounter(`vm_http_requests_total{path="/newrelic/api/v1/inventory/deltas", protocol="newrelic"}`)
-newrelicCheckRequest = metrics.NewCounter(`vm_http_requests_total{path="/newrelic/api/v1", protocol="newrelic"}`)
+newrelicInventoryRequests = metrics.NewCounter(`vm_http_requests_total{path="/newrelic/inventory/deltas", protocol="newrelic"}`)
+newrelicCheckRequest = metrics.NewCounter(`vm_http_requests_total{path="/newrelic", protocol="newrelic"}`)

promscrapeTargetsRequests = metrics.NewCounter(`vmagent_http_requests_total{path="/targets"}`)
promscrapeServiceDiscoveryRequests = metrics.NewCounter(`vmagent_http_requests_total{path="/service-discovery"}`)
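With the path fix above, the multitenant handler matches NewRelic routes without a leading `/`, i.e. after the `/insert/<accountID>/` prefix has been stripped. A hedged example of multitenant ingestion through vmagent, assuming it runs with `-remoteWrite.multitenantURL` set and listens on its default port 8429 (the flag and port are assumptions, not shown in this diff):

```console
# Send NewRelic events for tenant (accountID) 0 through vmagent.
curl -X POST -H 'Content-Type: application/json' --data-binary @newrelic.json \
  http://localhost:8429/insert/0/newrelic/infra/v2/metrics/events/bulk
```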
55 changes: 31 additions & 24 deletions app/vmagent/newrelic/request_handler.go
@@ -8,6 +8,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/common"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/remotewrite"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/newrelic"
@@ -29,51 +30,57 @@ func InsertHandlerForHTTP(at *auth.Token, req *http.Request) error {
}
ce := req.Header.Get("Content-Encoding")
isGzip := ce == "gzip"
-return stream.Parse(req.Body, isGzip, func(series []newrelic.Metric) error {
-    return insertRows(at, series, extraLabels)
+return stream.Parse(req.Body, isGzip, func(rows []newrelic.Row) error {
+    return insertRows(at, rows, extraLabels)
})
}

-func insertRows(at *auth.Token, rows []newrelic.Metric, extraLabels []prompbmarshal.Label) error {
+func insertRows(at *auth.Token, rows []newrelic.Row, extraLabels []prompbmarshal.Label) error {
ctx := common.GetPushCtx()
defer common.PutPushCtx(ctx)

-rowsTotal := 0
+samplesCount := 0
tssDst := ctx.WriteRequest.Timeseries[:0]
labels := ctx.Labels[:0]
samples := ctx.Samples[:0]
for i := range rows {
r := &rows[i]
-labelsLen := len(labels)
-labels = append(labels, prompbmarshal.Label{
-    Name:  "__name__",
-    Value: r.Metric,
-})
-for j := range r.Tags {
-    tag := &r.Tags[j]
-    labels = append(labels, prompbmarshal.Label{
-        Name:  tag.Key,
-        Value: tag.Value,
-    })
-}
-samples = append(samples, prompbmarshal.Sample{
-    Value:     r.Value,
-    Timestamp: r.Timestamp,
-})
-tssDst = append(tssDst, prompbmarshal.TimeSeries{
-    Labels:  labels[labelsLen:],
-    Samples: samples[len(samples)-1:],
-})
-labels = append(labels, extraLabels...)
+tags := r.Tags
+srcSamples := r.Samples
+for j := range srcSamples {
+    s := &srcSamples[j]
+    labelsLen := len(labels)
+    labels = append(labels, prompbmarshal.Label{
+        Name:  "__name__",
+        Value: bytesutil.ToUnsafeString(s.Name),
+    })
+    for k := range tags {
+        t := &tags[k]
+        labels = append(labels, prompbmarshal.Label{
+            Name:  bytesutil.ToUnsafeString(t.Key),
+            Value: bytesutil.ToUnsafeString(t.Value),
+        })
+    }
+    samples = append(samples, prompbmarshal.Sample{
+        Value:     s.Value,
+        Timestamp: r.Timestamp,
+    })
+    tssDst = append(tssDst, prompbmarshal.TimeSeries{
+        Labels:  labels[labelsLen:],
+        Samples: samples[len(samples)-1:],
+    })
+    labels = append(labels, extraLabels...)
+}
+samplesCount += len(srcSamples)
}
ctx.WriteRequest.Timeseries = tssDst
ctx.Labels = labels
ctx.Samples = samples
remotewrite.Push(at, &ctx.WriteRequest)
rowsInserted.Add(len(rows))
if at != nil {
-rowsTenantInserted.Get(at).Add(rowsTotal)
+rowsTenantInserted.Get(at).Add(samplesCount)
}
-rowsPerInsert.Update(float64(len(rows)))
+rowsPerInsert.Update(float64(samplesCount))
return nil
}
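For readers unfamiliar with the parser types used above: judging from the handler code, each `newrelic.Row` carries the event's string-valued fields as tags and its numeric fields as named samples. A rough sketch of the implied layout (field names and types are inferred from the usage above, not copied from lib/protoparser/newrelic):

```go
package newrelic

// Row is the inferred shape of a parsed NewRelic event (assumption based on the handler above).
type Row struct {
    Tags      []Tag    // string-valued event fields such as eventType and entityKey
    Samples   []Sample // numeric event fields such as cpuPercent
    Timestamp int64    // presumably milliseconds since the Unix epoch
}

// Tag is a single string-valued event field.
type Tag struct {
    Key   []byte
    Value []byte
}

// Sample is a single numeric event field.
type Sample struct {
    Name  []byte // metric name, e.g. "cpuPercent"
    Value float64
}
```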
14 changes: 7 additions & 7 deletions app/vminsert/main.go
@@ -222,19 +222,19 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool {
}
w.WriteHeader(http.StatusOK)
return true
case "/newrelic/api/v1":
case "/newrelic":
newrelicCheckRequest.Inc()
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(202)
fmt.Fprintf(w, `{"status":"ok"}`)
return true
case "/newrelic/api/v1/inventory/deltas":
case "/newrelic/inventory/deltas":
newrelicInventoryRequests.Inc()
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(202)
fmt.Fprintf(w, `{"payload":{"version": 1, "state": {}, "reset": "false"}}`)
return true
case "/newrelic/api/v1/infra/v2/metrics/events/bulk":
case "/newrelic/infra/v2/metrics/events/bulk":
newrelicWriteRequests.Inc()
if err := newrelic.InsertHandlerForHTTP(r); err != nil {
newrelicWriteErrors.Inc()
@@ -382,11 +382,11 @@ var (
opentelemetryPushRequests = metrics.NewCounter(`vm_http_requests_total{path="/opentelemetry/api/v1/push", protocol="opentelemetry"}`)
opentelemetryPushErrors = metrics.NewCounter(`vm_http_request_errors_total{path="/opentelemetry/api/v1/push", protocol="opentelemetry"}`)

-newrelicWriteRequests = metrics.NewCounter(`vm_http_requests_total{path="/newrelic/api/v1/infra/v2/metrics/events/bulk", protocol="newrelic"}`)
-newrelicWriteErrors = metrics.NewCounter(`vm_http_request_errors_total{path="/newrelic/api/v1/infra/v2/metrics/events/bulk", protocol="newrelic"}`)
+newrelicWriteRequests = metrics.NewCounter(`vm_http_requests_total{path="/newrelic/infra/v2/metrics/events/bulk", protocol="newrelic"}`)
+newrelicWriteErrors = metrics.NewCounter(`vm_http_request_errors_total{path="/newrelic/infra/v2/metrics/events/bulk", protocol="newrelic"}`)

-newrelicInventoryRequests = metrics.NewCounter(`vm_http_requests_total{path="/newrelic/api/v1/inventory/deltas", protocol="newrelic"}`)
-newrelicCheckRequest = metrics.NewCounter(`vm_http_requests_total{path="/newrelic/api/v1", protocol="newrelic"}`)
+newrelicInventoryRequests = metrics.NewCounter(`vm_http_requests_total{path="/newrelic/inventory/deltas", protocol="newrelic"}`)
+newrelicCheckRequest = metrics.NewCounter(`vm_http_requests_total{path="/newrelic", protocol="newrelic"}`)

promscrapeTargetsRequests = metrics.NewCounter(`vm_http_requests_total{path="/targets"}`)
promscrapeServiceDiscoveryRequests = metrics.NewCounter(`vm_http_requests_total{path="/service-discovery"}`)
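To verify that the renamed per-path counters are incremented after ingestion, the exported metrics can be inspected directly; a quick check against a single-node instance (assuming the standard `/metrics` endpoint on port 8428):

```console
curl -s http://localhost:8428/metrics | grep 'path="/newrelic'
```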
