Skip to content

Commit

Permalink
Use BasicLifecycler for distributors and auto-forget
Browse files Browse the repository at this point in the history
Use the BasicLifecycler in distributors for managing their lifecycle so
that we can take advantage of the "auto-forget" delegates feature. This
prevents the ring from filling up with "unhealthy" distributors that are
never removed. This wasn't a bug but it was confusing for users and
operators.

Fixes #2138

Signed-off-by: Nick Pillitteri <nick.pillitteri@grafana.com>
  • Loading branch information
56quarters committed Jun 21, 2022
1 parent b13d2df commit 5a6741d
Show file tree
Hide file tree
Showing 5 changed files with 124 additions and 152 deletions.
104 changes: 61 additions & 43 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/flagext"
"github.com/grafana/dskit/kv"
"github.com/grafana/dskit/limiter"
"github.com/grafana/dskit/ring"
ring_client "github.com/grafana/dskit/ring/client"
Expand Down Expand Up @@ -65,8 +66,12 @@ var (
)

const (
// DistributorRingKey is the key under which we store the distributors ring in the KVStore.
DistributorRingKey = "distributor"
// distributorRingKey is the key under which we store the distributors ring in the KVStore.
distributorRingKey = "distributor"

// ringAutoForgetUnhealthyPeriods is how many consecutive heartbeat timeout periods
// an instance must be unhealthy before it is automatically removed from the ring.
ringAutoForgetUnhealthyPeriods = 10
)

const (
Expand All @@ -87,8 +92,9 @@ type Distributor struct {

// The global rate limiter requires a distributors ring to count
// the number of healthy instances
distributorsLifeCycler *ring.Lifecycler
distributorsLifeCycler *ring.BasicLifecycler
distributorsRing *ring.Ring
healthyInstancesCount *atomic.Uint32

// For handling HA replicas.
HATracker *haTracker
Expand Down Expand Up @@ -206,44 +212,16 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Ove
subservices := []services.Service(nil)
subservices = append(subservices, haTracker)

// Create the configured ingestion rate limit strategy (local or global). In case
// it's an internal dependency and can't join the distributors ring, we skip rate
// limiting.
var ingestionRateStrategy, requestRateStrategy limiter.RateLimiterStrategy
var distributorsLifeCycler *ring.Lifecycler
var distributorsRing *ring.Ring

if !canJoinDistributorsRing {
requestRateStrategy = newInfiniteRateStrategy()
ingestionRateStrategy = newInfiniteRateStrategy()
} else {
distributorsLifeCycler, err = ring.NewLifecycler(cfg.DistributorRing.ToLifecyclerConfig(), nil, "distributor", DistributorRingKey, true, log, prometheus.WrapRegistererWithPrefix("cortex_", reg))
if err != nil {
return nil, err
}

distributorsRing, err = ring.New(cfg.DistributorRing.ToRingConfig(), "distributor", DistributorRingKey, log, prometheus.WrapRegistererWithPrefix("cortex_", reg))
if err != nil {
return nil, errors.Wrap(err, "failed to initialize distributors' ring client")
}
subservices = append(subservices, distributorsLifeCycler, distributorsRing)

requestRateStrategy = newGlobalRateStrategy(newRequestRateStrategy(limits), distributorsLifeCycler)
ingestionRateStrategy = newGlobalRateStrategy(newIngestionRateStrategy(limits), distributorsLifeCycler)
}

d := &Distributor{
cfg: cfg,
log: log,
ingestersRing: ingestersRing,
ingesterPool: NewPool(cfg.PoolConfig, ingestersRing, cfg.IngesterClientFactory, log),
distributorsLifeCycler: distributorsLifeCycler,
distributorsRing: distributorsRing,
limits: limits,
requestRateLimiter: limiter.NewRateLimiter(requestRateStrategy, 10*time.Second),
ingestionRateLimiter: limiter.NewRateLimiter(ingestionRateStrategy, 10*time.Second),
HATracker: haTracker,
ingestionRate: util_math.NewEWMARate(0.2, instanceIngestionRateTickInterval),
cfg: cfg,
log: log,
ingestersRing: ingestersRing,
ingesterPool: NewPool(cfg.PoolConfig, ingestersRing, cfg.IngesterClientFactory, log),
healthyInstancesCount: atomic.NewUint32(0),
limits: limits,
forwarder: forwarding.NewForwarder(reg, cfg.Forwarding),
HATracker: haTracker,
ingestionRate: util_math.NewEWMARate(0.2, instanceIngestionRateTickInterval),

queryDuration: instrument.NewHistogramCollector(promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{
Namespace: "cortex",
Expand Down Expand Up @@ -322,8 +300,9 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Ove
Help: "The configured replication factor.",
}),
latestSeenSampleTimestampPerUser: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{
Name: "cortex_distributor_latest_seen_sample_timestamp_seconds",
Help: "Unix timestamp of latest received sample per user.",
Namespace: "cortex",
Name: "distributor_latest_seen_sample_timestamp_seconds",
Help: "Unix timestamp of latest received sample per user.",
}, []string{"user"}),
}

Expand Down Expand Up @@ -351,7 +330,45 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Ove
return d.ingestionRate.Rate()
})

d.forwarder = forwarding.NewForwarder(reg, d.cfg.Forwarding)
// Create the configured ingestion rate limit strategy (local or global). In case
// it's an internal dependency and can't join the distributors ring, we skip rate
// limiting.
var ingestionRateStrategy, requestRateStrategy limiter.RateLimiterStrategy
var distributorsLifeCycler *ring.BasicLifecycler
var distributorsRing *ring.Ring

if !canJoinDistributorsRing {
requestRateStrategy = newInfiniteRateStrategy()
ingestionRateStrategy = newInfiniteRateStrategy()
} else {
kvStore, err := kv.NewClient(cfg.DistributorRing.KVStore, ring.GetCodec(), kv.RegistererWithKVName(reg, "distributor-lifecycler"), log)
if err != nil {
return nil, errors.Wrap(err, "failed to initialize distributors' KV store")
}

delegate := ring.BasicLifecyclerDelegate(d)
delegate = ring.NewLeaveOnStoppingDelegate(delegate, log)
delegate = ring.NewAutoForgetDelegate(ringAutoForgetUnhealthyPeriods*cfg.DistributorRing.HeartbeatTimeout, delegate, log)

distributorsLifeCycler, err = ring.NewBasicLifecycler(cfg.DistributorRing.ToBasicLifecyclerConfig(), "distributor", distributorRingKey, kvStore, delegate, log, prometheus.WrapRegistererWithPrefix("cortex_", reg))
if err != nil {
return nil, errors.Wrap(err, "failed to initialize distributors' lifecycler")
}

distributorsRing, err = ring.New(cfg.DistributorRing.ToRingConfig(), "distributor", distributorRingKey, log, prometheus.WrapRegistererWithPrefix("cortex_", reg))
if err != nil {
return nil, errors.Wrap(err, "failed to initialize distributors' ring client")
}
subservices = append(subservices, distributorsLifeCycler, distributorsRing)

requestRateStrategy = newGlobalRateStrategy(newRequestRateStrategy(limits), d)
ingestionRateStrategy = newGlobalRateStrategy(newIngestionRateStrategy(limits), d)
}

d.requestRateLimiter = limiter.NewRateLimiter(requestRateStrategy, 10*time.Second)
d.ingestionRateLimiter = limiter.NewRateLimiter(ingestionRateStrategy, 10*time.Second)
d.distributorsLifeCycler = distributorsLifeCycler
d.distributorsRing = distributorsRing

d.replicationFactor.Set(float64(ingestersRing.ReplicationFactor()))
d.activeUsers = util.NewActiveUsersCleanupWithDefaultValues(d.cleanupInactiveUser)
Expand All @@ -361,6 +378,7 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Ove
if err != nil {
return nil, err
}

d.subservicesWatcher = services.NewFailureWatcher()
d.subservicesWatcher.WatchManager(d.subservices)

Expand Down
52 changes: 17 additions & 35 deletions pkg/distributor/distributor_ring.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,13 @@ import (
"github.com/grafana/dskit/kv"
"github.com/grafana/dskit/netutil"
"github.com/grafana/dskit/ring"
)

util_log "github.com/grafana/mimir/pkg/util/log"
const (
// ringNumTokens is how many tokens each distributor should have in the ring.
// Distributors use a ring because they need to know how many distributors there
// are in total for rate limiting.
ringNumTokens = 1
)

// RingConfig masks the ring lifecycler config which contains
Expand All @@ -43,7 +48,7 @@ type RingConfig struct {
func (cfg *RingConfig) RegisterFlags(f *flag.FlagSet, logger log.Logger) {
hostname, err := os.Hostname()
if err != nil {
level.Error(util_log.Logger).Log("msg", "failed to get hostname", "err", err)
level.Error(logger).Log("msg", "failed to get hostname", "err", err)
os.Exit(1)
}

Expand All @@ -61,39 +66,16 @@ func (cfg *RingConfig) RegisterFlags(f *flag.FlagSet, logger log.Logger) {
f.StringVar(&cfg.InstanceID, "distributor.ring.instance-id", hostname, "Instance ID to register in the ring.")
}

// ToLifecyclerConfig returns a LifecyclerConfig based on the distributor
// ring config.
func (cfg *RingConfig) ToLifecyclerConfig() ring.LifecyclerConfig {
// We have to make sure that the ring.LifecyclerConfig and ring.Config
// defaults are preserved
lc := ring.LifecyclerConfig{}
rc := ring.Config{}

flagext.DefaultValues(&lc)
flagext.DefaultValues(&rc)

// Configure ring
rc.KVStore = cfg.KVStore
rc.HeartbeatTimeout = cfg.HeartbeatTimeout
rc.ReplicationFactor = 1

// Configure lifecycler
lc.RingConfig = rc
lc.ListenPort = cfg.ListenPort
lc.Addr = cfg.InstanceAddr
lc.Port = cfg.InstancePort
lc.ID = cfg.InstanceID
lc.InfNames = cfg.InstanceInterfaceNames
lc.UnregisterOnShutdown = true
lc.HeartbeatPeriod = cfg.HeartbeatPeriod
lc.HeartbeatTimeout = cfg.HeartbeatTimeout
lc.ObservePeriod = 0
lc.NumTokens = 1
lc.JoinAfter = 0
lc.MinReadyDuration = 0
lc.FinalSleep = 0

return lc
// ToBasicLifecyclerConfig returns a ring.BasicLifecyclerConfig based on the
// distributor ring config.
//
// NOTE(review): only InstanceAddr is forwarded as Addr; InstancePort, ListenPort
// and InstanceInterfaceNames are not consulted here. Confirm that the address
// (and port) are resolved elsewhere before the instance is registered.
func (cfg *RingConfig) ToBasicLifecyclerConfig() ring.BasicLifecyclerConfig {
	var lc ring.BasicLifecyclerConfig

	// Instance identity.
	lc.ID = cfg.InstanceID
	lc.Addr = cfg.InstanceAddr

	// Heartbeating.
	lc.HeartbeatPeriod = cfg.HeartbeatPeriod
	lc.HeartbeatTimeout = cfg.HeartbeatTimeout

	// Tokens: no observe period, fixed number of tokens per distributor.
	lc.TokensObservePeriod = 0
	lc.NumTokens = ringNumTokens

	// Do not keep the instance in the ring on shutdown.
	lc.KeepInstanceInTheRingOnShutdown = false

	return lc
}

func (cfg *RingConfig) ToRingConfig() ring.Config {
Expand Down
73 changes: 0 additions & 73 deletions pkg/distributor/distributor_ring_test.go

This file was deleted.

2 changes: 1 addition & 1 deletion pkg/distributor/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2838,7 +2838,7 @@ func prepare(t *testing.T, cfg prepConfig) ([]*Distributor, []mockIngester, []*p
// updates to the expected size
if distributors[0].distributorsRing != nil {
test.Poll(t, time.Second, cfg.numDistributors, func() interface{} {
return distributors[0].distributorsLifeCycler.HealthyInstancesCount()
return distributors[0].HealthyInstancesCount()
})
}

Expand Down
45 changes: 45 additions & 0 deletions pkg/distributor/lifecycle.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
// SPDX-License-Identifier: AGPL-3.0-only

package distributor

import (
	"time"

	"github.com/grafana/dskit/ring"
)

// OnRingInstanceRegister implements the ring.BasicLifecyclerDelegate interface.
//
// It reuses the tokens already recorded for this instance (when the instance
// exists in the ring), tops them up to ringNumTokens with newly generated
// tokens that avoid those already taken in the ring, and registers the
// instance as ACTIVE.
func (d *Distributor) OnRingInstanceRegister(_ *ring.BasicLifecycler, ringDesc ring.Desc, instanceExists bool, instanceID string, instanceDesc ring.InstanceDesc) (ring.InstanceState, ring.Tokens) {
	var owned []uint32
	if instanceExists {
		owned = instanceDesc.GetTokens()
	}

	// Only generate as many tokens as are still missing for this instance.
	missing := ringNumTokens - len(owned)
	generated := ring.GenerateTokens(missing, ringDesc.GetTokens())

	// Tokens sorting will be enforced by the parent caller.
	owned = append(owned, generated...)

	return ring.ACTIVE, owned
}

// OnRingInstanceTokens implements the ring.BasicLifecyclerDelegate interface.
// The distributor does nothing in this hook.
func (d *Distributor) OnRingInstanceTokens(_ *ring.BasicLifecycler, _ ring.Tokens) {}

// OnRingInstanceStopping implements the ring.BasicLifecyclerDelegate interface.
// The distributor does nothing in this hook.
func (d *Distributor) OnRingInstanceStopping(_ *ring.BasicLifecycler) {}

// OnRingInstanceHeartbeat implements the ring.BasicLifecyclerDelegate interface.
//
// It recounts the healthy distributors found in ringDesc and stores the result
// so that HealthyInstancesCount can return it. An instance is counted only if
// it is ACTIVE and its last heartbeat is within the configured distributor ring
// heartbeat timeout.
func (d *Distributor) OnRingInstanceHeartbeat(_ *ring.BasicLifecycler, ringDesc *ring.Desc, _ *ring.InstanceDesc) {
	healthy := uint32(0)
	now := time.Now()
	timeout := d.cfg.DistributorRing.HeartbeatTimeout

	for _, instance := range ringDesc.Ingesters {
		// State alone is not enough: an instance that stopped heartbeating
		// without leaving stays ACTIVE in the ring until it is auto-forgotten,
		// and counting it would inflate the number of healthy distributors.
		if instance.State == ring.ACTIVE && instance.IsHeartbeatHealthy(timeout, now) {
			healthy++
		}
	}

	d.healthyInstancesCount.Store(healthy)
}

// HealthyInstancesCount implements the ReadLifecycler interface.
//
// It returns the value most recently stored in healthyInstancesCount,
// converted to int.
func (d *Distributor) HealthyInstancesCount() int {
	count := d.healthyInstancesCount.Load()
	return int(count)
}

0 comments on commit 5a6741d

Please sign in to comment.