1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -2,6 +2,7 @@

## master / unreleased

* [FEATURE] Distributor: Add a per-tenant flag `-distributor.enable-type-and-unit-labels` which, when enabled, adds the `__unit__` and `__type__` labels to remote write v2 and OTLP requests. The `-distributor.otlp.enable-type-and-unit-labels` flag has been consolidated into this flag. #7077
* [ENHANCEMENT] Ingester: Add `enable_matcher_optimization` config to apply low selectivity matchers lazily. #7063
* [ENHANCEMENT] Distributor: Add a label references validation for remote write v2 request. #7074
* [ENHANCEMENT] Distributor: Add count, spans, and buckets validations for native histogram. #7072
10 changes: 5 additions & 5 deletions docs/configuration/config-file-reference.md
@@ -3270,11 +3270,6 @@ otlp:
# EXPERIMENTAL: If true, delta temporality OTLP metrics are allowed to be ingested.
# CLI flag: -distributor.otlp.allow-delta-temporality
[allow_delta_temporality: <boolean> | default = false]

# EXPERIMENTAL: If true, the '__type__' and '__unit__' labels are added for
# the OTLP metrics.
# CLI flag: -distributor.otlp.enable-type-and-unit-labels
[enable_type_and_unit_labels: <boolean> | default = false]
```

### `etcd_config`
@@ -3994,6 +3989,11 @@ The `limits_config` configures default and per-tenant limits imposed by Cortex s
# CLI flag: -distributor.promote-resource-attributes
[promote_resource_attributes: <list of string> | default = ]

# EXPERIMENTAL: If true, the __type__ and __unit__ labels are added to metrics.
# This applies to remote write v2 and OTLP requests.
# CLI flag: -distributor.enable-type-and-unit-labels
[enable_type_and_unit_labels: <boolean> | default = false]

# The maximum number of active series per user, per ingester. 0 to disable.
# CLI flag: -ingester.max-series-per-user
[max_series_per_user: <int> | default = 5000000]
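Because `enable_type_and_unit_labels` now lives in `limits_config`, it should be overridable per tenant through the runtime configuration file like other limits. The snippet below is a minimal sketch of that usage under that assumption; the tenant name `team-a` is illustrative and not taken from this change.

```yaml
# Runtime overrides sketch (assumed layout): enable the __type__ and __unit__
# labels for a single tenant while leaving the global default (false) in place.
overrides:
  team-a:
    enable_type_and_unit_labels: true
```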
6 changes: 4 additions & 2 deletions integration/e2e/util.go
@@ -465,18 +465,20 @@ func GenerateSeriesV2(name string, ts time.Time, additionalLabels ...prompb.Labe
st := writev2.NewSymbolTable()
lb := labels.NewScratchBuilder(0)
lb.Add("__name__", name)

for _, label := range additionalLabels {
lb.Add(label.Name, label.Value)
}

series = append(series, writev2.TimeSeries{
// Generate the series
LabelsRefs: st.SymbolizeLabels(lb.Labels(), nil),
Samples: []writev2.Sample{
{Value: value, Timestamp: tsMillis},
},
Metadata: writev2.Metadata{
Type: writev2.Metadata_METRIC_TYPE_GAUGE,
Type: writev2.Metadata_METRIC_TYPE_GAUGE,
HelpRef: 2, // equal to name
UnitRef: 2, // equal to name
},
})
symbols = st.Symbols()
2 changes: 1 addition & 1 deletion integration/otlp_test.go
@@ -282,7 +282,7 @@ func TestOTLPEnableTypeAndUnitLabels(t *testing.T) {
"-auth.enabled": "true",

// OTLP
"-distributor.otlp.enable-type-and-unit-labels": "true",
"-distributor.enable-type-and-unit-labels": "true",

// alert manager
"-alertmanager.web.external-url": "http://localhost/alertmanager",
71 changes: 71 additions & 0 deletions integration/remote_write_v2_test.go
@@ -205,6 +205,77 @@ func TestIngest_SenderSendPRW2_DistributorNotAllowPRW2(t *testing.T) {
require.Empty(t, result)
}

func TestIngest_EnableTypeAndUnitLabels(t *testing.T) {
const blockRangePeriod = 5 * time.Second

s, err := e2e.NewScenario(networkName)
require.NoError(t, err)
defer s.Close()

// Start dependencies.
consul := e2edb.NewConsulWithName("consul")
require.NoError(t, s.StartAndWaitReady(consul))

flags := mergeFlags(
AlertmanagerLocalFlags(),
map[string]string{
"-store.engine": blocksStorageEngine,
"-blocks-storage.backend": "filesystem",
"-blocks-storage.tsdb.head-compaction-interval": "4m",
"-blocks-storage.bucket-store.sync-interval": "15m",
"-blocks-storage.bucket-store.index-cache.backend": tsdb.IndexCacheBackendInMemory,
"-blocks-storage.bucket-store.bucket-index.enabled": "true",
"-querier.query-store-for-labels-enabled": "true",
"-blocks-storage.tsdb.block-ranges-period": blockRangePeriod.String(),
"-blocks-storage.tsdb.ship-interval": "1s",
"-blocks-storage.tsdb.retention-period": ((blockRangePeriod * 2) - 1).String(),
"-blocks-storage.tsdb.enable-native-histograms": "true",
// Ingester.
"-ring.store": "consul",
"-consul.hostname": consul.NetworkHTTPEndpoint(),
// Distributor.
"-distributor.replication-factor": "1",
"-distributor.remote-writev2-enabled": "true",
"-distributor.enable-type-and-unit-labels": "true",
// Store-gateway.
"-store-gateway.sharding-enabled": "false",
// alert manager
"-alertmanager.web.external-url": "http://localhost/alertmanager",
},
)

// make alert manager config dir
require.NoError(t, writeFileToSharedDir(s, "alertmanager_configs", []byte{}))

path := path.Join(s.SharedDir(), "cortex-1")

flags = mergeFlags(flags, map[string]string{"-blocks-storage.filesystem.dir": path})
// Start Cortex replicas.
cortex := e2ecortex.NewSingleBinary("cortex", flags, "")
require.NoError(t, s.StartAndWaitReady(cortex))

// Wait until Cortex replicas have updated the ring state.
require.NoError(t, cortex.WaitSumMetrics(e2e.Equals(float64(512)), "cortex_ring_tokens_total"))

c, err := e2ecortex.NewClient(cortex.HTTPEndpoint(), cortex.HTTPEndpoint(), "", "", "user-1")
require.NoError(t, err)

now := time.Now()

// series push
symbols1, series, _ := e2e.GenerateSeriesV2("test_series", now, prompb.Label{Name: "job", Value: "test"}, prompb.Label{Name: "foo", Value: "bar"})
writeStats, err := c.PushV2(symbols1, series)
require.NoError(t, err)
testPushHeader(t, writeStats, 1, 0, 0)

value, err := c.Query("test_series", now)
require.NoError(t, err)
require.Equal(t, model.ValVector, value.Type())
vec := value.(model.Vector)
require.True(t, vec[0].Metric["__unit__"] != "")
require.True(t, vec[0].Metric["__type__"] != "")
}

func TestIngest(t *testing.T) {
const blockRangePeriod = 5 * time.Second

10 changes: 5 additions & 5 deletions pkg/api/api.go
@@ -283,7 +283,7 @@ func (a *API) RegisterRuntimeConfig(runtimeConfigHandler http.HandlerFunc) {
func (a *API) RegisterDistributor(d *distributor.Distributor, pushConfig distributor.Config, overrides *validation.Overrides) {
distributorpb.RegisterDistributorServer(a.server.GRPC, d)

a.RegisterRoute("/api/v1/push", push.Handler(pushConfig.RemoteWriteV2Enabled, pushConfig.MaxRecvMsgSize, a.sourceIPs, a.cfg.wrapDistributorPush(d)), true, "POST")
a.RegisterRoute("/api/v1/push", push.Handler(pushConfig.RemoteWriteV2Enabled, pushConfig.MaxRecvMsgSize, overrides, a.sourceIPs, a.cfg.wrapDistributorPush(d)), true, "POST")
a.RegisterRoute("/api/v1/otlp/v1/metrics", push.OTLPHandler(pushConfig.OTLPMaxRecvMsgSize, overrides, pushConfig.OTLPConfig, a.sourceIPs, a.cfg.wrapDistributorPush(d)), true, "POST")

a.indexPage.AddLink(SectionAdminEndpoints, "/distributor/ring", "Distributor Ring Status")
@@ -295,7 +295,7 @@ func (a *API) RegisterDistributor(d *distributor.Distributor, pushConfig distrib
a.RegisterRoute("/distributor/ha_tracker", d.HATracker, false, "GET")

// Legacy Routes
a.RegisterRoute(path.Join(a.cfg.LegacyHTTPPrefix, "/push"), push.Handler(pushConfig.RemoteWriteV2Enabled, pushConfig.MaxRecvMsgSize, a.sourceIPs, a.cfg.wrapDistributorPush(d)), true, "POST")
a.RegisterRoute(path.Join(a.cfg.LegacyHTTPPrefix, "/push"), push.Handler(pushConfig.RemoteWriteV2Enabled, pushConfig.MaxRecvMsgSize, overrides, a.sourceIPs, a.cfg.wrapDistributorPush(d)), true, "POST")
a.RegisterRoute("/all_user_stats", http.HandlerFunc(d.AllUserStatsHandler), false, "GET")
a.RegisterRoute("/ha-tracker", d.HATracker, false, "GET")
}
@@ -313,7 +313,7 @@ type Ingester interface {
}

// RegisterIngester registers the ingesters HTTP and GRPC service
func (a *API) RegisterIngester(i Ingester, pushConfig distributor.Config) {
func (a *API) RegisterIngester(i Ingester, pushConfig distributor.Config, overrides *validation.Overrides) {
client.RegisterIngesterServer(a.server.GRPC, i)

a.indexPage.AddLink(SectionAdminEndpoints, "/ingester/all_user_stats", "Usage Statistics")
@@ -328,12 +328,12 @@ func (a *API) RegisterIngester(i Ingester, pushConfig distributor.Config) {
a.RegisterRoute("/ingester/renewTokens", http.HandlerFunc(i.RenewTokenHandler), false, "GET", "POST")
a.RegisterRoute("/ingester/all_user_stats", http.HandlerFunc(i.AllUserStatsHandler), false, "GET")
a.RegisterRoute("/ingester/mode", http.HandlerFunc(i.ModeHandler), false, "GET", "POST")
a.RegisterRoute("/ingester/push", push.Handler(pushConfig.RemoteWriteV2Enabled, pushConfig.MaxRecvMsgSize, a.sourceIPs, i.Push), true, "POST") // For testing and debugging.
a.RegisterRoute("/ingester/push", push.Handler(pushConfig.RemoteWriteV2Enabled, pushConfig.MaxRecvMsgSize, overrides, a.sourceIPs, i.Push), true, "POST") // For testing and debugging.

// Legacy Routes
a.RegisterRoute("/flush", http.HandlerFunc(i.FlushHandler), false, "GET", "POST")
a.RegisterRoute("/shutdown", http.HandlerFunc(i.ShutdownHandler), false, "GET", "POST")
a.RegisterRoute("/push", push.Handler(pushConfig.RemoteWriteV2Enabled, pushConfig.MaxRecvMsgSize, a.sourceIPs, i.Push), true, "POST") // For testing and debugging.
a.RegisterRoute("/push", push.Handler(pushConfig.RemoteWriteV2Enabled, pushConfig.MaxRecvMsgSize, overrides, a.sourceIPs, i.Push), true, "POST") // For testing and debugging.
}

func (a *API) RegisterTenantDeletion(api *purger.TenantDeletionAPI) {
2 changes: 1 addition & 1 deletion pkg/cortex/modules.go
@@ -492,7 +492,7 @@ func (t *Cortex) initIngesterService() (serv services.Service, err error) {
}

func (t *Cortex) initIngester() (serv services.Service, err error) {
t.API.RegisterIngester(t.Ingester, t.Cfg.Distributor)
t.API.RegisterIngester(t.Ingester, t.Cfg.Distributor, t.Overrides)

return nil, nil
}
24 changes: 24 additions & 0 deletions pkg/cortexpb/compatv2.go
@@ -3,6 +3,7 @@ package cortexpb
import (
"fmt"

"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
)

@@ -32,3 +33,26 @@ func desymbolizeLabels(b *labels.ScratchBuilder, labelRefs []uint32, symbols []s
b.Sort()
return b.Labels(), nil
}

func MetadataV2MetricTypeToMetricType(mt MetadataV2_MetricType) model.MetricType {
switch mt {
case METRIC_TYPE_UNSPECIFIED:
return model.MetricTypeUnknown
case METRIC_TYPE_COUNTER:
return model.MetricTypeCounter
case METRIC_TYPE_GAUGE:
return model.MetricTypeGauge
case METRIC_TYPE_HISTOGRAM:
return model.MetricTypeHistogram
case METRIC_TYPE_GAUGEHISTOGRAM:
return model.MetricTypeGaugeHistogram
case METRIC_TYPE_SUMMARY:
return model.MetricTypeSummary
case METRIC_TYPE_INFO:
return model.MetricTypeInfo
case METRIC_TYPE_STATESET:
return model.MetricTypeStateset
default:
return model.MetricTypeUnknown
}
}
8 changes: 3 additions & 5 deletions pkg/distributor/distributor.go
@@ -197,10 +197,9 @@ type InstanceLimits struct {
}

type OTLPConfig struct {
ConvertAllAttributes bool `yaml:"convert_all_attributes"`
DisableTargetInfo bool `yaml:"disable_target_info"`
AllowDeltaTemporality bool `yaml:"allow_delta_temporality"`
EnableTypeAndUnitLabels bool `yaml:"enable_type_and_unit_labels"`
ConvertAllAttributes bool `yaml:"convert_all_attributes"`
DisableTargetInfo bool `yaml:"disable_target_info"`
AllowDeltaTemporality bool `yaml:"allow_delta_temporality"`
}

// RegisterFlags adds the flags required to config this to the given FlagSet
@@ -229,7 +228,6 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.BoolVar(&cfg.OTLPConfig.ConvertAllAttributes, "distributor.otlp.convert-all-attributes", false, "If true, all resource attributes are converted to labels.")
f.BoolVar(&cfg.OTLPConfig.DisableTargetInfo, "distributor.otlp.disable-target-info", false, "If true, a target_info metric is not ingested. (refer to: https://github.com/prometheus/OpenMetrics/blob/main/specification/OpenMetrics.md#supporting-target-metadata-in-both-push-based-and-pull-based-systems)")
f.BoolVar(&cfg.OTLPConfig.AllowDeltaTemporality, "distributor.otlp.allow-delta-temporality", false, "EXPERIMENTAL: If true, delta temporality OTLP metrics are allowed to be ingested.")
f.BoolVar(&cfg.OTLPConfig.EnableTypeAndUnitLabels, "distributor.otlp.enable-type-and-unit-labels", false, "EXPERIMENTAL: If true, the '__type__' and '__unit__' labels are added for the OTLP metrics.")
}

// Validate config and returns error on failure
2 changes: 1 addition & 1 deletion pkg/util/push/otlp.go
@@ -181,7 +181,7 @@ func convertToPromTS(ctx context.Context, pmetrics pmetric.Metrics, cfg distribu
AddMetricSuffixes: true,
DisableTargetInfo: cfg.DisableTargetInfo,
AllowDeltaTemporality: cfg.AllowDeltaTemporality,
EnableTypeAndUnitLabels: cfg.EnableTypeAndUnitLabels,
EnableTypeAndUnitLabels: overrides.EnableTypeAndUnitLabels(userID),
}

var annots annotations.Annotations
4 changes: 2 additions & 2 deletions pkg/util/push/otlp_test.go
@@ -80,8 +80,7 @@ func TestOTLP_EnableTypeAndUnitLabels(t *testing.T) {
for _, test := range tests {
t.Run(test.description, func(t *testing.T) {
cfg := distributor.OTLPConfig{
EnableTypeAndUnitLabels: test.enableTypeAndUnitLabels,
AllowDeltaTemporality: test.allowDeltaTemporality,
AllowDeltaTemporality: test.allowDeltaTemporality,
}
metrics := pmetric.NewMetrics()
rm := metrics.ResourceMetrics().AppendEmpty()
@@ -90,6 +89,7 @@
test.otlpSeries.CopyTo(sm.Metrics().AppendEmpty())

limits := validation.Limits{}
limits.EnableTypeAndUnitLabels = test.enableTypeAndUnitLabels
overrides := validation.NewOverrides(limits, nil)
promSeries, metadata, err := convertToPromTS(ctx, metrics, cfg, overrides, "user-1", logger)
require.NoError(t, err)
31 changes: 27 additions & 4 deletions pkg/util/push/push.go
@@ -9,14 +9,17 @@ import (
"github.com/go-kit/log/level"
"github.com/prometheus/client_golang/exp/api/remote"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/schema"
"github.com/prometheus/prometheus/util/compression"
"github.com/weaveworks/common/httpgrpc"
"github.com/weaveworks/common/middleware"

"github.com/cortexproject/cortex/pkg/cortexpb"
"github.com/cortexproject/cortex/pkg/tenant"
"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/extract"
"github.com/cortexproject/cortex/pkg/util/log"
"github.com/cortexproject/cortex/pkg/util/validation"
)

const (
@@ -36,7 +39,7 @@
type Func func(context.Context, *cortexpb.WriteRequest) (*cortexpb.WriteResponse, error)

// Handler is a http.Handler which accepts WriteRequests.
func Handler(remoteWrite2Enabled bool, maxRecvMsgSize int, sourceIPs *middleware.SourceIPExtractor, push Func) http.Handler {
func Handler(remoteWrite2Enabled bool, maxRecvMsgSize int, overrides *validation.Overrides, sourceIPs *middleware.SourceIPExtractor, push Func) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
logger := log.WithContext(ctx, log.Logger)
@@ -78,8 +81,13 @@ func Handler(remoteWrite2Enabled bool, maxRecvMsgSize int, sourceIPs *middleware
}

handlePRW2 := func() {
userID, err := tenant.TenantID(ctx)
if err != nil {
return
}

var req cortexpb.PreallocWriteRequestV2
err := util.ParseProtoReader(ctx, r.Body, int(r.ContentLength), maxRecvMsgSize, &req, util.RawSnappy)
err = util.ParseProtoReader(ctx, r.Body, int(r.ContentLength), maxRecvMsgSize, &req, util.RawSnappy)
if err != nil {
level.Error(logger).Log("err", err.Error())
http.Error(w, err.Error(), http.StatusBadRequest)
@@ -91,7 +99,7 @@ func Handler(remoteWrite2Enabled bool, maxRecvMsgSize int, sourceIPs *middleware
req.Source = cortexpb.API
}

v1Req, err := convertV2RequestToV1(&req)
v1Req, err := convertV2RequestToV1(&req, overrides.EnableTypeAndUnitLabels(userID))
if err != nil {
level.Error(logger).Log("err", err.Error())
http.Error(w, err.Error(), http.StatusBadRequest)
@@ -169,7 +177,7 @@ func setPRW2RespHeader(w http.ResponseWriter, samples, histograms, exemplars int
w.Header().Set(rw20WrittenExemplarsHeader, strconv.FormatInt(exemplars, 10))
}

func convertV2RequestToV1(req *cortexpb.PreallocWriteRequestV2) (cortexpb.PreallocWriteRequest, error) {
func convertV2RequestToV1(req *cortexpb.PreallocWriteRequestV2, enableTypeAndUnitLabels bool) (cortexpb.PreallocWriteRequest, error) {
var v1Req cortexpb.PreallocWriteRequest
v1Timeseries := make([]cortexpb.PreallocTimeseries, 0, len(req.Timeseries))
var v1Metadata []*cortexpb.MetricMetadata
@@ -181,10 +189,25 @@ func convertV2RequestToV1(req *cortexpb.PreallocWriteRequestV2) (cortexpb.Preall
if err != nil {
return v1Req, err
}

unit := symbols[v2Ts.Metadata.UnitRef]
metricType := v2Ts.Metadata.Type
shouldAttachTypeAndUnitLabels := enableTypeAndUnitLabels && (metricType != cortexpb.METRIC_TYPE_UNSPECIFIED || unit != "")
if shouldAttachTypeAndUnitLabels {
slb := labels.NewScratchBuilder(lbs.Len() + 2) // for __type__ and __unit__
lbs.Range(func(l labels.Label) {
slb.Add(l.Name, l.Value)
})
schema.Metadata{Type: cortexpb.MetadataV2MetricTypeToMetricType(metricType), Unit: unit}.AddToLabels(&slb)
slb.Sort()
lbs = slb.Labels()
}

exemplars, err := convertV2ToV1Exemplars(&b, symbols, v2Ts.Exemplars)
if err != nil {
return v1Req, err
}

v1Timeseries = append(v1Timeseries, cortexpb.PreallocTimeseries{
TimeSeries: &cortexpb.TimeSeries{
Labels: cortexpb.FromLabelsToLabelAdapters(lbs),
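To make the new conversion concrete, here is a small, self-contained sketch of what attaching the metadata does to a series' labels. It mirrors the `schema.Metadata{...}.AddToLabels(&slb)` call in `convertV2RequestToV1`; the metric name and unit below are made-up values, and the expected `__type__`/`__unit__` output assumes the Prometheus `schema` helper behaves as it is used in this change.

```go
package main

import (
	"fmt"

	"github.com/prometheus/common/model"
	"github.com/prometheus/prometheus/model/labels"
	"github.com/prometheus/prometheus/schema"
)

func main() {
	// Labels as they might arrive in a remote write v2 time series (illustrative values).
	slb := labels.NewScratchBuilder(4) // leave room for __type__ and __unit__
	slb.Add("__name__", "http_request_duration_seconds")
	slb.Add("job", "test")

	// Attach the metadata-derived labels the same way the handler does.
	// With a histogram type and a "seconds" unit, this should yield
	// __type__="histogram" and __unit__="seconds" alongside the original labels.
	schema.Metadata{Type: model.MetricTypeHistogram, Unit: "seconds"}.AddToLabels(&slb)
	slb.Sort()

	fmt.Println(slb.Labels())
}
```

When the type is unspecified and the unit is empty, the handler skips this step entirely, so series without metadata keep exactly the labels they were sent with.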