Refactored limits to be able to define them dynamically when Cortex is vendored in another project #1549

Merged 6 commits on Aug 21, 2019
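In practice, the refactor lets a project that vendors Cortex build its limits programmatically instead of only through flag registration. Below is a minimal sketch under that assumption, using only the types and functions touched in this PR; the import paths are inferred from this repository's pkg/util layout, and the tenant ID and values are made up:

    package main

    import (
        "fmt"
        "log"

        // Import paths assumed from this repository's pkg/util layout.
        "github.com/cortexproject/cortex/pkg/util/flagext"
        "github.com/cortexproject/cortex/pkg/util/validation"
    )

    func main() {
        // Build limits dynamically instead of relying on flag parsing.
        var limits validation.Limits
        flagext.DefaultValues(&limits) // start from Cortex's registered defaults
        limits.IngestionRate = 1e6     // then override whatever the embedding project needs
        limits.AcceptHASamples = true

        // Per-tenant overrides (if a config path is set) fall back to these values.
        overrides, err := validation.NewOverrides(limits)
        if err != nil {
            log.Fatal(err)
        }
        defer overrides.Stop()

        fmt.Println(overrides.IngestionRate("tenant-1")) // 1e6 unless overridden for tenant-1
    }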
pkg/distributor/distributor.go (4 changes: 2 additions & 2 deletions)
@@ -331,15 +331,15 @@ func (d *Distributor) Push(ctx context.Context, req *client.WriteRequest) (*clie
}

labelsHistogram.Observe(float64(len(ts.Labels)))
if err := d.limits.ValidateLabels(userID, ts.Labels); err != nil {
if err := validation.ValidateLabels(d.limits, userID, ts.Labels); err != nil {
lastPartialErr = err
continue
}

metricName, _ := extract.MetricNameFromLabelAdapters(ts.Labels)
samples := make([]client.Sample, 0, len(ts.Samples))
for _, s := range ts.Samples {
if err := d.limits.ValidateSample(userID, metricName, s); err != nil {
if err := validation.ValidateSample(d.limits, userID, metricName, s); err != nil {
lastPartialErr = err
continue
}
pkg/distributor/distributor_test.go (40 changes: 25 additions & 15 deletions)
@@ -97,7 +97,7 @@ func TestDistributorPush(t *testing.T) {
} {
for _, shardByAllLabels := range []bool{true, false} {
t.Run(fmt.Sprintf("[%d](shardByAllLabels=%v)", i, shardByAllLabels), func(t *testing.T) {
d := prepare(t, tc.numIngesters, tc.happyIngesters, 0, shardByAllLabels)
d := prepare(t, tc.numIngesters, tc.happyIngesters, 0, shardByAllLabels, nil)
defer d.Stop()

request := makeWriteRequest(tc.samples)
@@ -150,8 +150,11 @@ func TestDistributorPushHAInstances(t *testing.T) {
} {
for _, shardByAllLabels := range []bool{true, false} {
t.Run(fmt.Sprintf("[%d](shardByAllLabels=%v)", i, shardByAllLabels), func(t *testing.T) {
d := prepare(t, 1, 1, 0, shardByAllLabels)
d.limits.Defaults.AcceptHASamples = true
var limits validation.Limits
flagext.DefaultValues(&limits)
limits.AcceptHASamples = true

d := prepare(t, 1, 1, 0, shardByAllLabels, &limits)
codec := codec.Proto{Factory: ProtoReplicaDescFactory}
mock := kv.PrefixClient(consul.NewInMemoryClient(codec), "prefix")

@@ -273,7 +276,7 @@ func TestDistributorPushQuery(t *testing.T) {

for _, tc := range testcases {
t.Run(tc.name, func(t *testing.T) {
d := prepare(t, tc.numIngesters, tc.happyIngesters, 0, tc.shardByAllLabels)
d := prepare(t, tc.numIngesters, tc.happyIngesters, 0, tc.shardByAllLabels, nil)
defer d.Stop()

request := makeWriteRequest(tc.samples)
@@ -305,7 +308,7 @@ func TestSlowQueries(t *testing.T) {
if nIngesters-happy > 1 {
expectedErr = promql.ErrStorage{Err: errFail}
}
d := prepare(t, nIngesters, happy, 100*time.Millisecond, shardByAllLabels)
d := prepare(t, nIngesters, happy, 100*time.Millisecond, shardByAllLabels, nil)
defer d.Stop()

_, err := d.Query(ctx, 0, 10, nameMatcher)
@@ -317,7 +320,7 @@
}
}

func prepare(t *testing.T, numIngesters, happyIngesters int, queryDelay time.Duration, shardByAllLabels bool) *Distributor {
func prepare(t *testing.T, numIngesters, happyIngesters int, queryDelay time.Duration, shardByAllLabels bool, limits *validation.Limits) *Distributor {
ingesters := []mockIngester{}
for i := 0; i < happyIngesters; i++ {
ingesters = append(ingesters, mockIngester{
@@ -355,16 +358,20 @@ func prepare(t *testing.T, numIngesters, happyIngesters int, queryDelay time.Dur
}

var cfg Config
var limits validation.Limits
var clientConfig client.Config
flagext.DefaultValues(&cfg, &limits, &clientConfig)
flagext.DefaultValues(&cfg, &clientConfig)

if limits == nil {
limits = &validation.Limits{}
flagext.DefaultValues(limits)
}
limits.IngestionRate = 20
limits.IngestionBurstSize = 20
cfg.ingesterClientFactory = factory
cfg.ShardByAllLabels = shardByAllLabels
cfg.ExtraQueryDelay = 50 * time.Millisecond

overrides, err := validation.NewOverrides(limits)
overrides, err := validation.NewOverrides(*limits)
require.NoError(t, err)

d, err := New(cfg, clientConfig, overrides, ring)
@@ -694,13 +701,16 @@ func TestDistributorValidation(t *testing.T) {
},
} {
t.Run(strconv.Itoa(i), func(t *testing.T) {
d := prepare(t, 3, 3, 0, true)
defer d.Stop()
var limits validation.Limits
flagext.DefaultValues(&limits)

limits.CreationGracePeriod = 2 * time.Hour
limits.RejectOldSamples = true
limits.RejectOldSamplesMaxAge = 24 * time.Hour
limits.MaxLabelNamesPerSeries = 2

d.limits.Defaults.CreationGracePeriod = 2 * time.Hour
d.limits.Defaults.RejectOldSamples = true
d.limits.Defaults.RejectOldSamplesMaxAge = 24 * time.Hour
d.limits.Defaults.MaxLabelNamesPerSeries = 2
d := prepare(t, 3, 3, 0, true, &limits)
defer d.Stop()

_, err := d.Push(ctx, client.ToWriteRequest(tc.samples, client.API))
require.Equal(t, tc.err, err)
pkg/util/validation/limits.go (175 changes: 174 additions & 1 deletion)
@@ -2,7 +2,10 @@ package validation

import (
"flag"
"os"
"time"

"gopkg.in/yaml.v2"
)

// Limits describe all the limits for users; can be used to describe global default
@@ -73,7 +76,177 @@ func (l *Limits) UnmarshalYAML(unmarshal func(interface{}) error) error {
// We want to set l to the defaults and then overwrite it with the input.
// To make unmarshal fill the plain data struct rather than calling UnmarshalYAML
// again, we have to hide it using a type indirection. See prometheus/config.
*l = defaultLimits

// During startup we won't have a default value, so we don't want to overwrite them
if defaultLimits != nil {
Comment (contributor, PR author):
Without this, values were getting reset to 0 while loading a Cortex config that had only the per-tenant overrides config path set.

Comment (contributor):
Why did this not happen before?

Comment (contributor):
@sandlis explained offline:

  1. We init the flags, which sets up the defaults in a Limits struct in the cortex pkg.
  2. We then load YAML, which overwrites them with 0s, as defaultLimits has not been inited.
  3. We then parse the flags, which set only the ones we've specified.
  4. We then set defaultLimits to this half-zeroed struct.

(A standalone sketch of this defaulting pattern follows just after UnmarshalYAML below.)

*l = *defaultLimits
}
type plain Limits
return unmarshal((*plain)(l))
}
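
To make the review discussion above concrete, here is a reduced, self-contained sketch of the defaulting-via-type-indirection pattern that UnmarshalYAML uses; the type, field names, and values are invented for illustration and are not Cortex's:

    package main

    import (
        "fmt"

        "gopkg.in/yaml.v2"
    )

    // SmallLimits stands in for Limits; the fields are invented.
    type SmallLimits struct {
        IngestionRate float64 `yaml:"ingestion_rate"`
        MaxSeries     int     `yaml:"max_series"`
    }

    // Defaults captured at flag-registration time, analogous to defaultLimits above.
    var defaultSmallLimits *SmallLimits

    func (l *SmallLimits) UnmarshalYAML(unmarshal func(interface{}) error) error {
        // Start from the registered defaults (when present) so that fields missing
        // from the YAML keep their flag values instead of resetting to zero.
        if defaultSmallLimits != nil {
            *l = *defaultSmallLimits
        }
        // Type indirection: plain has no UnmarshalYAML method, so this call fills
        // the struct directly instead of recursing into this method.
        type plain SmallLimits
        return unmarshal((*plain)(l))
    }

    func main() {
        defaultSmallLimits = &SmallLimits{IngestionRate: 25000, MaxSeries: 5000}

        var l SmallLimits
        // Only max_series is present in the YAML; ingestion_rate keeps its default.
        if err := yaml.Unmarshal([]byte("max_series: 100\n"), &l); err != nil {
            panic(err)
        }
        fmt.Printf("%+v\n", l) // prints {IngestionRate:25000 MaxSeries:100}
    }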

// When we load YAML from disk, we want the various per-customer limits
// to default to any values specified on the command line, not to the default
// command-line values. This global contains those values. I (Tom) cannot
// find a nicer way I'm afraid.
var defaultLimits *Limits

// Overrides periodically fetches a set of per-user overrides and provides convenience
// functions for fetching the correct value.
type Overrides struct {
overridesManager *OverridesManager
}

// NewOverrides makes a new Overrides.
// We store the supplied limits in a global variable to ensure per-tenant limits
// are defaulted to those values. As such, the last call to NewOverrides will
// become the new global defaults.
func NewOverrides(defaults Limits) (*Overrides, error) {
defaultLimits = &defaults

overridesManager, err := NewOverridesManager(defaults.PerTenantOverridePeriod, defaults.PerTenantOverrideConfig,
loadOverrides, &defaults)
if err != nil {
return nil, err
}

return &Overrides{
overridesManager: overridesManager,
}, nil
}

// Stop background reloading of overrides.
func (o *Overrides) Stop() {
o.overridesManager.Stop()
}

// IngestionRate returns the limit on ingestion rate (samples per second).
func (o *Overrides) IngestionRate(userID string) float64 {
return o.overridesManager.GetLimits(userID).(*Limits).IngestionRate
}

// IngestionBurstSize returns the burst size for ingestion rate.
func (o *Overrides) IngestionBurstSize(userID string) int {
return o.overridesManager.GetLimits(userID).(*Limits).IngestionBurstSize
}

// AcceptHASamples returns whether the distributor should track and accept samples from HA replicas for this user.
func (o *Overrides) AcceptHASamples(userID string) bool {
return o.overridesManager.GetLimits(userID).(*Limits).AcceptHASamples
}

// HAReplicaLabel returns the replica label to look for when deciding whether to accept a sample from a Prometheus HA replica.
func (o *Overrides) HAReplicaLabel(userID string) string {
return o.overridesManager.GetLimits(userID).(*Limits).HAReplicaLabel
}

// HAClusterLabel returns the cluster label to look for when deciding whether to accept a sample from a Prometheus HA replica.
func (o *Overrides) HAClusterLabel(userID string) string {
return o.overridesManager.GetLimits(userID).(*Limits).HAClusterLabel
}

// MaxLabelNameLength returns maximum length a label name can be.
func (o *Overrides) MaxLabelNameLength(userID string) int {
return o.overridesManager.GetLimits(userID).(*Limits).MaxLabelNameLength
}

// MaxLabelValueLength returns maximum length a label value can be. This also is
// the maximum length of a metric name.
func (o *Overrides) MaxLabelValueLength(userID string) int {
return o.overridesManager.GetLimits(userID).(*Limits).MaxLabelValueLength
}

// MaxLabelNamesPerSeries returns the maximum number of label/value pairs per timeseries.
func (o *Overrides) MaxLabelNamesPerSeries(userID string) int {
return o.overridesManager.GetLimits(userID).(*Limits).MaxLabelNamesPerSeries
}

// RejectOldSamples returns true when we should reject samples older than a certain age.
func (o *Overrides) RejectOldSamples(userID string) bool {
return o.overridesManager.GetLimits(userID).(*Limits).RejectOldSamples
}

// RejectOldSamplesMaxAge returns the age at which samples should be rejected.
func (o *Overrides) RejectOldSamplesMaxAge(userID string) time.Duration {
return o.overridesManager.GetLimits(userID).(*Limits).RejectOldSamplesMaxAge
}

// CreationGracePeriod is misnamed, and actually returns how far into the future
// we should accept samples.
func (o *Overrides) CreationGracePeriod(userID string) time.Duration {
return o.overridesManager.GetLimits(userID).(*Limits).CreationGracePeriod
}

// MaxSeriesPerQuery returns the maximum number of series a query is allowed to hit.
func (o *Overrides) MaxSeriesPerQuery(userID string) int {
return o.overridesManager.GetLimits(userID).(*Limits).MaxSeriesPerQuery
}

// MaxSamplesPerQuery returns the maximum number of samples in a query (from the ingester).
func (o *Overrides) MaxSamplesPerQuery(userID string) int {
return o.overridesManager.GetLimits(userID).(*Limits).MaxSamplesPerQuery
}

// MaxSeriesPerUser returns the maximum number of series a user is allowed to store.
func (o *Overrides) MaxSeriesPerUser(userID string) int {
return o.overridesManager.GetLimits(userID).(*Limits).MaxSeriesPerUser
}

// MaxSeriesPerMetric returns the maximum number of series allowed per metric.
func (o *Overrides) MaxSeriesPerMetric(userID string) int {
return o.overridesManager.GetLimits(userID).(*Limits).MaxSeriesPerMetric
}

// MaxChunksPerQuery returns the maximum number of chunks allowed per query.
func (o *Overrides) MaxChunksPerQuery(userID string) int {
return o.overridesManager.GetLimits(userID).(*Limits).MaxChunksPerQuery
}

// MaxQueryLength returns the limit of the length (in time) of a query.
func (o *Overrides) MaxQueryLength(userID string) time.Duration {
return o.overridesManager.GetLimits(userID).(*Limits).MaxQueryLength
}

// MaxQueryParallelism returns the limit to the number of sub-queries the
// frontend will process in parallel.
func (o *Overrides) MaxQueryParallelism(userID string) int {
return o.overridesManager.GetLimits(userID).(*Limits).MaxQueryParallelism
}

// EnforceMetricName returns whether to enforce the presence of a metric name.
func (o *Overrides) EnforceMetricName(userID string) bool {
return o.overridesManager.GetLimits(userID).(*Limits).EnforceMetricName
}

// CardinalityLimit returns the cardinality limit for index queries.
func (o *Overrides) CardinalityLimit(userID string) int {
return o.overridesManager.GetLimits(userID).(*Limits).CardinalityLimit
}

// loadOverrides loads the per-tenant overrides and returns them as a map of interface
// values for OverridesManager to store. It has to live here because OverridesManager
// stores Limits behind an interface and doesn't know the concrete type it needs to initialize.
func loadOverrides(filename string) (map[string]interface{}, error) {
f, err := os.Open(filename)
if err != nil {
return nil, err
}
// Close the file once decoding is done.
defer f.Close()

var overrides struct {
Overrides map[string]*Limits `yaml:"overrides"`
}

decoder := yaml.NewDecoder(f)
decoder.SetStrict(true)
if err := decoder.Decode(&overrides); err != nil {
return nil, err
}

overridesAsInterface := map[string]interface{}{}
for userID := range overrides.Overrides {
overridesAsInterface[userID] = overrides.Overrides[userID]
}

return overridesAsInterface, nil
}
Comment (contributor):
Can we make this take a "factory" for the Limits type? Then the code that builds the map versus the defaults would live in override.go. I think this is possible with yaml.v3 and Node: https://godoc.org/gopkg.in/yaml.v3

Comment (contributor):
As discussed offline, Node.Decode doesn't honour KnownFields: go-yaml/yaml#460
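
For reference, the per-tenant overrides file that loadOverrides parses is keyed by tenant ID under a top-level overrides: entry, with each tenant holding a partial Limits document. A minimal standalone sketch of decoding that shape follows; the tenant IDs, field names, and values are illustrative, and the per-tenant bodies are decoded into plain maps here rather than into Limits:

    package main

    import (
        "fmt"
        "strings"

        "gopkg.in/yaml.v2"
    )

    // Illustrative per-tenant overrides document: a top-level "overrides" map
    // keyed by tenant ID. Tenant IDs, field names, and values are made up.
    const sampleOverrides = `
    overrides:
      tenant-a:
        ingestion_rate: 350000
        max_label_names_per_series: 30
      tenant-b:
        reject_old_samples: true
    `

    func main() {
        var doc struct {
            Overrides map[string]map[string]interface{} `yaml:"overrides"`
        }
        dec := yaml.NewDecoder(strings.NewReader(sampleOverrides))
        dec.SetStrict(true) // reject unknown top-level keys, as loadOverrides does
        if err := dec.Decode(&doc); err != nil {
            panic(err)
        }
        for tenant, limits := range doc.Overrides {
            fmt.Println(tenant, limits)
        }
    }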