Skip to content

Commit

Permalink
opentelemetry: added cmd flag to sanitize metric names (#6035)
Browse files Browse the repository at this point in the history
  • Loading branch information
AndrewChubatiuk committed Mar 29, 2024
1 parent 166b97b commit 47892b4
Show file tree
Hide file tree
Showing 7 changed files with 256 additions and 21 deletions.
2 changes: 2 additions & 0 deletions docs/CHANGELOG.md
Expand Up @@ -59,6 +59,8 @@ See also [LTS releases](https://docs.victoriametrics.com/lts-releases/).
* FEATURE: [vmctl](https://docs.victoriametrics.com/vmctl.html): support client-side TLS configuration for VictoriaMetrics destination specified via `--vm-*` cmd-line flags used in [InfluxDB](https://docs.victoriametrics.com/vmctl/#migrating-data-from-influxdb-1x), [Remote Read protocol](https://docs.victoriametrics.com/vmctl/#migrating-data-by-remote-read-protocol), [OpenTSDB](https://docs.victoriametrics.com/vmctl/#migrating-data-from-opentsdb), [Prometheus](https://docs.victoriametrics.com/vmctl/#migrating-data-from-prometheus) and [Promscale](https://docs.victoriametrics.com/vmctl/#migrating-data-from-promscale) migration modes.
* FEATURE: [vmctl](https://docs.victoriametrics.com/vmctl.html): split [explore phase](https://docs.victoriametrics.com/vmctl/#migrating-data-from-victoriametrics) in `vm-native` mode by time intervals when [--vm-native-step-interval](https://docs.victoriametrics.com/vmctl/#using-time-based-chunking-of-migration) is specified. This should reduce probability of exceeding complexity limits for number of selected series during explore phase. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5369).
* FEATURE: [graphite](https://docs.victoriametrics.com/#graphite-render-api-usage): add support for `aggregateSeriesLists`, `diffSeriesLists`, `multiplySeriesLists` and `sumSeriesLists` functions. Thanks to @rbizos for [the pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5809).
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add `-opentelemetry.sanitizeMetrics` command-line flag, which enables sanitization of metric names and label names for data ingested via OpenTelemetry protocol.


* BUGFIX: prevent from automatic deletion of newly registered time series when it is queried immediately after the addition. The probability of this bug has been increased significantly after [v1.99.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.99.0) because of optimizations related to registering new time series. See [this](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5948) and [this](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5959) issue.
* BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): properly set `Host` header in requests to scrape targets if it is specified via [`headers` option](https://docs.victoriametrics.com/sd_configs/#http-api-client-options). Thanks to @fholzer for [the bugreport](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5969) and [the fix](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5970).
Expand Down
3 changes: 3 additions & 0 deletions docs/Single-server-VictoriaMetrics.md
Expand Up @@ -1557,6 +1557,9 @@ VictoriaMetrics supports data ingestion via [OpenTelemetry protocol for metrics]
VictoriaMetrics expects `protobuf`-encoded requests at `/opentelemetry/v1/metrics`.
Set HTTP request header `Content-Encoding: gzip` when sending gzip-compressed data to `/opentelemetry/v1/metrics`.

By default, VictoriaMetrics does not sanitize metric names and label names for the data ingested via OpenTelemetry protocol; they are stored as is.
If you need metric names and label names to be sanitized, then pass `-opentelemetry.sanitizeMetrics=true` command-line flag to VictoriaMetrics.

See [How to use OpenTelemetry metrics with VictoriaMetrics](https://docs.victoriametrics.com/guides/getting-started-with-opentelemetry/).

## JSON line format
Expand Down
5 changes: 5 additions & 0 deletions lib/promrelabel/relabel.go
Expand Up @@ -663,6 +663,11 @@ func SanitizeLabelName(name string) string {
return labelNameSanitizer.Transform(name)
}

// SanitizeLabelNameParts splits name at every match of unsupportedLabelNameChars
// and returns all the resulting parts.
//
// The split is performed without a limit on the number of parts.
// NOTE(review): unsupportedLabelNameChars is presumably a *regexp.Regexp; if so,
// leading, trailing or adjacent matches produce empty parts in the result.
func SanitizeLabelNameParts(name string) []string {
	const noLimit = -1
	parts := unsupportedLabelNameChars.Split(name, noLimit)
	return parts
}

// labelNameSanitizer replaces every match of unsupportedLabelNameChars in the
// given name with an underscore. The replacement function is wrapped into
// bytesutil.NewFastStringTransformer.
var labelNameSanitizer = bytesutil.NewFastStringTransformer(func(name string) string {
	const replacement = "_"
	return unsupportedLabelNameChars.ReplaceAllString(name, replacement)
})
Expand Down
17 changes: 17 additions & 0 deletions lib/protoparser/opentelemetry/pb/pb.go
Expand Up @@ -190,6 +190,7 @@ func (sm *ScopeMetrics) unmarshalProtobuf(src []byte) (err error) {
// Metric represents the corresponding OTEL protobuf message
type Metric struct {
Name string
Unit string
Gauge *Gauge
Sum *Sum
Histogram *Histogram
Expand All @@ -198,6 +199,7 @@ type Metric struct {

func (m *Metric) marshalProtobuf(mm *easyproto.MessageMarshaler) {
mm.AppendString(1, m.Name)
mm.AppendString(3, m.Unit)
switch {
case m.Gauge != nil:
m.Gauge.marshalProtobuf(mm.AppendMessage(5))
Expand All @@ -213,6 +215,7 @@ func (m *Metric) marshalProtobuf(mm *easyproto.MessageMarshaler) {
func (m *Metric) unmarshalProtobuf(src []byte) (err error) {
// message Metric {
// string name = 1;
// string unit = 3;
// oneof data {
// Gauge gauge = 5;
// Sum sum = 7;
Expand All @@ -233,6 +236,12 @@ func (m *Metric) unmarshalProtobuf(src []byte) (err error) {
return fmt.Errorf("cannot read metric name")
}
m.Name = strings.Clone(name)
case 3:
unit, ok := fc.String()
if !ok {
return fmt.Errorf("cannot read metric unit")
}
m.Unit = strings.Clone(unit)
case 5:
data, ok := fc.MessageData()
if !ok {
Expand Down Expand Up @@ -617,6 +626,7 @@ func (ndp *NumberDataPoint) unmarshalProtobuf(src []byte) (err error) {
type Sum struct {
// DataPoints holds the data points of the sum.
// NOTE(review): presumably each item is marshaled as an embedded message in protobuf field 1 - confirm against marshalProtobuf.
DataPoints []*NumberDataPoint
// AggregationTemporality is marshaled as an int64 in protobuf field 2.
AggregationTemporality AggregationTemporality
// IsMonotonic is marshaled as a bool in protobuf field 3.
IsMonotonic bool
}

// AggregationTemporality represents the corresponding OTEL protobuf enum
Expand All @@ -636,6 +646,7 @@ func (s *Sum) marshalProtobuf(mm *easyproto.MessageMarshaler) {
dp.marshalProtobuf(mm.AppendMessage(1))
}
mm.AppendInt64(2, int64(s.AggregationTemporality))
mm.AppendBool(3, s.IsMonotonic)
}

func (s *Sum) unmarshalProtobuf(src []byte) (err error) {
Expand Down Expand Up @@ -666,6 +677,12 @@ func (s *Sum) unmarshalProtobuf(src []byte) (err error) {
return fmt.Errorf("cannot read AggregationTemporality")
}
s.AggregationTemporality = AggregationTemporality(at)
case 3:
im, ok := fc.Bool()
if !ok {
return fmt.Errorf("cannot read IsMonotonic")
}
s.IsMonotonic = im
}
}
return nil
Expand Down
145 changes: 139 additions & 6 deletions lib/protoparser/opentelemetry/stream/streamparser.go
@@ -1,10 +1,13 @@
package stream

import (
"flag"
"fmt"
"io"
"strconv"
"strings"
"sync"
"unicode"

"github.com/VictoriaMetrics/metrics"

Expand All @@ -13,11 +16,72 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentelemetry/pb"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
)

var (
// sanitizeMetrics controls sanitizing metric and label names ingested via OpenTelemetry protocol.
//
// The flag defaults to false. When it is false, sanitizeLabelName and
// sanitizeMetricName return the original names unchanged.
sanitizeMetrics = flag.Bool("opentelemetry.sanitizeMetrics", false, "Sanitize metric and label names for the ingested OpenTelemetry data")
)

// unitMap maps OpenTelemetry unit abbreviations to the words used in sanitized metric names.
//
// sanitizeMetricName splits Metric.Unit on "/" into at most len(unitMap) tokens
// and looks up the i-th token in unitMap[i].units:
//   - entry 0 describes the main unit and has an empty prefix;
//   - entry 1 describes the unit after "/" and has the "per" prefix,
//     which is added to the name as a separate token before the unit.
//
// A unit token missing from the corresponding map is used as is.
//
// https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/b8655058501bed61a06bb660869051491f46840b/pkg/translator/prometheus/normalize_name.go#L19
var unitMap = []struct {
prefix string
units map[string]string
}{
{
units: map[string]string{
// Time
"d": "days",
"h": "hours",
"min": "minutes",
"s": "seconds",
"ms": "milliseconds",
"us": "microseconds",
"ns": "nanoseconds",

// Bytes
"By": "bytes",
"KiBy": "kibibytes",
"MiBy": "mebibytes",
"GiBy": "gibibytes",
"TiBy": "tibibytes",
"KBy": "kilobytes",
"MBy": "megabytes",
"GBy": "gigabytes",
"TBy": "terabytes",

// SI
"m": "meters",
"V": "volts",
"A": "amperes",
"J": "joules",
"W": "watts",
"g": "grams",

// Misc
"Cel": "celsius",
"Hz": "hertz",
// "1" maps to an empty string, so sanitizeMetricName adds no unit token for it.
"1": "",
"%": "percent",
},
}, {
// Units following "/" in Metric.Unit, e.g. the "s" in "By/s".
prefix: "per",
units: map[string]string{
"s": "second",
"m": "minute",
"h": "hour",
"d": "day",
"w": "week",
"mo": "month",
"y": "year",
},
},
}

// ParseStream parses OpenTelemetry protobuf or json data from r and calls callback for the parsed rows.
//
// callback shouldn't hold tss items after returning.
Expand Down Expand Up @@ -58,34 +122,35 @@ func (wr *writeContext) appendSamplesFromScopeMetrics(sc *pb.ScopeMetrics) {
// skip metrics without names
continue
}
metricName := sanitizeMetricName(m)
switch {
case m.Gauge != nil:
for _, p := range m.Gauge.DataPoints {
wr.appendSampleFromNumericPoint(m.Name, p)
wr.appendSampleFromNumericPoint(metricName, p)
}
case m.Sum != nil:
if m.Sum.AggregationTemporality != pb.AggregationTemporalityCumulative {
rowsDroppedUnsupportedSum.Inc()
continue
}
for _, p := range m.Sum.DataPoints {
wr.appendSampleFromNumericPoint(m.Name, p)
wr.appendSampleFromNumericPoint(metricName, p)
}
case m.Summary != nil:
for _, p := range m.Summary.DataPoints {
wr.appendSamplesFromSummary(m.Name, p)
wr.appendSamplesFromSummary(metricName, p)
}
case m.Histogram != nil:
if m.Histogram.AggregationTemporality != pb.AggregationTemporalityCumulative {
rowsDroppedUnsupportedHistogram.Inc()
continue
}
for _, p := range m.Histogram.DataPoints {
wr.appendSamplesFromHistogram(m.Name, p)
wr.appendSamplesFromHistogram(metricName, p)
}
default:
rowsDroppedUnsupportedMetricType.Inc()
logger.Warnf("unsupported type for metric %q", m.Name)
logger.Warnf("unsupported type for metric %q", metricName)
}
}
}
Expand Down Expand Up @@ -209,7 +274,7 @@ func (wr *writeContext) appendSampleWithExtraLabel(metricName, labelName, labelV
func appendAttributesToPromLabels(dst []prompbmarshal.Label, attributes []*pb.KeyValue) []prompbmarshal.Label {
for _, at := range attributes {
dst = append(dst, prompbmarshal.Label{
Name: at.Key,
Name: sanitizeLabelName(at.Key),
Value: at.Value.FormatString(),
})
}
Expand Down Expand Up @@ -290,6 +355,74 @@ func (wr *writeContext) parseRequestToTss(req *pb.ExportMetricsServiceRequest) {
}
}

// sanitizeLabelName converts the given OpenTelemetry attribute key into a label name.
//
// The key is returned unchanged if -opentelemetry.sanitizeMetrics is disabled
// or if the key is empty. Otherwise the key is passed through
// promrelabel.SanitizeLabelName and the result is adjusted as follows:
//   - a name starting with a digit gets the "key_" prefix;
//   - a name starting with a single underscore gets the "key" prefix;
//   - a name starting with a double underscore is returned as is.
//
// https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/b8655058501bed61a06bb660869051491f46840b/pkg/translator/prometheus/normalize_label.go#L26
func sanitizeLabelName(labelName string) string {
	if !*sanitizeMetrics || labelName == "" {
		return labelName
	}
	sanitized := promrelabel.SanitizeLabelName(labelName)
	switch {
	case unicode.IsDigit(rune(sanitized[0])):
		return "key_" + sanitized
	case strings.HasPrefix(sanitized, "__"):
		// Double-underscore names are kept untouched.
		return sanitized
	case strings.HasPrefix(sanitized, "_"):
		return "key" + sanitized
	}
	return sanitized
}

// sanitizeMetricName builds the metric name for the given OpenTelemetry metric.
//
// metric.Name is returned unchanged if -opentelemetry.sanitizeMetrics is disabled.
// Otherwise the name is built from "_"-joined tokens:
//   - the metric name is split into tokens at characters unsupported in label names
//     and at underscores;
//   - every part of metric.Unit (split on "/") is translated via unitMap and appended,
//     unless it is empty, contains "{" or "}", or is already present among the tokens;
//     the part after "/" is preceded by the "per" token;
//   - the "total" token is moved to (or added at) the end for monotonic sums;
//   - the "ratio" token is moved to (or added at) the end for gauges with unit "1".
//
// https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/b8655058501bed61a06bb660869051491f46840b/pkg/translator/prometheus/normalize_name.go#L83
func sanitizeMetricName(metric *pb.Metric) string {
	if !*sanitizeMetrics {
		return metric.Name
	}

	// Split every part on "_" as well. Otherwise a name such as
	// "request_duration_seconds" stays a single token, the "seconds" and "total"
	// lookups below never match and the suffix gets duplicated
	// ("request_duration_seconds_seconds", "requests_total_total").
	// Splitting on "_" and joining with "_" later keeps the rest of the name intact.
	var nameTokens []string
	for _, part := range promrelabel.SanitizeLabelNameParts(metric.Name) {
		nameTokens = append(nameTokens, strings.Split(part, "_")...)
	}

	// The number of unit tokens is limited by len(unitMap), so unitMap[i] is always in range.
	unitTokens := strings.SplitN(metric.Unit, "/", len(unitMap))
	for i, u := range unitTokens {
		unitToken := strings.TrimSpace(u)
		if unitToken == "" || strings.ContainsAny(unitToken, "{}") {
			continue
		}
		if unit, ok := unitMap[i].units[unitToken]; ok {
			unitToken = unit
		}
		// The translated unit may be empty (e.g. for "1"); nothing is appended then.
		if unitToken == "" || containsToken(nameTokens, unitToken) {
			continue
		}
		if unitPrefix := unitMap[i].prefix; unitPrefix != "" {
			nameTokens = append(nameTokens, unitPrefix)
		}
		nameTokens = append(nameTokens, unitToken)
	}

	switch {
	case metric.Sum != nil && metric.Sum.IsMonotonic:
		nameTokens = moveOrAppend(nameTokens, "total")
	case metric.Unit == "1" && metric.Gauge != nil:
		nameTokens = moveOrAppend(nameTokens, "ratio")
	}
	return strings.Join(nameTokens, "_")
}

// containsToken reports whether value is equal to at least one item in tokens.
//
// It returns false for nil or empty tokens.
func containsToken(tokens []string, value string) bool {
	found := false
	for i := 0; i < len(tokens) && !found; i++ {
		found = tokens[i] == value
	}
	return found
}

// moveOrAppend returns tokens with value placed at the end.
//
// If value is already present in tokens, its first occurrence is removed
// before value is appended, so the token is effectively moved to the end.
// The removal is performed in place, so the backing array of tokens is modified.
func moveOrAppend(tokens []string, value string) []string {
	for idx, tok := range tokens {
		if tok != value {
			continue
		}
		// Shift the tail one position to the left and drop the last slot.
		copy(tokens[idx:], tokens[idx+1:])
		tokens = tokens[:len(tokens)-1]
		break
	}
	return append(tokens, value)
}

var wrPool sync.Pool

func getWriteContext() *writeContext {
Expand Down

0 comments on commit 47892b4

Please sign in to comment.