Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce single forward endpoint to simplify configuration. #2801

Merged
merged 6 commits into from
Aug 24, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
* [ENHANCEMENT] Ruler: Add `<prometheus-http-prefix>/api/v1/status/buildinfo` endpoint. #2724
* [ENHANCEMENT] Querier: Ensure all queries pulled from query-frontend or query-scheduler are immediately executed. The maximum workers concurrency in each querier is configured by `-querier.max-concurrent`. #2598
* [ENHANCEMENT] Distributor: Add `cortex_distributor_received_requests_total` and `cortex_distributor_requests_in_total` metrics to provide visiblity into appropriate per-tenant request limits. #2770
* [ENHANCEMENT] Distributor: Add single forwarding remote-write endpoint for a tenant (`forwarding_endpoint`), instead of using per-rule endpoints. This takes precendence over per-rule endpoints. #2801
* [BUGFIX] Fix reporting of tracing spans from PromQL engine. #2707
* [BUGFIX] Distributor: Apply distributor instance limits before running HA deduplication. #2709
* [BUGFIX] Apply relabel and drop_label rules before forwarding rules in the distributor. #2703
Expand Down
9 changes: 9 additions & 0 deletions cmd/mimir/config-descriptor.json
Original file line number Diff line number Diff line change
Expand Up @@ -3104,6 +3104,15 @@
"fieldFlag": "alertmanager.max-alerts-size-bytes",
"fieldType": "int"
},
{
"kind": "field",
"name": "forwarding_endpoint",
"required": false,
"desc": "Remote-write endpoint where metrics specified in forwarding_rules are forwarded to. If set, takes precedence over endpoints specified in forwarding rules.",
"fieldValue": null,
"fieldDefaultValue": "",
"fieldType": "string"
},
{
"kind": "field",
"name": "forwarding_rules",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2639,6 +2639,11 @@ The `limits` block configures default and per-tenant limits imposed by component
# CLI flag: -alertmanager.max-alerts-size-bytes
[alertmanager_max_alerts_size_bytes: <int> | default = 0]

# Remote-write endpoint where metrics specified in forwarding_rules are
# forwarded to. If set, takes precedence over endpoints specified in forwarding
# rules.
[forwarding_endpoint: <string> | default = ""]

# Rules based on which the Distributor decides whether a metric should be
# forwarded to an alternative remote_write API endpoint.
[forwarding_rules: <map of string to validation.ForwardingRule> | default = ]
Expand Down
4 changes: 3 additions & 1 deletion pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -891,9 +891,11 @@ func (d *Distributor) forwardSamples(ctx context.Context, userID string, ts []mi
return ts, forwardingErrCh
}

endpoint := d.limits.ForwardingEndpoint(userID)

// Reassign req.Timeseries because the forwarder creates a new slice which has been filtered down.
// The cleanup func will cleanup the new slice, it's the forwarders responsibility to return the old one to the pool.
ts, forwardingErrCh = d.forwarder.Forward(ctx, forwardingRules, ts)
ts, forwardingErrCh = d.forwarder.Forward(ctx, endpoint, forwardingRules, ts)

return ts, forwardingErrCh
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/distributor/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4091,7 +4091,7 @@ func newMockForwarder(timeseriesMutator func([]mimirpb.PreallocTimeseries) []mim
}
}

func (m *mockForwarder) Forward(ctx context.Context, forwardingRules validation.ForwardingRules, ts []mimirpb.PreallocTimeseries) ([]mimirpb.PreallocTimeseries, chan error) {
func (m *mockForwarder) Forward(ctx context.Context, endpoint string, forwardingRules validation.ForwardingRules, ts []mimirpb.PreallocTimeseries) ([]mimirpb.PreallocTimeseries, chan error) {
errCh := make(chan error)

go func() {
Expand Down
19 changes: 13 additions & 6 deletions pkg/distributor/forwarding/forwarding.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (

type Forwarder interface {
services.Service
Forward(ctx context.Context, forwardingRules validation.ForwardingRules, ts []mimirpb.PreallocTimeseries) ([]mimirpb.PreallocTimeseries, chan error)
Forward(ctx context.Context, targetEndpoint string, forwardingRules validation.ForwardingRules, ts []mimirpb.PreallocTimeseries) ([]mimirpb.PreallocTimeseries, chan error)
}

type forwarder struct {
Expand Down Expand Up @@ -122,23 +122,26 @@ func (f *forwarder) worker() {
// Forward takes a set of forwarding rules and a slice of time series, it forwards the time series according to the rules.
// This function may return before the forwarding requests have completed, the caller can use the returned chan of errors
// to determine whether all forwarding requests have completed by checking if it is closed.
//
// The forwarding requests get executed with a limited concurrency which is configurable, in a situation where the
// concurrency limit is exhausted this function will block until a go routine is available to execute the requests.
// The slice of time series which gets passed into this function must not be returned to the pool by the caller, the
// returned slice of time series must be returned to the pool by the caller once it is done using it.
//
// If endpoint is not empty, it is used instead of any rule-specific endpoints.
//
// The return values are:
// - A slice of time series which should be sent to the ingesters, based on the given rule set.
// The Forward() method does not send the time series to the ingesters itself, it expects the caller to do that.
// - A chan of errors which resulted from forwarding the time series, the chan gets closed when all forwarding requests have completed.
func (f *forwarder) Forward(ctx context.Context, rules validation.ForwardingRules, in []mimirpb.PreallocTimeseries) ([]mimirpb.PreallocTimeseries, chan error) {
func (f *forwarder) Forward(ctx context.Context, endpoint string, rules validation.ForwardingRules, in []mimirpb.PreallocTimeseries) ([]mimirpb.PreallocTimeseries, chan error) {
if !f.cfg.Enabled {
errCh := make(chan error)
close(errCh)
return in, errCh
}

toIngest, tsByTargets := f.splitByTargets(in, rules)
toIngest, tsByTargets := f.splitByTargets(endpoint, in, rules)
defer f.pools.putTsByTargets(tsByTargets)

var requestWg sync.WaitGroup
Expand Down Expand Up @@ -200,7 +203,7 @@ func (t *TimeseriesCounts) count(ts mimirpb.PreallocTimeseries) {
//
// - A slice of time series to ingest into the ingesters.
// - A map of slices of time series which is keyed by the target to which they should be forwarded.
func (f *forwarder) splitByTargets(tsSliceIn []mimirpb.PreallocTimeseries, rules validation.ForwardingRules) ([]mimirpb.PreallocTimeseries, tsByTargets) {
func (f *forwarder) splitByTargets(targetEndpoint string, tsSliceIn []mimirpb.PreallocTimeseries, rules validation.ForwardingRules) ([]mimirpb.PreallocTimeseries, tsByTargets) {
// This functions copies all the entries of tsSliceIn into new slices so tsSliceIn can be recycled,
// we adjust the length of the slice to 0 to prevent that the contained *mimirpb.TimeSeries objects that have been
// reassigned (not deep copied) get returned while they are still referred to by another slice.
Expand All @@ -209,7 +212,7 @@ func (f *forwarder) splitByTargets(tsSliceIn []mimirpb.PreallocTimeseries, rules
tsToIngest := f.pools.getTsSlice()
tsByTargets := f.pools.getTsByTargets()
for _, ts := range tsSliceIn {
forwardingTarget, ingest := findTargetForLabels(ts.Labels, rules)
forwardingTarget, ingest := findTargetForLabels(targetEndpoint, ts.Labels, rules)
if forwardingTarget != "" {
tsByTargets.copyToTarget(forwardingTarget, ts, f.pools)
}
Expand All @@ -229,7 +232,7 @@ func (f *forwarder) splitByTargets(tsSliceIn []mimirpb.PreallocTimeseries, rules
return tsToIngest, tsByTargets
}

func findTargetForLabels(labels []mimirpb.LabelAdapter, rules validation.ForwardingRules) (string, bool) {
func findTargetForLabels(targetEndpoint string, labels []mimirpb.LabelAdapter, rules validation.ForwardingRules) (string, bool) {
metric, err := extract.UnsafeMetricNameFromLabelAdapters(labels)
if err != nil {
// Can't check whether a timeseries should be forwarded if it has no metric name.
Expand All @@ -243,6 +246,10 @@ func findTargetForLabels(labels []mimirpb.LabelAdapter, rules validation.Forward
return "", true
}

// Target endpoint is set, use it.
if targetEndpoint != "" {
return targetEndpoint, rule.Ingest
}
return rule.Endpoint, rule.Ingest
}

Expand Down
87 changes: 83 additions & 4 deletions pkg/distributor/forwarding/forwarding_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func TestForwardingSamplesSuccessfullyToCorrectTarget(t *testing.T) {
newSample(t, now, 3, 300, "__name__", "metric2", "some_label", "foo"),
newSample(t, now, 4, 400, "__name__", "metric2", "some_label", "bar"),
}
tsToIngest, errCh := forwarder.Forward(ctx, rules, ts)
tsToIngest, errCh := forwarder.Forward(ctx, "", rules, ts)

// The metric2 should be returned by the forwarding because the matching rule has ingest set to "true".
require.Len(t, tsToIngest, 2)
Expand Down Expand Up @@ -118,6 +118,85 @@ func TestForwardingSamplesSuccessfullyToCorrectTarget(t *testing.T) {
))
}

func TestForwardingSamplesSuccessfullyToSingleTarget(t *testing.T) {
ctx := context.Background()
now := time.Now().UnixMilli()
forwarder, reg := newForwarder(t, testConfig, true)

url, reqs, bodiesFn, closeFn := newTestServer(t, 200, true)
defer closeFn()

rules := validation.ForwardingRules{
"metric1": validation.ForwardingRule{Ingest: false, Endpoint: "not used"},
"metric2": validation.ForwardingRule{Ingest: true, Endpoint: ""},
}

ts := []mimirpb.PreallocTimeseries{
newSample(t, now, 1, 100, "__name__", "metric1", "some_label", "foo"),
newSample(t, now, 2, 200, "__name__", "metric1", "some_label", "bar"),
newSample(t, now, 3, 300, "__name__", "metric2", "some_label", "foo"),
newSample(t, now, 4, 400, "__name__", "metric2", "some_label", "bar"),
}
tsToIngest, errCh := forwarder.Forward(ctx, url, rules, ts)

// The metric2 should be returned by the forwarding because the matching rule has ingest set to "true".
require.Len(t, tsToIngest, 2)
requireLabelsEqual(t, tsToIngest[0].Labels, "__name__", "metric2", "some_label", "foo")
requireSamplesEqual(t, tsToIngest[0].Samples, now, 3)
requireExemplarsEqual(t, tsToIngest[0].Exemplars, now, 300)
requireLabelsEqual(t, tsToIngest[1].Labels, "__name__", "metric2", "some_label", "bar")
requireSamplesEqual(t, tsToIngest[1].Samples, now, 4)
requireExemplarsEqual(t, tsToIngest[1].Exemplars, now, 400)

// Loop until errCh gets closed, errCh getting closed indicates that all forwarding requests have completed.
for err := range errCh {
require.NoError(t, err)
}

for _, req := range reqs() {
require.Equal(t, "snappy", req.Header.Get("Content-Encoding"))
require.Equal(t, "application/x-protobuf", req.Header.Get("Content-Type"))
}

bodies := bodiesFn()
// Single request, with all the metrics.
require.Len(t, bodies, 1)

receivedReq1 := decodeBody(t, bodies[0])
require.Len(t, receivedReq1.Timeseries, 4)
requireLabelsEqual(t, receivedReq1.Timeseries[0].Labels, "__name__", "metric1", "some_label", "foo")
requireSamplesEqual(t, receivedReq1.Timeseries[0].Samples, now, 1)
require.Empty(t, receivedReq1.Timeseries[0].Exemplars)

requireLabelsEqual(t, receivedReq1.Timeseries[1].Labels, "__name__", "metric1", "some_label", "bar")
requireSamplesEqual(t, receivedReq1.Timeseries[1].Samples, now, 2)
require.Empty(t, receivedReq1.Timeseries[1].Exemplars)

requireLabelsEqual(t, receivedReq1.Timeseries[2].Labels, "__name__", "metric2", "some_label", "foo")
requireSamplesEqual(t, receivedReq1.Timeseries[2].Samples, now, 3)
require.Empty(t, receivedReq1.Timeseries[2].Exemplars)

requireLabelsEqual(t, receivedReq1.Timeseries[3].Labels, "__name__", "metric2", "some_label", "bar")
requireSamplesEqual(t, receivedReq1.Timeseries[3].Samples, now, 4)
require.Empty(t, receivedReq1.Timeseries[3].Exemplars)

expectedMetrics := `
# HELP cortex_distributor_forward_requests_total The total number of requests the Distributor made to forward samples.
# TYPE cortex_distributor_forward_requests_total counter
cortex_distributor_forward_requests_total{} 1
# HELP cortex_distributor_forward_samples_total The total number of samples the Distributor forwarded.
# TYPE cortex_distributor_forward_samples_total counter
cortex_distributor_forward_samples_total{} 4
`

require.NoError(t, testutil.GatherAndCompare(
reg,
strings.NewReader(expectedMetrics),
"cortex_distributor_forward_requests_total",
"cortex_distributor_forward_samples_total",
))
}

func TestForwardingSamplesWithDifferentErrorsWithPropagation(t *testing.T) {
type status uint16
const (
Expand Down Expand Up @@ -211,7 +290,7 @@ func TestForwardingSamplesWithDifferentErrorsWithPropagation(t *testing.T) {
for _, metric := range metrics {
ts = append(ts, newSample(t, now, 1, 100, "__name__", metric))
}
_, errCh := forwarder.Forward(context.Background(), rules, ts)
_, errCh := forwarder.Forward(context.Background(), "", rules, ts)

gotStatusCodes := []status{}
for err := range errCh {
Expand Down Expand Up @@ -420,7 +499,7 @@ func TestForwardingEnsureThatPooledObjectsGetReturned(t *testing.T) {
}

// Perform the forwarding operation.
toIngest, errCh := forwarder.Forward(context.Background(), tc.rules, ts)
toIngest, errCh := forwarder.Forward(context.Background(), "", tc.rules, ts)
require.NoError(t, <-errCh)

// receivedSamples counts the number of samples that each forwarding target has received.
Expand Down Expand Up @@ -682,7 +761,7 @@ func BenchmarkRemoteWriteForwarding(b *testing.B) {
require.NoError(b, <-errChs[errChIdx])
}

samples, errChs[errChIdx] = f.Forward(ctx, tc.rules, samples)
samples, errChs[errChIdx] = f.Forward(ctx, "", tc.rules, samples)
errChIdx = (errChIdx + 1) % len(errChs)

mimirpb.ReuseSlice(samples)
Expand Down
8 changes: 7 additions & 1 deletion pkg/util/validation/limits.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ type ForwardingRule struct {
Ingest bool `yaml:"ingest" json:"ingest"`

// Endpoint is the URL of the remote_write endpoint to which a metric should be forwarded.
// Deprecated in favor of ForwardingEndpoint.
Endpoint string `yaml:"endpoint" json:"endpoint"`
}

Expand Down Expand Up @@ -159,7 +160,8 @@ type Limits struct {
AlertmanagerMaxAlertsCount int `yaml:"alertmanager_max_alerts_count" json:"alertmanager_max_alerts_count"`
AlertmanagerMaxAlertsSizeBytes int `yaml:"alertmanager_max_alerts_size_bytes" json:"alertmanager_max_alerts_size_bytes"`

ForwardingRules ForwardingRules `yaml:"forwarding_rules" json:"forwarding_rules" doc:"nocli|description=Rules based on which the Distributor decides whether a metric should be forwarded to an alternative remote_write API endpoint."`
ForwardingEndpoint string `yaml:"forwarding_endpoint" json:"forwarding_endpoint" doc:"nocli|description=Remote-write endpoint where metrics specified in forwarding_rules are forwarded to. If set, takes precedence over endpoints specified in forwarding rules."`
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why "nocli"?

Copy link
Member Author

@pstibrany pstibrany Aug 24, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At this point we're moving it from per-rule to per-tenant, hence nocli. But I agree that it could be useful to set it globally too in some situations. Let's tackle that when we remove per-rule endpoints.

(nocli doesn't prevent this to be used globally via config file of course)

ForwardingRules ForwardingRules `yaml:"forwarding_rules" json:"forwarding_rules" doc:"nocli|description=Rules based on which the Distributor decides whether a metric should be forwarded to an alternative remote_write API endpoint."`
}

// RegisterFlags adds the flags required to config this to the given FlagSet
Expand Down Expand Up @@ -707,6 +709,10 @@ func (o *Overrides) ForwardingRules(user string) ForwardingRules {
return o.getOverridesForUser(user).ForwardingRules
}

func (o *Overrides) ForwardingEndpoint(user string) string {
return o.getOverridesForUser(user).ForwardingEndpoint
}

func (o *Overrides) getOverridesForUser(userID string) *Limits {
if o.tenantLimits != nil {
l := o.tenantLimits.ByUserID(userID)
Expand Down