Skip to content

Commit

Permalink
Refactored limits to be able to define them dynamically when Cortex is vendored in another project (#1549)
Browse files Browse the repository at this point in the history

* Refactored limits to be able to define them dynamically when vendoring in Cortex

Added OverridesManager which would store default limits and per tenant overrides as an interface
Factory method for creating OverridesManager accepts overrides reload config, method to load overrides from yaml file and default limit

Sample and Labels validation methods now accept any type which implement required methods to get required limits for validation

Signed-off-by: Sandeep Sukhani <sandeep.d.sukhani@gmail.com>

* renamed receiver for overrides manager methods

Signed-off-by: Sandeep Sukhani <sandeep.d.sukhani@gmail.com>

* added comment regarding loading yaml

Signed-off-by: Sandeep Sukhani <sandeep.d.sukhani@gmail.com>

* exported OverridesLoader for godoc and removed an unused interface

Signed-off-by: Sandeep Sukhani <sandeep.d.sukhani@gmail.com>

* added struct to hold config for overrides manager

Signed-off-by: Sandeep Sukhani <sandeep.d.sukhani@gmail.com>

* Added a todo for moving overrides loader to OverridesManager when a bug in yamlv3 decoder is fixed

Signed-off-by: Sandeep Sukhani <sandeep.d.sukhani@gmail.com>
  • Loading branch information
sandeepsukhani authored and tomwilkie committed Aug 21, 2019
1 parent 0460dad commit 870fe73
Show file tree
Hide file tree
Showing 7 changed files with 393 additions and 288 deletions.
4 changes: 2 additions & 2 deletions pkg/distributor/distributor.go
Expand Up @@ -331,15 +331,15 @@ func (d *Distributor) Push(ctx context.Context, req *client.WriteRequest) (*clie
}

labelsHistogram.Observe(float64(len(ts.Labels)))
if err := d.limits.ValidateLabels(userID, ts.Labels); err != nil {
if err := validation.ValidateLabels(d.limits, userID, ts.Labels); err != nil {
lastPartialErr = err
continue
}

metricName, _ := extract.MetricNameFromLabelAdapters(ts.Labels)
samples := make([]client.Sample, 0, len(ts.Samples))
for _, s := range ts.Samples {
if err := d.limits.ValidateSample(userID, metricName, s); err != nil {
if err := validation.ValidateSample(d.limits, userID, metricName, s); err != nil {
lastPartialErr = err
continue
}
Expand Down
40 changes: 25 additions & 15 deletions pkg/distributor/distributor_test.go
Expand Up @@ -97,7 +97,7 @@ func TestDistributorPush(t *testing.T) {
} {
for _, shardByAllLabels := range []bool{true, false} {
t.Run(fmt.Sprintf("[%d](shardByAllLabels=%v)", i, shardByAllLabels), func(t *testing.T) {
d := prepare(t, tc.numIngesters, tc.happyIngesters, 0, shardByAllLabels)
d := prepare(t, tc.numIngesters, tc.happyIngesters, 0, shardByAllLabels, nil)
defer d.Stop()

request := makeWriteRequest(tc.samples)
Expand Down Expand Up @@ -150,8 +150,11 @@ func TestDistributorPushHAInstances(t *testing.T) {
} {
for _, shardByAllLabels := range []bool{true, false} {
t.Run(fmt.Sprintf("[%d](shardByAllLabels=%v)", i, shardByAllLabels), func(t *testing.T) {
d := prepare(t, 1, 1, 0, shardByAllLabels)
d.limits.Defaults.AcceptHASamples = true
var limits validation.Limits
flagext.DefaultValues(&limits)
limits.AcceptHASamples = true

d := prepare(t, 1, 1, 0, shardByAllLabels, &limits)
codec := codec.Proto{Factory: ProtoReplicaDescFactory}
mock := kv.PrefixClient(consul.NewInMemoryClient(codec), "prefix")

Expand Down Expand Up @@ -273,7 +276,7 @@ func TestDistributorPushQuery(t *testing.T) {

for _, tc := range testcases {
t.Run(tc.name, func(t *testing.T) {
d := prepare(t, tc.numIngesters, tc.happyIngesters, 0, tc.shardByAllLabels)
d := prepare(t, tc.numIngesters, tc.happyIngesters, 0, tc.shardByAllLabels, nil)
defer d.Stop()

request := makeWriteRequest(tc.samples)
Expand Down Expand Up @@ -305,7 +308,7 @@ func TestSlowQueries(t *testing.T) {
if nIngesters-happy > 1 {
expectedErr = promql.ErrStorage{Err: errFail}
}
d := prepare(t, nIngesters, happy, 100*time.Millisecond, shardByAllLabels)
d := prepare(t, nIngesters, happy, 100*time.Millisecond, shardByAllLabels, nil)
defer d.Stop()

_, err := d.Query(ctx, 0, 10, nameMatcher)
Expand All @@ -317,7 +320,7 @@ func TestSlowQueries(t *testing.T) {
}
}

func prepare(t *testing.T, numIngesters, happyIngesters int, queryDelay time.Duration, shardByAllLabels bool) *Distributor {
func prepare(t *testing.T, numIngesters, happyIngesters int, queryDelay time.Duration, shardByAllLabels bool, limits *validation.Limits) *Distributor {
ingesters := []mockIngester{}
for i := 0; i < happyIngesters; i++ {
ingesters = append(ingesters, mockIngester{
Expand Down Expand Up @@ -355,16 +358,20 @@ func prepare(t *testing.T, numIngesters, happyIngesters int, queryDelay time.Dur
}

var cfg Config
var limits validation.Limits
var clientConfig client.Config
flagext.DefaultValues(&cfg, &limits, &clientConfig)
flagext.DefaultValues(&cfg, &clientConfig)

if limits == nil {
limits = &validation.Limits{}
flagext.DefaultValues(limits)
}
limits.IngestionRate = 20
limits.IngestionBurstSize = 20
cfg.ingesterClientFactory = factory
cfg.ShardByAllLabels = shardByAllLabels
cfg.ExtraQueryDelay = 50 * time.Millisecond

overrides, err := validation.NewOverrides(limits)
overrides, err := validation.NewOverrides(*limits)
require.NoError(t, err)

d, err := New(cfg, clientConfig, overrides, ring)
Expand Down Expand Up @@ -694,13 +701,16 @@ func TestDistributorValidation(t *testing.T) {
},
} {
t.Run(strconv.Itoa(i), func(t *testing.T) {
d := prepare(t, 3, 3, 0, true)
defer d.Stop()
var limits validation.Limits
flagext.DefaultValues(&limits)

limits.CreationGracePeriod = 2 * time.Hour
limits.RejectOldSamples = true
limits.RejectOldSamplesMaxAge = 24 * time.Hour
limits.MaxLabelNamesPerSeries = 2

d.limits.Defaults.CreationGracePeriod = 2 * time.Hour
d.limits.Defaults.RejectOldSamples = true
d.limits.Defaults.RejectOldSamplesMaxAge = 24 * time.Hour
d.limits.Defaults.MaxLabelNamesPerSeries = 2
d := prepare(t, 3, 3, 0, true, &limits)
defer d.Stop()

_, err := d.Push(ctx, client.ToWriteRequest(tc.samples, client.API))
require.Equal(t, tc.err, err)
Expand Down
182 changes: 181 additions & 1 deletion pkg/util/validation/limits.go
Expand Up @@ -2,7 +2,10 @@ package validation

import (
"flag"
"os"
"time"

"gopkg.in/yaml.v2"
)

// Limits describe all the limits for users; can be used to describe global default
Expand Down Expand Up @@ -73,7 +76,184 @@ func (l *Limits) UnmarshalYAML(unmarshal func(interface{}) error) error {
// We want to set c to the defaults and then overwrite it with the input.
// To make unmarshal fill the plain data struct rather than calling UnmarshalYAML
// again, we have to hide it using a type indirection. See prometheus/config.
*l = defaultLimits

// During startup we won't have a default value, so we don't want to overwrite them
if defaultLimits != nil {
*l = *defaultLimits
}
type plain Limits
return unmarshal((*plain)(l))
}

// When we load YAML from disk, we want the various per-customer limits
// to default to any values specified on the command line, not default
// command line values. This global contains those values. I (Tom) cannot
// find a nicer way I'm afraid.
// NOTE(review): package-level mutable state — set by NewOverrides and read by
// Limits.UnmarshalYAML; the last call to NewOverrides wins.
var defaultLimits *Limits

// Overrides periodically fetches a set of per-user overrides, and provides
// convenience functions for fetching the correct value for a given user.
type Overrides struct {
	// overridesManager holds the default limits plus any per-tenant
	// overrides loaded from the configured YAML file.
	overridesManager *OverridesManager
}

// NewOverrides makes a new Overrides.
// We store the supplied limits in a global variable to ensure per-tenant limits
// are defaulted to those values. As such, the last call to NewOverrides will
// become the new global defaults.
func NewOverrides(defaults Limits) (*Overrides, error) {
	// Publish the defaults so Limits.UnmarshalYAML picks them up when
	// per-tenant override files are decoded.
	defaultLimits = &defaults

	mgr, err := NewOverridesManager(OverridesManagerConfig{
		OverridesReloadPeriod: defaults.PerTenantOverridePeriod,
		OverridesLoadPath:     defaults.PerTenantOverrideConfig,
		OverridesLoader:       loadOverrides,
		Defaults:              &defaults,
	})
	if err != nil {
		return nil, err
	}

	return &Overrides{overridesManager: mgr}, nil
}

// Stop background reloading of overrides. Delegates to the underlying
// OverridesManager; safe to call once when shutting down.
func (o *Overrides) Stop() {
	o.overridesManager.Stop()
}

// IngestionRate returns the limit on ingester rate (samples per second).
func (o *Overrides) IngestionRate(userID string) float64 {
	return o.overridesManager.GetLimits(userID).(*Limits).IngestionRate
}

// IngestionBurstSize returns the burst size for ingestion rate.
func (o *Overrides) IngestionBurstSize(userID string) int {
	return o.overridesManager.GetLimits(userID).(*Limits).IngestionBurstSize
}

// AcceptHASamples returns whether the distributor should track and accept samples from HA replicas for this user.
func (o *Overrides) AcceptHASamples(userID string) bool {
	return o.overridesManager.GetLimits(userID).(*Limits).AcceptHASamples
}

// HAReplicaLabel returns the replica label to look for when deciding whether to accept a sample from a Prometheus HA replica.
func (o *Overrides) HAReplicaLabel(userID string) string {
	return o.overridesManager.GetLimits(userID).(*Limits).HAReplicaLabel
}

// HAClusterLabel returns the cluster label to look for when deciding whether to accept a sample from a Prometheus HA replica.
func (o *Overrides) HAClusterLabel(userID string) string {
	return o.overridesManager.GetLimits(userID).(*Limits).HAClusterLabel
}

// MaxLabelNameLength returns maximum length a label name can be.
func (o *Overrides) MaxLabelNameLength(userID string) int {
	return o.overridesManager.GetLimits(userID).(*Limits).MaxLabelNameLength
}

// MaxLabelValueLength returns maximum length a label value can be. This also is
// the maximum length of a metric name.
func (o *Overrides) MaxLabelValueLength(userID string) int {
	return o.overridesManager.GetLimits(userID).(*Limits).MaxLabelValueLength
}

// MaxLabelNamesPerSeries returns maximum number of label/value pairs per timeseries.
func (o *Overrides) MaxLabelNamesPerSeries(userID string) int {
	return o.overridesManager.GetLimits(userID).(*Limits).MaxLabelNamesPerSeries
}

// RejectOldSamples returns true when we should reject samples older than certain
// age.
func (o *Overrides) RejectOldSamples(userID string) bool {
	return o.overridesManager.GetLimits(userID).(*Limits).RejectOldSamples
}

// RejectOldSamplesMaxAge returns the age at which samples should be rejected.
func (o *Overrides) RejectOldSamplesMaxAge(userID string) time.Duration {
	return o.overridesManager.GetLimits(userID).(*Limits).RejectOldSamplesMaxAge
}

// CreationGracePeriod is misnamed, and actually returns how far into the future
// we should accept samples.
func (o *Overrides) CreationGracePeriod(userID string) time.Duration {
	return o.overridesManager.GetLimits(userID).(*Limits).CreationGracePeriod
}

// MaxSeriesPerQuery returns the maximum number of series a query is allowed to hit.
func (o *Overrides) MaxSeriesPerQuery(userID string) int {
	return o.overridesManager.GetLimits(userID).(*Limits).MaxSeriesPerQuery
}

// MaxSamplesPerQuery returns the maximum number of samples in a query (from the ingester).
func (o *Overrides) MaxSamplesPerQuery(userID string) int {
	return o.overridesManager.GetLimits(userID).(*Limits).MaxSamplesPerQuery
}

// MaxSeriesPerUser returns the maximum number of series a user is allowed to store.
func (o *Overrides) MaxSeriesPerUser(userID string) int {
	return o.overridesManager.GetLimits(userID).(*Limits).MaxSeriesPerUser
}

// MaxSeriesPerMetric returns the maximum number of series allowed per metric.
func (o *Overrides) MaxSeriesPerMetric(userID string) int {
	return o.overridesManager.GetLimits(userID).(*Limits).MaxSeriesPerMetric
}

// MaxChunksPerQuery returns the maximum number of chunks allowed per query.
func (o *Overrides) MaxChunksPerQuery(userID string) int {
	return o.overridesManager.GetLimits(userID).(*Limits).MaxChunksPerQuery
}

// MaxQueryLength returns the limit of the length (in time) of a query.
func (o *Overrides) MaxQueryLength(userID string) time.Duration {
	return o.overridesManager.GetLimits(userID).(*Limits).MaxQueryLength
}

// MaxQueryParallelism returns the limit to the number of sub-queries the
// frontend will process in parallel.
func (o *Overrides) MaxQueryParallelism(userID string) int {
	return o.overridesManager.GetLimits(userID).(*Limits).MaxQueryParallelism
}

// EnforceMetricName returns whether to enforce the presence of a metric name.
func (o *Overrides) EnforceMetricName(userID string) bool {
	return o.overridesManager.GetLimits(userID).(*Limits).EnforceMetricName
}

// CardinalityLimit returns the maximum number of timeseries allowed in a query.
func (o *Overrides) CardinalityLimit(userID string) int {
	return o.overridesManager.GetLimits(userID).(*Limits).CardinalityLimit
}

// loadOverrides loads per-tenant overrides from the given YAML file and
// returns the limits as an interface to store them in OverridesManager.
// We need to implement it here since OverridesManager must store type Limits in an interface but
// it doesn't know its definition to initialize it.
// We could have used yamlv3.Node for this but there is no way to enforce strict decoding due to a bug in it
// TODO: Use yamlv3.Node to move this to OverridesManager after https://github.com/go-yaml/yaml/issues/460 is fixed
func loadOverrides(filename string) (map[string]interface{}, error) {
	f, err := os.Open(filename)
	if err != nil {
		return nil, err
	}
	// Close the file on all return paths; the original leaked the handle,
	// which accumulates descriptors on every periodic reload.
	defer f.Close()

	var overrides struct {
		Overrides map[string]*Limits `yaml:"overrides"`
	}

	decoder := yaml.NewDecoder(f)
	// Strict decoding makes typos in override files fail loudly instead of
	// being silently ignored.
	decoder.SetStrict(true)
	if err := decoder.Decode(&overrides); err != nil {
		return nil, err
	}

	overridesAsInterface := map[string]interface{}{}
	for userID := range overrides.Overrides {
		overridesAsInterface[userID] = overrides.Overrides[userID]
	}

	return overridesAsInterface, nil
}

0 comments on commit 870fe73

Please sign in to comment.