diff --git a/CHANGELOG.md b/CHANGELOG.md index 53d4865a2b9..772984f3706 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -39,6 +39,7 @@ * [ENHANCEMENT] Updated Prometheus to include changes from prometheus/prometheus#9083. Now whenever `/labels` API calls include matchers, blocks store is queried for `LabelNames` with matchers instead of `Series` calls which was inefficient. #4380 * [ENHANCEMENT] Exemplars are now emitted for all gRPC calls and many operations tracked by histograms. #4462 * [ENHANCEMENT] New options `-server.http-listen-network` and `-server.grpc-listen-network` allow binding as 'tcp4' or 'tcp6'. #4462 +* [ENHANCEMENT] Ruler/Alertmanager: Add support for Federated tenant rules and alerts #4403 * [BUGFIX] Fixes a panic in the query-tee when comparing result. #4465 * [BUGFIX] Frontend: Fixes @ modifier functions (start/end) when splitting queries by time. #4464 * [BUGFIX] Compactor: compactor will no longer try to compact blocks that are already marked for deletion. Previously compactor would consider blocks marked for deletion within `-compactor.deletion-delay / 2` period as eligible for compaction. #4328 diff --git a/pkg/alertmanager/api.go b/pkg/alertmanager/api.go index 9d1ee0d7d75..a5c15ab9996 100644 --- a/pkg/alertmanager/api.go +++ b/pkg/alertmanager/api.go @@ -58,12 +58,13 @@ type UserConfig struct { func (am *MultitenantAlertmanager) GetUserConfig(w http.ResponseWriter, r *http.Request) { logger := util_log.WithContext(r.Context(), am.logger) - userID, err := tenant.TenantID(r.Context()) + tenantIDs, err := tenant.TenantIDs(r.Context()) if err != nil { level.Error(logger).Log("msg", errNoOrgID, "err", err.Error()) http.Error(w, fmt.Sprintf("%s: %s", errNoOrgID, err.Error()), http.StatusUnauthorized) return } + userID := tenant.JoinTenantIDs(tenantIDs) cfg, err := am.store.GetAlertConfig(r.Context(), userID) if err != nil { @@ -95,12 +96,13 @@ func (am *MultitenantAlertmanager) GetUserConfig(w http.ResponseWriter, r *http. func (am *MultitenantAlertmanager) SetUserConfig(w http.ResponseWriter, r *http.Request) { logger := util_log.WithContext(r.Context(), am.logger) - userID, err := tenant.TenantID(r.Context()) + tenantIDs, err := tenant.TenantIDs(r.Context()) if err != nil { level.Error(logger).Log("msg", errNoOrgID, "err", err.Error()) http.Error(w, fmt.Sprintf("%s: %s", errNoOrgID, err.Error()), http.StatusUnauthorized) return } + userID := tenant.JoinTenantIDs(tenantIDs) var input io.Reader maxConfigSize := am.limits.AlertmanagerMaxConfigSize(userID) @@ -155,12 +157,13 @@ func (am *MultitenantAlertmanager) SetUserConfig(w http.ResponseWriter, r *http. // Note that if no config exists for a user, StatusOK is returned. func (am *MultitenantAlertmanager) DeleteUserConfig(w http.ResponseWriter, r *http.Request) { logger := util_log.WithContext(r.Context(), am.logger) - userID, err := tenant.TenantID(r.Context()) + tenantIDs, err := tenant.TenantIDs(r.Context()) if err != nil { level.Error(logger).Log("msg", errNoOrgID, "err", err.Error()) http.Error(w, fmt.Sprintf("%s: %s", errNoOrgID, err.Error()), http.StatusUnauthorized) return } + userID := tenant.JoinTenantIDs(tenantIDs) err = am.store.DeleteAlertConfig(r.Context(), userID) if err != nil { diff --git a/pkg/alertmanager/api_test.go b/pkg/alertmanager/api_test.go index fe54bb2f4e2..8f518dd787b 100644 --- a/pkg/alertmanager/api_test.go +++ b/pkg/alertmanager/api_test.go @@ -640,6 +640,24 @@ route: - '...' continue: false receivers: +- name: route1 + webhook_configs: + - send_resolved: true + http_config: {} + url: http://alertmanager/api/notifications?orgId=1&rrid=7 + max_alerts: 0 +`, + }, + "user1|user2|user3": { + AlertmanagerConfig: ` +global: + resolve_timeout: 5m +route: + receiver: route1 + group_by: + - '...' + continue: false +receivers: - name: route1 webhook_configs: - send_resolved: true @@ -693,7 +711,7 @@ receivers: err = am.loadAndSyncConfigs(context.Background(), reasonPeriodic) require.NoError(t, err) - require.Len(t, am.alertmanagers, 2) + require.Len(t, am.alertmanagers, 3) router := mux.NewRouter() router.Path("/multitenant_alertmanager/configs").Methods(http.MethodGet).HandlerFunc(am.ListAllConfigs) diff --git a/pkg/alertmanager/distributor.go b/pkg/alertmanager/distributor.go index db1a08dbb6c..bc980a0d49b 100644 --- a/pkg/alertmanager/distributor.go +++ b/pkg/alertmanager/distributor.go @@ -122,11 +122,12 @@ func (d *Distributor) DistributeRequest(w http.ResponseWriter, r *http.Request) d.requestsInFlight.Add(1) defer d.requestsInFlight.Done() - userID, err := tenant.TenantID(r.Context()) + tenantIDs, err := tenant.TenantIDs(r.Context()) if err != nil { http.Error(w, err.Error(), http.StatusUnauthorized) return } + userID := tenant.JoinTenantIDs(tenantIDs) logger := util_log.WithContext(r.Context(), d.logger) diff --git a/pkg/alertmanager/multitenant.go b/pkg/alertmanager/multitenant.go index 25f6dcfc25c..6346baf0ad8 100644 --- a/pkg/alertmanager/multitenant.go +++ b/pkg/alertmanager/multitenant.go @@ -1003,11 +1003,12 @@ func (am *MultitenantAlertmanager) HandleRequest(ctx context.Context, in *httpgr // serveRequest serves the Alertmanager's web UI and API. func (am *MultitenantAlertmanager) serveRequest(w http.ResponseWriter, req *http.Request) { - userID, err := tenant.TenantID(req.Context()) + tenantIDs, err := tenant.TenantIDs(req.Context()) if err != nil { http.Error(w, err.Error(), http.StatusUnauthorized) return } + userID := tenant.JoinTenantIDs(tenantIDs) am.alertmanagersMtx.Lock() userAM, ok := am.alertmanagers[userID] am.alertmanagersMtx.Unlock() @@ -1148,10 +1149,11 @@ func (am *MultitenantAlertmanager) ReadFullStateForUser(ctx context.Context, use // UpdateState implements the Alertmanager service. func (am *MultitenantAlertmanager) UpdateState(ctx context.Context, part *clusterpb.Part) (*alertmanagerpb.UpdateStateResponse, error) { - userID, err := tenant.TenantID(ctx) + tenantIDs, err := tenant.TenantIDs(ctx) if err != nil { return nil, err } + userID := tenant.JoinTenantIDs(tenantIDs) am.alertmanagersMtx.Lock() userAM, ok := am.alertmanagers[userID] @@ -1254,10 +1256,11 @@ func (am *MultitenantAlertmanager) getPerUserDirectories() map[string]string { // UpdateState implements the Alertmanager service. func (am *MultitenantAlertmanager) ReadState(ctx context.Context, req *alertmanagerpb.ReadStateRequest) (*alertmanagerpb.ReadStateResponse, error) { - userID, err := tenant.TenantID(ctx) + tenantIDs, err := tenant.TenantIDs(ctx) if err != nil { return nil, err } + userID := tenant.JoinTenantIDs(tenantIDs) am.alertmanagersMtx.Lock() userAM, ok := am.alertmanagers[userID] diff --git a/pkg/cortex/modules.go b/pkg/cortex/modules.go index c83b0d5f363..2d029d4944a 100644 --- a/pkg/cortex/modules.go +++ b/pkg/cortex/modules.go @@ -238,7 +238,7 @@ func (t *Cortex) initTenantFederation() (serv services.Service, err error) { // single tenant. This allows for a less impactful enabling of tenant // federation. byPassForSingleQuerier := true - t.QuerierQueryable = querier.NewSampleAndChunkQueryable(tenantfederation.NewQueryable(t.QuerierQueryable, byPassForSingleQuerier)) + t.QuerierQueryable = querier.NewSampleAndChunkQueryable(tenantfederation.NewQueryable(t.QuerierQueryable, byPassForSingleQuerier, t.Cfg.TenantFederation)) } return nil, nil } @@ -662,8 +662,9 @@ func (t *Cortex) initRuler() (serv services.Service, err error) { rulerRegisterer := prometheus.WrapRegistererWith(prometheus.Labels{"engine": "ruler"}, prometheus.DefaultRegisterer) // TODO: Consider wrapping logger to differentiate from querier module logger queryable, _, engine := querier.New(t.Cfg.Querier, t.Overrides, t.Distributor, t.StoreQueryables, t.TombstonesLoader, rulerRegisterer, util_log.Logger) + mq := tenantfederation.NewQueryable(queryable, false, t.Cfg.TenantFederation) - managerFactory := ruler.DefaultTenantManagerFactory(t.Cfg.Ruler, t.Distributor, queryable, engine, t.Overrides, prometheus.DefaultRegisterer) + managerFactory := ruler.DefaultTenantManagerFactory(t.Cfg.Ruler, t.Distributor, mq, engine, t.Overrides, prometheus.DefaultRegisterer) manager, err := ruler.NewDefaultMultiTenantManager(t.Cfg.Ruler, managerFactory, prometheus.DefaultRegisterer, util_log.Logger) if err != nil { return nil, err diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index 872c7f0435a..90c33f3b7a3 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -954,23 +954,31 @@ func (d *Distributor) MetricsForLabelMatchers(ctx context.Context, from, through // MetricsMetadata returns all metric metadata of a user. func (d *Distributor) MetricsMetadata(ctx context.Context) ([]scrape.MetricMetadata, error) { - replicationSet, err := d.GetIngestersForMetadata(ctx) + userIDs, err := tenant.TenantIDs(ctx) if err != nil { return nil, err } - req := &ingester_client.MetricsMetadataRequest{} - // TODO(gotjosh): We only need to look in all the ingesters if shardByAllLabels is enabled. - resps, err := d.ForReplicationSet(ctx, replicationSet, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) { - return client.MetricsMetadata(ctx, req) - }) - if err != nil { - return nil, err + var userResps []interface{} + for _, userID := range userIDs { + ctx = user.InjectOrgID(ctx, userID) + replicationSet, err := d.GetIngestersForMetadata(ctx) + if err != nil { + return nil, err + } + req := &ingester_client.MetricsMetadataRequest{} + resps, err := d.ForReplicationSet(ctx, replicationSet, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) { + return client.MetricsMetadata(ctx, req) + }) + if err != nil { + return nil, err + } + userResps = append(userResps, resps...) } result := []scrape.MetricMetadata{} dedupTracker := map[cortexpb.MetricMetadata]struct{}{} - for _, resp := range resps { + for _, resp := range userResps { r := resp.(*ingester_client.MetricsMetadataResponse) for _, m := range r.Metadata { // Given we look across all ingesters - dedup the metadata. diff --git a/pkg/querier/tenantfederation/merge_queryable.go b/pkg/querier/tenantfederation/merge_queryable.go index 9efba31d135..ef566e9ae54 100644 --- a/pkg/querier/tenantfederation/merge_queryable.go +++ b/pkg/querier/tenantfederation/merge_queryable.go @@ -21,7 +21,6 @@ import ( const ( defaultTenantLabel = "__tenant_id__" retainExistingPrefix = "original_" - maxConcurrency = 16 ) // NewQueryable returns a queryable that iterates through all the tenant IDs @@ -36,8 +35,8 @@ const ( // If the label "__tenant_id__" is already existing, its value is overwritten // by the tenant ID and the previous value is exposed through a new label // prefixed with "original_". This behaviour is not implemented recursively. -func NewQueryable(upstream storage.Queryable, byPassWithSingleQuerier bool) storage.Queryable { - return NewMergeQueryable(defaultTenantLabel, tenantQuerierCallback(upstream), byPassWithSingleQuerier) +func NewQueryable(upstream storage.Queryable, byPassWithSingleQuerier bool, cfg Config) storage.Queryable { + return NewMergeQueryable(defaultTenantLabel, tenantQuerierCallback(upstream), byPassWithSingleQuerier, cfg) } func tenantQuerierCallback(queryable storage.Queryable) MergeQuerierCallback { @@ -80,11 +79,12 @@ type MergeQuerierCallback func(ctx context.Context, mint int64, maxt int64) (ids // If the label `idLabelName` is already existing, its value is overwritten and // the previous value is exposed through a new label prefixed with "original_". // This behaviour is not implemented recursively. -func NewMergeQueryable(idLabelName string, callback MergeQuerierCallback, byPassWithSingleQuerier bool) storage.Queryable { +func NewMergeQueryable(idLabelName string, callback MergeQuerierCallback, byPassWithSingleQuerier bool, cfg Config) storage.Queryable { return &mergeQueryable{ idLabelName: idLabelName, callback: callback, byPassWithSingleQuerier: byPassWithSingleQuerier, + cfg: cfg, } } @@ -92,6 +92,7 @@ type mergeQueryable struct { idLabelName string byPassWithSingleQuerier bool callback MergeQuerierCallback + cfg Config } // Querier returns a new mergeQuerier, which aggregates results from multiple @@ -111,10 +112,11 @@ func (m *mergeQueryable) Querier(ctx context.Context, mint int64, maxt int64) (s } return &mergeQuerier{ - ctx: ctx, - idLabelName: m.idLabelName, - queriers: queriers, - ids: ids, + ctx: ctx, + idLabelName: m.idLabelName, + queriers: queriers, + ids: ids, + maxConcurrency: m.cfg.MaxConcurrency, }, nil } @@ -125,10 +127,11 @@ func (m *mergeQueryable) Querier(ctx context.Context, mint int64, maxt int64) (s // the previous value is exposed through a new label prefixed with "original_". // This behaviour is not implemented recursively type mergeQuerier struct { - ctx context.Context - queriers []storage.Querier - idLabelName string - ids []string + ctx context.Context + queriers []storage.Querier + idLabelName string + ids []string + maxConcurrency int } // LabelValues returns all potential values for a label name. It is not safe @@ -248,7 +251,7 @@ func (m *mergeQuerier) mergeDistinctStringSliceWithTenants(f stringSliceFunc, te return nil } - err := concurrency.ForEach(m.ctx, jobs, maxConcurrency, run) + err := concurrency.ForEach(m.ctx, jobs, m.maxConcurrency, run) if err != nil { return nil, nil, err } @@ -334,7 +337,7 @@ func (m *mergeQuerier) Select(sortSeries bool, hints *storage.SelectHints, match return nil } - err := concurrency.ForEach(ctx, jobs, maxConcurrency, run) + err := concurrency.ForEach(ctx, jobs, m.maxConcurrency, run) if err != nil { return storage.ErrSeriesSet(err) } diff --git a/pkg/querier/tenantfederation/merge_queryable_test.go b/pkg/querier/tenantfederation/merge_queryable_test.go index fd1441daa07..d493736d454 100644 --- a/pkg/querier/tenantfederation/merge_queryable_test.go +++ b/pkg/querier/tenantfederation/merge_queryable_test.go @@ -257,7 +257,9 @@ type mergeQueryableScenario struct { func (s *mergeQueryableScenario) init() (storage.Querier, error) { // initialize with default tenant label - q := NewQueryable(&s.queryable, !s.doNotByPassSingleQuerier) + q := NewQueryable(&s.queryable, !s.doNotByPassSingleQuerier, Config{ + MaxConcurrency: 16, + }) // inject tenants into context ctx := context.Background() @@ -334,7 +336,9 @@ type labelValuesScenario struct { func TestMergeQueryable_Querier(t *testing.T) { t.Run("querying without a tenant specified should error", func(t *testing.T) { queryable := &mockTenantQueryableWithFilter{} - q := NewQueryable(queryable, false /* byPassWithSingleQuerier */) + q := NewQueryable(queryable, false /* bypasswithsinglequerier */, Config{ + MaxConcurrency: 16, + }) // Create a context with no tenant specified. ctx := context.Background() @@ -873,7 +877,9 @@ func TestTracingMergeQueryable(t *testing.T) { // set a multi tenant resolver tenant.WithDefaultResolver(tenant.NewMultiResolver()) filter := mockTenantQueryableWithFilter{} - q := NewQueryable(&filter, false) + q := NewQueryable(&filter, false, Config{ + MaxConcurrency: 16, + }) // retrieve querier if set querier, err := q.Querier(ctx, mint, maxt) require.NoError(t, err) diff --git a/pkg/querier/tenantfederation/tenant_federation.go b/pkg/querier/tenantfederation/tenant_federation.go index af5bd7b929e..f7e915002e9 100644 --- a/pkg/querier/tenantfederation/tenant_federation.go +++ b/pkg/querier/tenantfederation/tenant_federation.go @@ -6,9 +6,11 @@ import ( type Config struct { // Enabled switches on support for multi tenant query federation - Enabled bool `yaml:"enabled"` + Enabled bool `yaml:"enabled"` + MaxConcurrency int `yaml:"max-concurrency"` } func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.BoolVar(&cfg.Enabled, "tenant-federation.enabled", false, "If enabled on all Cortex services, queries can be federated across multiple tenants. The tenant IDs involved need to be specified separated by a `|` character in the `X-Scope-OrgID` header (experimental).") + f.IntVar(&cfg.MaxConcurrency, "tenant-federation.max-concurrency", 16, "Maximum concurrent federated sub queries used when evaluating a federated query") } diff --git a/pkg/ruler/api.go b/pkg/ruler/api.go index 8dc974ac50b..e6d8b4e59c4 100644 --- a/pkg/ruler/api.go +++ b/pkg/ruler/api.go @@ -138,8 +138,8 @@ func NewAPI(r *Ruler, s rulestore.RuleStore, logger log.Logger) *API { func (a *API) PrometheusRules(w http.ResponseWriter, req *http.Request) { logger := util_log.WithContext(req.Context(), a.logger) - userID, err := tenant.TenantID(req.Context()) - if err != nil || userID == "" { + tenantIDs, err := tenant.TenantIDs(req.Context()) + if err != nil || len(tenantIDs) == 0 { level.Error(logger).Log("msg", "error extracting org id from context", "err", err) respondError(logger, w, "no valid org id found") return @@ -230,8 +230,8 @@ func (a *API) PrometheusRules(w http.ResponseWriter, req *http.Request) { func (a *API) PrometheusAlerts(w http.ResponseWriter, req *http.Request) { logger := util_log.WithContext(req.Context(), a.logger) - userID, err := tenant.TenantID(req.Context()) - if err != nil || userID == "" { + tenantIDs, err := tenant.TenantIDs(req.Context()) + if err != nil || len(tenantIDs) == 0 { level.Error(logger).Log("msg", "error extracting org id from context", "err", err) respondError(logger, w, "no valid org id found") return @@ -359,10 +359,11 @@ func parseGroupName(params map[string]string) (string, error) { // and returns them in that order. It also allows users to require a namespace or group name and return // an error if it they can not be parsed. func parseRequest(req *http.Request, requireNamespace, requireGroup bool) (string, string, string, error) { - userID, err := tenant.TenantID(req.Context()) + tenantIDs, err := tenant.TenantIDs(req.Context()) if err != nil { return "", "", "", user.ErrNoOrgID } + userID := tenant.JoinTenantIDs(tenantIDs) vars := mux.Vars(req) diff --git a/pkg/ruler/api_test.go b/pkg/ruler/api_test.go index 2fc60386826..57996cdc50f 100644 --- a/pkg/ruler/api_test.go +++ b/pkg/ruler/api_test.go @@ -240,7 +240,7 @@ rules: router.Path("/api/v1/rules/{namespace}").Methods("POST").HandlerFunc(a.CreateRuleGroup) router.Path("/api/v1/rules/{namespace}/{groupName}").Methods("GET").HandlerFunc(a.GetRuleGroup) // POST - req := requestFor(t, http.MethodPost, "https://localhost:8080/api/v1/rules/namespace", strings.NewReader(tt.input), "user1") + req := requestFor(t, http.MethodPost, "https://localhost:8080/api/v1/rules/namespace", strings.NewReader(tt.input), "user1|user2|user3") w := httptest.NewRecorder() router.ServeHTTP(w, req) @@ -248,7 +248,7 @@ rules: if tt.err == nil { // GET - req = requestFor(t, http.MethodGet, "https://localhost:8080/api/v1/rules/namespace/test", nil, "user1") + req = requestFor(t, http.MethodGet, "https://localhost:8080/api/v1/rules/namespace/test", nil, "user1|user2|user3") w = httptest.NewRecorder() router.ServeHTTP(w, req) diff --git a/pkg/ruler/compat.go b/pkg/ruler/compat.go index 8ee0fe51917..7a8ae7ced3b 100644 --- a/pkg/ruler/compat.go +++ b/pkg/ruler/compat.go @@ -21,6 +21,7 @@ import ( "github.com/cortexproject/cortex/pkg/cortexpb" "github.com/cortexproject/cortex/pkg/querier" + "github.com/cortexproject/cortex/pkg/tenant" util_log "github.com/cortexproject/cortex/pkg/util/log" ) @@ -36,7 +37,7 @@ type PusherAppender struct { ctx context.Context pusher Pusher labels []labels.Labels - samples []cortexpb.Sample + samples map[string][]cortexpb.Sample userID string evaluationDelay time.Duration } @@ -56,7 +57,17 @@ func (a *PusherAppender) Append(_ uint64, l labels.Labels, t int64, v float64) ( t -= a.evaluationDelay.Milliseconds() } - a.samples = append(a.samples, cortexpb.Sample{ + userID := a.userID + if tenant.IsCompositeTenantID(userID) { + userIDs, err := tenant.TenantIDsFromOrgID(userID) + if err != nil { + return 0, err + } + // Mod the hash of the series so same series always goes to same subtenant + i := int(l.Copy().Hash()) % len(userIDs) + userID = userIDs[i] + } + a.samples[userID] = append(a.samples[userID], cortexpb.Sample{ TimestampMs: t, Value: v, }) @@ -72,23 +83,26 @@ func (a *PusherAppender) Commit() error { // Since a.pusher is distributor, client.ReuseSlice will be called in a.pusher.Push. // We shouldn't call client.ReuseSlice here. - _, err := a.pusher.Push(user.InjectOrgID(a.ctx, a.userID), cortexpb.ToWriteRequest(a.labels, a.samples, nil, cortexpb.RULE)) - - if err != nil { - // Don't report errors that ended with 4xx HTTP status code (series limits, duplicate samples, out of order, etc.) - if resp, ok := httpgrpc.HTTPResponseFromError(err); !ok || resp.Code/100 != 4 { - a.failedWrites.Inc() + var err error + for userID, samples := range a.samples { + _, err = a.pusher.Push(user.InjectOrgID(a.ctx, userID), cortexpb.ToWriteRequest(a.labels, samples, nil, cortexpb.RULE)) + + if err != nil { + // Don't report errors that ended with 4xx HTTP status code (series limits, duplicate samples, out of order, etc.) + if resp, ok := httpgrpc.HTTPResponseFromError(err); !ok || resp.Code/100 != 4 { + a.failedWrites.Inc() + } } } a.labels = nil - a.samples = nil + a.samples = map[string][]cortexpb.Sample{} return err } func (a *PusherAppender) Rollback() error { a.labels = nil - a.samples = nil + a.samples = map[string][]cortexpb.Sample{} return nil } @@ -122,6 +136,7 @@ func (t *PusherAppendable) Appender(ctx context.Context) storage.Appender { pusher: t.pusher, userID: t.userID, evaluationDelay: t.rulesLimits.EvaluationDelay(t.userID), + samples: map[string][]cortexpb.Sample{}, } } diff --git a/pkg/ruler/compat_test.go b/pkg/ruler/compat_test.go index 968f5cc66da..31467232932 100644 --- a/pkg/ruler/compat_test.go +++ b/pkg/ruler/compat_test.go @@ -19,6 +19,7 @@ import ( "github.com/weaveworks/common/httpgrpc" "github.com/cortexproject/cortex/pkg/cortexpb" + "github.com/cortexproject/cortex/pkg/tenant" ) type fakePusher struct { @@ -28,13 +29,17 @@ type fakePusher struct { } func (p *fakePusher) Push(ctx context.Context, r *cortexpb.WriteRequest) (*cortexpb.WriteResponse, error) { + _, err := tenant.TenantID(ctx) + if err != nil { + return nil, err + } p.request = r return p.response, p.err } func TestPusherAppendable(t *testing.T) { pusher := &fakePusher{} - pa := NewPusherAppendable(pusher, "user-1", nil, prometheus.NewCounter(prometheus.CounterOpts{}), prometheus.NewCounter(prometheus.CounterOpts{})) + pa := NewPusherAppendable(pusher, "1|2|3|4|5", nil, prometheus.NewCounter(prometheus.CounterOpts{}), prometheus.NewCounter(prometheus.CounterOpts{})) for _, tc := range []struct { name string diff --git a/pkg/ruler/ruler.go b/pkg/ruler/ruler.go index 1633df4ea79..e9bbd00f5ae 100644 --- a/pkg/ruler/ruler.go +++ b/pkg/ruler/ruler.go @@ -645,10 +645,11 @@ func filterRuleGroups(userID string, ruleGroups []*rulespb.RuleGroupDesc, ring r // GetRules retrieves the running rules from this ruler and all running rulers in the ring if // sharding is enabled func (r *Ruler) GetRules(ctx context.Context) ([]*GroupStateDesc, error) { - userID, err := tenant.TenantID(ctx) + tenantIDs, err := tenant.TenantIDs(ctx) if err != nil { return nil, fmt.Errorf("no user id found in context") } + userID := tenant.JoinTenantIDs(tenantIDs) if r.cfg.EnableSharding { return r.getShardedRules(ctx) @@ -788,10 +789,11 @@ func (r *Ruler) getShardedRules(ctx context.Context) ([]*GroupStateDesc, error) // Rules implements the rules service func (r *Ruler) Rules(ctx context.Context, in *RulesRequest) (*RulesResponse, error) { - userID, err := tenant.TenantID(ctx) + tenantIDs, err := tenant.TenantIDs(ctx) if err != nil { return nil, fmt.Errorf("no user id found in context") } + userID := tenant.JoinTenantIDs(tenantIDs) groupDescs, err := r.getLocalRules(userID) if err != nil { @@ -835,13 +837,14 @@ func (r *Ruler) AssertMaxRulesPerRuleGroup(userID string, rules int) error { func (r *Ruler) DeleteTenantConfiguration(w http.ResponseWriter, req *http.Request) { logger := util_log.WithContext(req.Context(), r.logger) - userID, err := tenant.TenantID(req.Context()) + tenantIDs, err := tenant.TenantIDs(req.Context()) if err != nil { // When Cortex is running, it uses Auth Middleware for checking X-Scope-OrgID and injecting tenant into context. // Auth Middleware sends http.StatusUnauthorized if X-Scope-OrgID is missing, so we do too here, for consistency. http.Error(w, err.Error(), http.StatusUnauthorized) return } + userID := tenant.JoinTenantIDs(tenantIDs) err = r.store.DeleteNamespace(req.Context(), userID, "") // Empty namespace = delete all rule groups. if err != nil && !errors.Is(err, rulestore.ErrGroupNamespaceNotFound) { diff --git a/pkg/tenant/tenant.go b/pkg/tenant/tenant.go index fa808989077..babe601c4f3 100644 --- a/pkg/tenant/tenant.go +++ b/pkg/tenant/tenant.go @@ -68,6 +68,10 @@ func ValidTenantID(s string) error { return nil } +func IsCompositeTenantID(s string) bool { + return strings.Contains(s, tenantIDsLabelSeparator) +} + func JoinTenantIDs(tenantIDs []string) string { return strings.Join(tenantIDs, tenantIDsLabelSeparator) }