Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
* [ENHANCEMENT] Updated Prometheus to include changes from prometheus/prometheus#9083. Now whenever `/labels` API calls include matchers, blocks store is queried for `LabelNames` with matchers instead of `Series` calls which was inefficient. #4380
* [ENHANCEMENT] Exemplars are now emitted for all gRPC calls and many operations tracked by histograms. #4462
* [ENHANCEMENT] New options `-server.http-listen-network` and `-server.grpc-listen-network` allow binding as 'tcp4' or 'tcp6'. #4462
* [ENHANCEMENT] Ruler/Alertmanager: Add support for Federated tenant rules and alerts #4403
* [BUGFIX] Fixes a panic in the query-tee when comparing result. #4465
* [BUGFIX] Frontend: Fixes @ modifier functions (start/end) when splitting queries by time. #4464
* [BUGFIX] Compactor: compactor will no longer try to compact blocks that are already marked for deletion. Previously compactor would consider blocks marked for deletion within `-compactor.deletion-delay / 2` period as eligible for compaction. #4328
Expand Down
9 changes: 6 additions & 3 deletions pkg/alertmanager/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,12 +58,13 @@ type UserConfig struct {
func (am *MultitenantAlertmanager) GetUserConfig(w http.ResponseWriter, r *http.Request) {
logger := util_log.WithContext(r.Context(), am.logger)

userID, err := tenant.TenantID(r.Context())
tenantIDs, err := tenant.TenantIDs(r.Context())
if err != nil {
level.Error(logger).Log("msg", errNoOrgID, "err", err.Error())
http.Error(w, fmt.Sprintf("%s: %s", errNoOrgID, err.Error()), http.StatusUnauthorized)
return
}
userID := tenant.JoinTenantIDs(tenantIDs)

cfg, err := am.store.GetAlertConfig(r.Context(), userID)
if err != nil {
Expand Down Expand Up @@ -95,12 +96,13 @@ func (am *MultitenantAlertmanager) GetUserConfig(w http.ResponseWriter, r *http.

func (am *MultitenantAlertmanager) SetUserConfig(w http.ResponseWriter, r *http.Request) {
logger := util_log.WithContext(r.Context(), am.logger)
userID, err := tenant.TenantID(r.Context())
tenantIDs, err := tenant.TenantIDs(r.Context())
if err != nil {
level.Error(logger).Log("msg", errNoOrgID, "err", err.Error())
http.Error(w, fmt.Sprintf("%s: %s", errNoOrgID, err.Error()), http.StatusUnauthorized)
return
}
userID := tenant.JoinTenantIDs(tenantIDs)

var input io.Reader
maxConfigSize := am.limits.AlertmanagerMaxConfigSize(userID)
Expand Down Expand Up @@ -155,12 +157,13 @@ func (am *MultitenantAlertmanager) SetUserConfig(w http.ResponseWriter, r *http.
// Note that if no config exists for a user, StatusOK is returned.
func (am *MultitenantAlertmanager) DeleteUserConfig(w http.ResponseWriter, r *http.Request) {
logger := util_log.WithContext(r.Context(), am.logger)
userID, err := tenant.TenantID(r.Context())
tenantIDs, err := tenant.TenantIDs(r.Context())
if err != nil {
level.Error(logger).Log("msg", errNoOrgID, "err", err.Error())
http.Error(w, fmt.Sprintf("%s: %s", errNoOrgID, err.Error()), http.StatusUnauthorized)
return
}
userID := tenant.JoinTenantIDs(tenantIDs)

err = am.store.DeleteAlertConfig(r.Context(), userID)
if err != nil {
Expand Down
20 changes: 19 additions & 1 deletion pkg/alertmanager/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -640,6 +640,24 @@ route:
- '...'
continue: false
receivers:
- name: route1
webhook_configs:
- send_resolved: true
http_config: {}
url: http://alertmanager/api/notifications?orgId=1&rrid=7
max_alerts: 0
`,
},
"user1|user2|user3": {
AlertmanagerConfig: `
global:
resolve_timeout: 5m
route:
receiver: route1
group_by:
- '...'
continue: false
receivers:
- name: route1
webhook_configs:
- send_resolved: true
Expand Down Expand Up @@ -693,7 +711,7 @@ receivers:

err = am.loadAndSyncConfigs(context.Background(), reasonPeriodic)
require.NoError(t, err)
require.Len(t, am.alertmanagers, 2)
require.Len(t, am.alertmanagers, 3)

router := mux.NewRouter()
router.Path("/multitenant_alertmanager/configs").Methods(http.MethodGet).HandlerFunc(am.ListAllConfigs)
Expand Down
3 changes: 2 additions & 1 deletion pkg/alertmanager/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,11 +122,12 @@ func (d *Distributor) DistributeRequest(w http.ResponseWriter, r *http.Request)
d.requestsInFlight.Add(1)
defer d.requestsInFlight.Done()

userID, err := tenant.TenantID(r.Context())
tenantIDs, err := tenant.TenantIDs(r.Context())
if err != nil {
http.Error(w, err.Error(), http.StatusUnauthorized)
return
}
userID := tenant.JoinTenantIDs(tenantIDs)

logger := util_log.WithContext(r.Context(), d.logger)

Expand Down
9 changes: 6 additions & 3 deletions pkg/alertmanager/multitenant.go
Original file line number Diff line number Diff line change
Expand Up @@ -1003,11 +1003,12 @@ func (am *MultitenantAlertmanager) HandleRequest(ctx context.Context, in *httpgr

// serveRequest serves the Alertmanager's web UI and API.
func (am *MultitenantAlertmanager) serveRequest(w http.ResponseWriter, req *http.Request) {
userID, err := tenant.TenantID(req.Context())
tenantIDs, err := tenant.TenantIDs(req.Context())
if err != nil {
http.Error(w, err.Error(), http.StatusUnauthorized)
return
}
userID := tenant.JoinTenantIDs(tenantIDs)
am.alertmanagersMtx.Lock()
userAM, ok := am.alertmanagers[userID]
am.alertmanagersMtx.Unlock()
Expand Down Expand Up @@ -1148,10 +1149,11 @@ func (am *MultitenantAlertmanager) ReadFullStateForUser(ctx context.Context, use

// UpdateState implements the Alertmanager service.
func (am *MultitenantAlertmanager) UpdateState(ctx context.Context, part *clusterpb.Part) (*alertmanagerpb.UpdateStateResponse, error) {
userID, err := tenant.TenantID(ctx)
tenantIDs, err := tenant.TenantIDs(ctx)
if err != nil {
return nil, err
}
userID := tenant.JoinTenantIDs(tenantIDs)

am.alertmanagersMtx.Lock()
userAM, ok := am.alertmanagers[userID]
Expand Down Expand Up @@ -1254,10 +1256,11 @@ func (am *MultitenantAlertmanager) getPerUserDirectories() map[string]string {

// UpdateState implements the Alertmanager service.
func (am *MultitenantAlertmanager) ReadState(ctx context.Context, req *alertmanagerpb.ReadStateRequest) (*alertmanagerpb.ReadStateResponse, error) {
userID, err := tenant.TenantID(ctx)
tenantIDs, err := tenant.TenantIDs(ctx)
if err != nil {
return nil, err
}
userID := tenant.JoinTenantIDs(tenantIDs)

am.alertmanagersMtx.Lock()
userAM, ok := am.alertmanagers[userID]
Expand Down
5 changes: 3 additions & 2 deletions pkg/cortex/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ func (t *Cortex) initTenantFederation() (serv services.Service, err error) {
// single tenant. This allows for a less impactful enabling of tenant
// federation.
byPassForSingleQuerier := true
t.QuerierQueryable = querier.NewSampleAndChunkQueryable(tenantfederation.NewQueryable(t.QuerierQueryable, byPassForSingleQuerier))
t.QuerierQueryable = querier.NewSampleAndChunkQueryable(tenantfederation.NewQueryable(t.QuerierQueryable, byPassForSingleQuerier, t.Cfg.TenantFederation))
}
return nil, nil
}
Expand Down Expand Up @@ -662,8 +662,9 @@ func (t *Cortex) initRuler() (serv services.Service, err error) {
rulerRegisterer := prometheus.WrapRegistererWith(prometheus.Labels{"engine": "ruler"}, prometheus.DefaultRegisterer)
// TODO: Consider wrapping logger to differentiate from querier module logger
queryable, _, engine := querier.New(t.Cfg.Querier, t.Overrides, t.Distributor, t.StoreQueryables, t.TombstonesLoader, rulerRegisterer, util_log.Logger)
mq := tenantfederation.NewQueryable(queryable, false, t.Cfg.TenantFederation)

managerFactory := ruler.DefaultTenantManagerFactory(t.Cfg.Ruler, t.Distributor, queryable, engine, t.Overrides, prometheus.DefaultRegisterer)
managerFactory := ruler.DefaultTenantManagerFactory(t.Cfg.Ruler, t.Distributor, mq, engine, t.Overrides, prometheus.DefaultRegisterer)
manager, err := ruler.NewDefaultMultiTenantManager(t.Cfg.Ruler, managerFactory, prometheus.DefaultRegisterer, util_log.Logger)
if err != nil {
return nil, err
Expand Down
26 changes: 17 additions & 9 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -954,23 +954,31 @@ func (d *Distributor) MetricsForLabelMatchers(ctx context.Context, from, through

// MetricsMetadata returns all metric metadata of a user.
func (d *Distributor) MetricsMetadata(ctx context.Context) ([]scrape.MetricMetadata, error) {
replicationSet, err := d.GetIngestersForMetadata(ctx)
userIDs, err := tenant.TenantIDs(ctx)
if err != nil {
return nil, err
}

req := &ingester_client.MetricsMetadataRequest{}
// TODO(gotjosh): We only need to look in all the ingesters if shardByAllLabels is enabled.
resps, err := d.ForReplicationSet(ctx, replicationSet, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) {
return client.MetricsMetadata(ctx, req)
})
if err != nil {
return nil, err
var userResps []interface{}
for _, userID := range userIDs {
ctx = user.InjectOrgID(ctx, userID)
replicationSet, err := d.GetIngestersForMetadata(ctx)
if err != nil {
return nil, err
}
req := &ingester_client.MetricsMetadataRequest{}
resps, err := d.ForReplicationSet(ctx, replicationSet, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) {
return client.MetricsMetadata(ctx, req)
})
if err != nil {
return nil, err
}
userResps = append(userResps, resps...)
}

result := []scrape.MetricMetadata{}
dedupTracker := map[cortexpb.MetricMetadata]struct{}{}
for _, resp := range resps {
for _, resp := range userResps {
r := resp.(*ingester_client.MetricsMetadataResponse)
for _, m := range r.Metadata {
// Given we look across all ingesters - dedup the metadata.
Expand Down
31 changes: 17 additions & 14 deletions pkg/querier/tenantfederation/merge_queryable.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
const (
defaultTenantLabel = "__tenant_id__"
retainExistingPrefix = "original_"
maxConcurrency = 16
)

// NewQueryable returns a queryable that iterates through all the tenant IDs
Expand All @@ -36,8 +35,8 @@ const (
// If the label "__tenant_id__" is already existing, its value is overwritten
// by the tenant ID and the previous value is exposed through a new label
// prefixed with "original_". This behaviour is not implemented recursively.
func NewQueryable(upstream storage.Queryable, byPassWithSingleQuerier bool) storage.Queryable {
return NewMergeQueryable(defaultTenantLabel, tenantQuerierCallback(upstream), byPassWithSingleQuerier)
func NewQueryable(upstream storage.Queryable, byPassWithSingleQuerier bool, cfg Config) storage.Queryable {
return NewMergeQueryable(defaultTenantLabel, tenantQuerierCallback(upstream), byPassWithSingleQuerier, cfg)
}

func tenantQuerierCallback(queryable storage.Queryable) MergeQuerierCallback {
Expand Down Expand Up @@ -80,18 +79,20 @@ type MergeQuerierCallback func(ctx context.Context, mint int64, maxt int64) (ids
// If the label `idLabelName` is already existing, its value is overwritten and
// the previous value is exposed through a new label prefixed with "original_".
// This behaviour is not implemented recursively.
func NewMergeQueryable(idLabelName string, callback MergeQuerierCallback, byPassWithSingleQuerier bool) storage.Queryable {
func NewMergeQueryable(idLabelName string, callback MergeQuerierCallback, byPassWithSingleQuerier bool, cfg Config) storage.Queryable {
return &mergeQueryable{
idLabelName: idLabelName,
callback: callback,
byPassWithSingleQuerier: byPassWithSingleQuerier,
cfg: cfg,
}
}

type mergeQueryable struct {
idLabelName string
byPassWithSingleQuerier bool
callback MergeQuerierCallback
cfg Config
}

// Querier returns a new mergeQuerier, which aggregates results from multiple
Expand All @@ -111,10 +112,11 @@ func (m *mergeQueryable) Querier(ctx context.Context, mint int64, maxt int64) (s
}

return &mergeQuerier{
ctx: ctx,
idLabelName: m.idLabelName,
queriers: queriers,
ids: ids,
ctx: ctx,
idLabelName: m.idLabelName,
queriers: queriers,
ids: ids,
maxConcurrency: m.cfg.MaxConcurrency,
}, nil
}

Expand All @@ -125,10 +127,11 @@ func (m *mergeQueryable) Querier(ctx context.Context, mint int64, maxt int64) (s
// the previous value is exposed through a new label prefixed with "original_".
// This behaviour is not implemented recursively
type mergeQuerier struct {
ctx context.Context
queriers []storage.Querier
idLabelName string
ids []string
ctx context.Context
queriers []storage.Querier
idLabelName string
ids []string
maxConcurrency int
}

// LabelValues returns all potential values for a label name. It is not safe
Expand Down Expand Up @@ -248,7 +251,7 @@ func (m *mergeQuerier) mergeDistinctStringSliceWithTenants(f stringSliceFunc, te
return nil
}

err := concurrency.ForEach(m.ctx, jobs, maxConcurrency, run)
err := concurrency.ForEach(m.ctx, jobs, m.maxConcurrency, run)
if err != nil {
return nil, nil, err
}
Expand Down Expand Up @@ -334,7 +337,7 @@ func (m *mergeQuerier) Select(sortSeries bool, hints *storage.SelectHints, match
return nil
}

err := concurrency.ForEach(ctx, jobs, maxConcurrency, run)
err := concurrency.ForEach(ctx, jobs, m.maxConcurrency, run)
if err != nil {
return storage.ErrSeriesSet(err)
}
Expand Down
12 changes: 9 additions & 3 deletions pkg/querier/tenantfederation/merge_queryable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,9 @@ type mergeQueryableScenario struct {

func (s *mergeQueryableScenario) init() (storage.Querier, error) {
// initialize with default tenant label
q := NewQueryable(&s.queryable, !s.doNotByPassSingleQuerier)
q := NewQueryable(&s.queryable, !s.doNotByPassSingleQuerier, Config{
MaxConcurrency: 16,
})

// inject tenants into context
ctx := context.Background()
Expand Down Expand Up @@ -334,7 +336,9 @@ type labelValuesScenario struct {
func TestMergeQueryable_Querier(t *testing.T) {
t.Run("querying without a tenant specified should error", func(t *testing.T) {
queryable := &mockTenantQueryableWithFilter{}
q := NewQueryable(queryable, false /* byPassWithSingleQuerier */)
q := NewQueryable(queryable, false /* bypasswithsinglequerier */, Config{
MaxConcurrency: 16,
})
// Create a context with no tenant specified.
ctx := context.Background()

Expand Down Expand Up @@ -873,7 +877,9 @@ func TestTracingMergeQueryable(t *testing.T) {
// set a multi tenant resolver
tenant.WithDefaultResolver(tenant.NewMultiResolver())
filter := mockTenantQueryableWithFilter{}
q := NewQueryable(&filter, false)
q := NewQueryable(&filter, false, Config{
MaxConcurrency: 16,
})
// retrieve querier if set
querier, err := q.Querier(ctx, mint, maxt)
require.NoError(t, err)
Expand Down
4 changes: 3 additions & 1 deletion pkg/querier/tenantfederation/tenant_federation.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@ import (

type Config struct {
// Enabled switches on support for multi tenant query federation
Enabled bool `yaml:"enabled"`
Enabled bool `yaml:"enabled"`
MaxConcurrency int `yaml:"max-concurrency"`
}

func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.BoolVar(&cfg.Enabled, "tenant-federation.enabled", false, "If enabled on all Cortex services, queries can be federated across multiple tenants. The tenant IDs involved need to be specified separated by a `|` character in the `X-Scope-OrgID` header (experimental).")
f.IntVar(&cfg.MaxConcurrency, "tenant-federation.max-concurrency", 16, "Maximum concurrent federated sub queries used when evaluating a federated query")
}
11 changes: 6 additions & 5 deletions pkg/ruler/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,8 +138,8 @@ func NewAPI(r *Ruler, s rulestore.RuleStore, logger log.Logger) *API {

func (a *API) PrometheusRules(w http.ResponseWriter, req *http.Request) {
logger := util_log.WithContext(req.Context(), a.logger)
userID, err := tenant.TenantID(req.Context())
if err != nil || userID == "" {
tenantIDs, err := tenant.TenantIDs(req.Context())
if err != nil || len(tenantIDs) == 0 {
level.Error(logger).Log("msg", "error extracting org id from context", "err", err)
respondError(logger, w, "no valid org id found")
return
Expand Down Expand Up @@ -230,8 +230,8 @@ func (a *API) PrometheusRules(w http.ResponseWriter, req *http.Request) {

func (a *API) PrometheusAlerts(w http.ResponseWriter, req *http.Request) {
logger := util_log.WithContext(req.Context(), a.logger)
userID, err := tenant.TenantID(req.Context())
if err != nil || userID == "" {
tenantIDs, err := tenant.TenantIDs(req.Context())
if err != nil || len(tenantIDs) == 0 {
level.Error(logger).Log("msg", "error extracting org id from context", "err", err)
respondError(logger, w, "no valid org id found")
return
Expand Down Expand Up @@ -359,10 +359,11 @@ func parseGroupName(params map[string]string) (string, error) {
// and returns them in that order. It also allows users to require a namespace or group name and return
// an error if it they can not be parsed.
func parseRequest(req *http.Request, requireNamespace, requireGroup bool) (string, string, string, error) {
userID, err := tenant.TenantID(req.Context())
tenantIDs, err := tenant.TenantIDs(req.Context())
if err != nil {
return "", "", "", user.ErrNoOrgID
}
userID := tenant.JoinTenantIDs(tenantIDs)

vars := mux.Vars(req)

Expand Down
4 changes: 2 additions & 2 deletions pkg/ruler/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,15 +240,15 @@ rules:
router.Path("/api/v1/rules/{namespace}").Methods("POST").HandlerFunc(a.CreateRuleGroup)
router.Path("/api/v1/rules/{namespace}/{groupName}").Methods("GET").HandlerFunc(a.GetRuleGroup)
// POST
req := requestFor(t, http.MethodPost, "https://localhost:8080/api/v1/rules/namespace", strings.NewReader(tt.input), "user1")
req := requestFor(t, http.MethodPost, "https://localhost:8080/api/v1/rules/namespace", strings.NewReader(tt.input), "user1|user2|user3")
w := httptest.NewRecorder()

router.ServeHTTP(w, req)
require.Equal(t, tt.status, w.Code)

if tt.err == nil {
// GET
req = requestFor(t, http.MethodGet, "https://localhost:8080/api/v1/rules/namespace/test", nil, "user1")
req = requestFor(t, http.MethodGet, "https://localhost:8080/api/v1/rules/namespace/test", nil, "user1|user2|user3")
w = httptest.NewRecorder()

router.ServeHTTP(w, req)
Expand Down
Loading