app/(vminsert|vmagent): add support for new relic infrastructure agent (VictoriaMetrics#4712)

Signed-off-by: hagen1778 <roman@victoriametrics.com>
Co-authored-by: hagen1778 <roman@victoriametrics.com>
dmitryk-dk and hagen1778 committed Oct 5, 2023
1 parent cc7d5b7 commit f60c08a
Showing 13 changed files with 1,086 additions and 1 deletion.
68 changes: 68 additions & 0 deletions README.md
@@ -847,6 +847,74 @@ The `/api/v1/export` endpoint should return the following response:
Extra labels may be added to all the imported time series by passing `extra_label=name=value` query args.
For example, `/api/put?extra_label=foo=bar` would add the `{foo="bar"}` label to all the ingested metrics.

## How to send data from NewRelic agent

VictoriaMetrics accepts data from the [NewRelic infrastructure agent](https://docs.newrelic.com/docs/infrastructure/install-infrastructure-agent)
at the `/newrelic/api/v1/infra/v2/metrics/events/bulk` path.
NewRelic's infrastructure agent sends so-called [Events](https://docs.newrelic.com/docs/infrastructure/manage-your-data/data-instrumentation/default-infrastructure-monitoring-data/#infrastructure-events),
which are then transformed by VictoriaMetrics into the [Prometheus exposition format](https://github.com/prometheus/docs/blob/main/content/docs/instrumenting/exposition_formats.md#text-based-format).

The NewRelic infrastructure agent allows configuring the destination for metrics forwarding via the `COLLECTOR_URL` environment variable.
It is also required to specify `NRIA_LICENSE_KEY`, which becomes available only after registering an account with NewRelic.

To configure the NewRelic infrastructure agent to forward metrics to VictoriaMetrics, use the following example:
```console
COLLECTOR_URL="http://localhost:8428/newrelic/api/v1" NRIA_LICENSE_KEY="YOUR_LICENSE_KEY" ./newrelic-infra
```

### NewRelic agent data mapping

As an example, let's create a `newrelic.json` file with the following content:
```json
[
{
"Events":[
{
"eventType":"SystemSample",
"entityKey":"macbook-pro.local",
"cpuPercent":25.056660790748904,
"cpuUserPercent":8.687987912389374,
"cpuSystemPercent":16.36867287835953,
"cpuIOWaitPercent":0,
"cpuIdlePercent":74.94333920925109,
"cpuStealPercent":0,
"loadAverageOneMinute":5.42333984375,
"loadAverageFiveMinute":4.099609375,
"loadAverageFifteenMinute":3.58203125
}
]
}
]
```

Let's use curl to send `newrelic.json` to a single-node VictoriaMetrics instance:

```console
curl -X POST -H 'Content-Type: application/json' --data-binary @newrelic.json http://localhost:8428/newrelic/api/v1/infra/v2/metrics/events/bulk
```

If the data was successfully ingested, you'll get a `{"status":"ok"}` response. Let's fetch the ingested data from VictoriaMetrics
in vmui via the query `{__name__!=""}`:
```console
system_sample_cpu_io_wait_percent{entity_key="macbook-pro.local"} 0
system_sample_cpu_idle_percent{entity_key="macbook-pro.local"} 74.9433392092
system_sample_cpu_percent{entity_key="macbook-pro.local"} 25.056660790748
system_sample_cpu_steal_percent{entity_key="macbook-pro.local"} 0
system_sample_cpu_system_percent{entity_key="macbook-pro.local"} 16.368672878359
system_sample_cpu_user_percent{entity_key="macbook-pro.local"} 8.687987912389
system_sample_load_average_fifteen_minute{entity_key="macbook-pro.local"} 3.58203125
system_sample_load_average_five_minute{entity_key="macbook-pro.local"} 4.099609375
system_sample_load_average_one_minute{entity_key="macbook-pro.local"} 5.42333984375
```
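
The ingestion handlers added in this commit also accept gzip-compressed request bodies: `InsertHandlerForHTTP` checks the `Content-Encoding` header (see `app/vminsert/newrelic/request_handler.go` below). As a rough sketch, the same payload could be sent compressed from a small Go program — the address and file name simply mirror the curl example above:

```go
package main

import (
	"bytes"
	"compress/gzip"
	"fmt"
	"net/http"
	"os"
)

func main() {
	// Read the same newrelic.json payload used in the curl example above.
	payload, err := os.ReadFile("newrelic.json")
	if err != nil {
		panic(err)
	}
	// Compress the payload before sending it.
	var buf bytes.Buffer
	zw := gzip.NewWriter(&buf)
	if _, err := zw.Write(payload); err != nil {
		panic(err)
	}
	if err := zw.Close(); err != nil {
		panic(err)
	}
	// The address assumes a local single-node VictoriaMetrics, as above.
	req, err := http.NewRequest(http.MethodPost,
		"http://localhost:8428/newrelic/api/v1/infra/v2/metrics/events/bulk", &buf)
	if err != nil {
		panic(err)
	}
	req.Header.Set("Content-Type", "application/json")
	req.Header.Set("Content-Encoding", "gzip") // the request handler checks this header
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	fmt.Println(resp.Status)
}
```

On success the server responds with `202 Accepted` and the same `{"status":"ok"}` body.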

The fields in `newrelic.json` are transformed in the following way (see the sketch below):
1. the `eventType` field is used as a prefix for all metrics in the object;
2. `entityKey` and any other field with a `string` value is used as a label attached to all metrics in the object;
3. the remaining fields with numeric values are used as metric values;
4. an additional `timestamp` field can be added to the payload to set the timestamp for all metrics. If omitted, the current time is used.
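
For illustration, here is a minimal Go sketch of this mapping. It assumes metric names are built by converting the camelCase `eventType` and field names to snake_case and joining them with an underscore — a simplification of the behavior shown above, not the actual VictoriaMetrics parser:

```go
package main

import (
	"fmt"
	"strings"
	"unicode"
)

// camelToSnake converts camelCase/PascalCase names to snake_case, keeping
// acronym runs together, e.g. "cpuIOWaitPercent" -> "cpu_io_wait_percent".
func camelToSnake(s string) string {
	var b strings.Builder
	runes := []rune(s)
	for i, r := range runes {
		if unicode.IsUpper(r) {
			prevLower := i > 0 && unicode.IsLower(runes[i-1])
			nextLower := i+1 < len(runes) && unicode.IsLower(runes[i+1])
			if i > 0 && (prevLower || nextLower) {
				b.WriteByte('_')
			}
			b.WriteRune(unicode.ToLower(r))
			continue
		}
		b.WriteRune(r)
	}
	return b.String()
}

func main() {
	// Values taken from the SystemSample event in newrelic.json above.
	eventType := "SystemSample"
	entityKey := "macbook-pro.local"
	numericFields := map[string]float64{
		"cpuPercent":           25.056660790748904,
		"cpuIOWaitPercent":     0,
		"loadAverageOneMinute": 5.42333984375,
	}
	for field, value := range numericFields {
		metric := camelToSnake(eventType) + "_" + camelToSnake(field)
		fmt.Printf("%s{entity_key=%q} %v\n", metric, entityKey, value)
	}
}
```

Running it prints lines similar to the vmui output above, e.g. `system_sample_cpu_io_wait_percent{entity_key="macbook-pro.local"} 0`.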


## Prometheus querying API usage

VictoriaMetrics supports the following handlers from [Prometheus querying API](https://prometheus.io/docs/prometheus/latest/querying/api/):
53 changes: 53 additions & 0 deletions app/vmagent/main.go
@@ -18,6 +18,7 @@ import (
	"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/graphite"
	"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/influx"
	"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/native"
	"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/newrelic"
	"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/opentelemetry"
	"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/opentsdb"
	"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/opentsdbhttp"
@@ -319,6 +320,29 @@ func requestHandler(w http.ResponseWriter, r *http.Request) bool {
		}
		w.WriteHeader(http.StatusOK)
		return true
	case "/newrelic/api/v1":
		newrelicCheckRequest.Inc()
		w.Header().Set("Content-Type", "application/json")
		w.WriteHeader(202)
		fmt.Fprintf(w, `{"status":"ok"}`)
		return true
	case "/newrelic/api/v1/inventory/deltas":
		newrelicInventoryRequests.Inc()
		w.Header().Set("Content-Type", "application/json")
		w.WriteHeader(202)
		fmt.Fprintf(w, `{"payload":{"version": 1, "state": {}, "reset": "false"}}`)
		return true
	case "/newrelic/api/v1/infra/v2/metrics/events/bulk":
		newrelicWriteRequests.Inc()
		if err := newrelic.InsertHandlerForHTTP(nil, r); err != nil {
			newrelicWriteErrors.Inc()
			httpserver.Errorf(w, r, "%s", err)
			return true
		}
		w.Header().Set("Content-Type", "application/json")
		w.WriteHeader(202)
		fmt.Fprintf(w, `{"status":"ok"}`)
		return true
	case "/datadog/api/v1/series":
		datadogWriteRequests.Inc()
		if err := datadog.InsertHandlerForHTTP(nil, r); err != nil {
@@ -519,6 +543,29 @@ func processMultitenantRequest(w http.ResponseWriter, r *http.Request, path stri
		}
		w.WriteHeader(http.StatusOK)
		return true
	case "/newrelic/api/v1":
		newrelicCheckRequest.Inc()
		w.Header().Set("Content-Type", "application/json")
		w.WriteHeader(202)
		fmt.Fprintf(w, `{"status":"ok"}`)
		return true
	case "/newrelic/api/v1/inventory/deltas":
		newrelicInventoryRequests.Inc()
		w.Header().Set("Content-Type", "application/json")
		w.WriteHeader(202)
		fmt.Fprintf(w, `{"payload":{"version": 1, "state": {}, "reset": "false"}}`)
		return true
	case "/newrelic/api/v1/infra/v2/metrics/events/bulk":
		newrelicWriteRequests.Inc()
		if err := newrelic.InsertHandlerForHTTP(at, r); err != nil {
			newrelicWriteErrors.Inc()
			httpserver.Errorf(w, r, "%s", err)
			return true
		}
		w.Header().Set("Content-Type", "application/json")
		w.WriteHeader(202)
		fmt.Fprintf(w, `{"status":"ok"}`)
		return true
	case "datadog/api/v1/series":
		datadogWriteRequests.Inc()
		if err := datadog.InsertHandlerForHTTP(at, r); err != nil {
@@ -591,6 +638,12 @@ var (
	opentelemetryPushRequests = metrics.NewCounter(`vmagent_http_requests_total{path="/opentelemetry/api/v1/push", protocol="opentelemetry"}`)
	opentelemetryPushErrors   = metrics.NewCounter(`vmagent_http_request_errors_total{path="/opentelemetry/api/v1/push", protocol="opentelemetry"}`)

	newrelicWriteRequests = metrics.NewCounter(`vm_http_requests_total{path="/api/v1/newrelic/infra/v2/metrics/events/bulk", protocol="newrelic"}`)
	newrelicWriteErrors   = metrics.NewCounter(`vm_http_request_errors_total{path="/api/v1/newrelic/infra/v2/metrics/events/bulk", protocol="newrelic"}`)

	newrelicInventoryRequests = metrics.NewCounter(`vm_http_requests_total{path="/api/v1/newrelic/inventory/deltas", protocol="newrelic"}`)
	newrelicCheckRequest      = metrics.NewCounter(`vm_http_requests_total{path="/api/v1/newrelic", protocol="newrelic"}`)

	promscrapeTargetsRequests          = metrics.NewCounter(`vmagent_http_requests_total{path="/targets"}`)
	promscrapeServiceDiscoveryRequests = metrics.NewCounter(`vmagent_http_requests_total{path="/service-discovery"}`)

79 changes: 79 additions & 0 deletions app/vmagent/newrelic/request_handler.go
@@ -0,0 +1,79 @@
package newrelic

import (
	"net/http"

	"github.com/VictoriaMetrics/metrics"

	"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/common"
	"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/remotewrite"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
	parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/newrelic"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/newrelic/stream"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/tenantmetrics"
)

var (
	rowsInserted       = metrics.NewCounter(`vmagent_rows_inserted_total{type="newrelic"}`)
	rowsTenantInserted = tenantmetrics.NewCounterMap(`vmagent_tenant_inserted_rows_total{type="newrelic"}`)
	rowsPerInsert      = metrics.NewHistogram(`vmagent_rows_per_insert{type="newrelic"}`)
)

// InsertHandlerForHTTP processes remote write for NewRelic POST /infra/v2/metrics/events/bulk request.
func InsertHandlerForHTTP(at *auth.Token, req *http.Request) error {
	extraLabels, err := parserCommon.GetExtraLabels(req)
	if err != nil {
		return err
	}
	ce := req.Header.Get("Content-Encoding")
	isGzip := ce == "gzip"
	return stream.Parse(req.Body, isGzip, func(series []newrelic.Metric) error {
		return insertRows(at, series, extraLabels)
	})
}

func insertRows(at *auth.Token, rows []newrelic.Metric, extraLabels []prompbmarshal.Label) error {
	ctx := common.GetPushCtx()
	defer common.PutPushCtx(ctx)

	tssDst := ctx.WriteRequest.Timeseries[:0]
	labels := ctx.Labels[:0]
	samples := ctx.Samples[:0]
	for i := range rows {
		r := &rows[i]
		labelsLen := len(labels)
		labels = append(labels, prompbmarshal.Label{
			Name:  "__name__",
			Value: r.Metric,
		})
		for j := range r.Tags {
			tag := &r.Tags[j]
			labels = append(labels, prompbmarshal.Label{
				Name:  tag.Key,
				Value: tag.Value,
			})
		}
		labels = append(labels, extraLabels...)
		samples = append(samples, prompbmarshal.Sample{
			Value:     r.Value,
			Timestamp: r.Timestamp,
		})
		tssDst = append(tssDst, prompbmarshal.TimeSeries{
			Labels:  labels[labelsLen:],
			Samples: samples[len(samples)-1:],
		})
	}
	ctx.WriteRequest.Timeseries = tssDst
	ctx.Labels = labels
	ctx.Samples = samples
	remotewrite.Push(at, &ctx.WriteRequest)
	rowsInserted.Add(len(rows))
	if at != nil {
		rowsTenantInserted.Get(at).Add(len(rows))
	}
	rowsPerInsert.Update(float64(len(rows)))
	return nil
}
33 changes: 32 additions & 1 deletion app/vminsert/main.go
Expand Up @@ -9,12 +9,15 @@ import (
"sync/atomic"
"time"

"github.com/VictoriaMetrics/metrics"

vminsertCommon "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/common"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/csvimport"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/datadog"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/graphite"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/influx"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/native"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/newrelic"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/opentelemetry"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/opentsdb"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/opentsdbhttp"
Expand All @@ -36,7 +39,6 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
"github.com/VictoriaMetrics/metrics"
)

var (
@@ -220,6 +222,29 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool {
		}
		w.WriteHeader(http.StatusOK)
		return true
	case "/newrelic/api/v1":
		newrelicCheckRequest.Inc()
		w.Header().Set("Content-Type", "application/json")
		w.WriteHeader(202)
		fmt.Fprintf(w, `{"status":"ok"}`)
		return true
	case "/newrelic/api/v1/inventory/deltas":
		newrelicInventoryRequests.Inc()
		w.Header().Set("Content-Type", "application/json")
		w.WriteHeader(202)
		fmt.Fprintf(w, `{"payload":{"version": 1, "state": {}, "reset": "false"}}`)
		return true
	case "/newrelic/api/v1/infra/v2/metrics/events/bulk":
		newrelicWriteRequests.Inc()
		if err := newrelic.InsertHandlerForHTTP(r); err != nil {
			newrelicWriteErrors.Inc()
			httpserver.Errorf(w, r, "%s", err)
			return true
		}
		w.Header().Set("Content-Type", "application/json")
		w.WriteHeader(202)
		fmt.Fprintf(w, `{"status":"ok"}`)
		return true
	case "/datadog/api/v1/series":
		datadogWriteRequests.Inc()
		if err := datadog.InsertHandlerForHTTP(r); err != nil {
@@ -357,6 +382,12 @@ var (
	opentelemetryPushRequests = metrics.NewCounter(`vm_http_requests_total{path="/opentelemetry/api/v1/push", protocol="opentelemetry"}`)
	opentelemetryPushErrors   = metrics.NewCounter(`vm_http_request_errors_total{path="/opentelemetry/api/v1/push", protocol="opentelemetry"}`)

	newrelicWriteRequests = metrics.NewCounter(`vm_http_requests_total{path="/api/v1/newrelic/infra/v2/metrics/events/bulk", protocol="newrelic"}`)
	newrelicWriteErrors   = metrics.NewCounter(`vm_http_request_errors_total{path="/api/v1/newrelic/infra/v2/metrics/events/bulk", protocol="newrelic"}`)

	newrelicInventoryRequests = metrics.NewCounter(`vm_http_requests_total{path="/api/v1/newrelic/inventory/deltas", protocol="newrelic"}`)
	newrelicCheckRequest      = metrics.NewCounter(`vm_http_requests_total{path="/api/v1/newrelic", protocol="newrelic"}`)

	promscrapeTargetsRequests          = metrics.NewCounter(`vm_http_requests_total{path="/targets"}`)
	promscrapeServiceDiscoveryRequests = metrics.NewCounter(`vm_http_requests_total{path="/service-discovery"}`)

67 changes: 67 additions & 0 deletions app/vminsert/newrelic/request_handler.go
@@ -0,0 +1,67 @@
package newrelic

import (
	"net/http"

	"github.com/VictoriaMetrics/metrics"

	"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/common"
	"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/relabel"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
	parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/newrelic"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/newrelic/stream"
)

var (
	rowsInserted  = metrics.NewCounter(`vm_rows_inserted_total{type="newrelic"}`)
	rowsPerInsert = metrics.NewHistogram(`vm_rows_per_insert{type="newrelic"}`)
)

// InsertHandlerForHTTP processes remote write for NewRelic POST /infra/v2/metrics/events/bulk request.
func InsertHandlerForHTTP(req *http.Request) error {
	extraLabels, err := parserCommon.GetExtraLabels(req)
	if err != nil {
		return err
	}
	ce := req.Header.Get("Content-Encoding")
	isGzip := ce == "gzip"
	return stream.Parse(req.Body, isGzip, func(series []newrelic.Metric) error {
		return insertRows(series, extraLabels)
	})
}

func insertRows(rows []newrelic.Metric, extraLabels []prompbmarshal.Label) error {
	ctx := common.GetInsertCtx()
	defer common.PutInsertCtx(ctx)

	ctx.Reset(len(rows))
	hasRelabeling := relabel.HasRelabeling()
	for i := range rows {
		r := &rows[i]
		ctx.Labels = ctx.Labels[:0]
		ctx.AddLabel("", r.Metric)
		for j := range r.Tags {
			tag := &r.Tags[j]
			ctx.AddLabel(tag.Key, tag.Value)
		}
		for j := range extraLabels {
			label := &extraLabels[j]
			ctx.AddLabel(label.Name, label.Value)
		}
		if hasRelabeling {
			ctx.ApplyRelabeling()
		}
		if len(ctx.Labels) == 0 {
			// Skip metric without labels.
			continue
		}
		ctx.SortLabelsIfNeeded()
		if err := ctx.WriteDataPoint(nil, ctx.Labels, r.Timestamp, r.Value); err != nil {
			return err
		}
	}
	rowsInserted.Add(len(rows))
	rowsPerInsert.Update(float64(len(rows)))
	return ctx.FlushBufs()
}
1 change: 1 addition & 0 deletions docs/CHANGELOG.md
@@ -28,6 +28,7 @@ The sandbox cluster installation is running under the constant load generated by

## tip

* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): support data ingestion from [NewRelic infrastructure agent](https://docs.newrelic.com/docs/infrastructure/install-infrastructure-agent). See [these docs](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#how-to-send-data-from-newrelic-agent), [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3520) and [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/4712).
* FEATURE: [vmbackup](https://docs.victoriametrics.com/vmbackup.html): add the `-filestream.disableFadvise` command-line flag, which can be used for disabling the `fadvise` syscall during backup upload to the remote storage. By default `vmbackup` uses the `fadvise` syscall in order to prevent eviction of recently accessed data from the [OS page cache](https://en.wikipedia.org/wiki/Page_cache) when backing up large files. Sometimes the `fadvise` syscall may consume significant amounts of CPU when the backup is performed with a large value of the `-concurrency` command-line flag on systems with a big number of CPU cores. In this case it is better to manually disable the `fadvise` syscall by passing `-filestream.disableFadvise` to `vmbackup`. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5120) for details.
* FEATURE: [Alerting rules for VictoriaMetrics](https://github.com/VictoriaMetrics/VictoriaMetrics/tree/master/deployment/docker#alerts): account for `vmauth` component for alerts `ServiceDown` and `TooManyRestarts`.
