Use BasicLifecycler for distributors and auto-forget
Use the BasicLifecycler in distributors to manage their lifecycle so
that we can take advantage of the "auto-forget" delegate feature. This
prevents the ring from filling up with "unhealthy" distributors that are
never removed. This wasn't a bug, but it was confusing for users and
operators.

Fixes #2138

Signed-off-by: Nick Pillitteri <nick.pillitteri@grafana.com>
56quarters committed Jun 22, 2022
1 parent 017a738 commit 83c065d
Showing 15 changed files with 256 additions and 160 deletions.
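
The core of this commit is the delegate chain wired around the new BasicLifecycler. Below is a minimal sketch of that wiring using only the dskit ring API, as it appears in the pkg/distributor/distributor.go diff further down. The wrapper function, its parameters, and the local constants are illustrative and not part of the commit; the mimir-internal newHealthyInstanceDelegate that the commit also inserts into the chain is omitted here to keep the sketch self-contained.

package example

import (
	"time"

	"github.com/go-kit/log"
	"github.com/grafana/dskit/kv"
	"github.com/grafana/dskit/ring"
	"github.com/prometheus/client_golang/prometheus"
)

// buildDistributorLifecycler is an illustrative helper (not part of this commit)
// showing the delegate chain introduced in pkg/distributor/distributor.go:
// register the instance as ACTIVE with a single token, leave the ring on
// shutdown, and auto-forget instances whose heartbeat has kept failing for
// ten consecutive timeout periods.
func buildDistributorLifecycler(
	lcCfg ring.BasicLifecyclerConfig,
	kvCfg kv.Config,
	heartbeatTimeout time.Duration,
	logger log.Logger,
	reg prometheus.Registerer,
) (*ring.BasicLifecycler, error) {
	kvStore, err := kv.NewClient(kvCfg, ring.GetCodec(), kv.RegistererWithKVName(reg, "distributor-lifecycler"), logger)
	if err != nil {
		return nil, err
	}

	const numTokens = 1                   // distributors only use the ring to count instances
	const autoForgetUnhealthyPeriods = 10 // mirrors ringAutoForgetUnhealthyPeriods in the diff below

	var delegate ring.BasicLifecyclerDelegate
	delegate = ring.NewInstanceRegisterDelegate(ring.ACTIVE, numTokens)
	delegate = ring.NewLeaveOnStoppingDelegate(delegate, logger)
	delegate = ring.NewAutoForgetDelegate(autoForgetUnhealthyPeriods*heartbeatTimeout, delegate, logger)

	return ring.NewBasicLifecycler(lcCfg, "distributor", "distributor", kvStore, delegate, logger, reg)
}
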
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -34,6 +34,7 @@
* [ENHANCEMENT] Ingester: reduce sleep time when reading WAL. #2098
* [ENHANCEMENT] Compactor: Run sanity check on blocks storage configuration at startup. #2143
* [ENHANCEMENT] Compactor: Add HTTP API for uploading TSDB blocks. Enabled with `-compactor.block-upload-enabled`. #1694 #2126
* [ENHANCEMENT] Distributor: Auto-forget unhealthy distributors after ten failed ring heartbeats. #2154
* [BUGFIX] Fix regexp parsing panic for regexp label matchers with start/end quantifiers. #1883
* [BUGFIX] Ingester: fixed deceiving error log "failed to update cached shipped blocks after shipper initialisation", occurring for each new tenant in the ingester. #1893
* [BUGFIX] Ring: fix bug where instances may appear unhealthy in the hash ring web UI even though they are not. #1933
@@ -3,6 +3,11 @@ multitenancy_enabled: false
distributor:
pool:
health_check_ingesters: true
ring:
kvstore:
store: consul
consul:
host: consul:8500

ingester_client:
grpc_client_config:
2 changes: 1 addition & 1 deletion development/tsdb-blocks-storage-s3/compose-up.sh
@@ -14,5 +14,5 @@ cd $SCRIPT_DIR && make
# -gcflags "all=-N -l" disables optimizations that allow for better run with combination with Delve debugger.
# GOARCH is not changed.
CGO_ENABLED=0 GOOS=linux go build -mod=vendor -gcflags "all=-N -l" -o ${SCRIPT_DIR}/mimir ${SCRIPT_DIR}/../../cmd/mimir
docker-compose -f ${SCRIPT_DIR}/docker-compose.yml build distributor
docker-compose -f ${SCRIPT_DIR}/docker-compose.yml build distributor-1
docker-compose -f ${SCRIPT_DIR}/docker-compose.yml up $@
4 changes: 2 additions & 2 deletions development/tsdb-blocks-storage-s3/config/grafana-agent.yaml
@@ -13,7 +13,7 @@ prometheus:
scrape_configs:
- job_name: tsdb-blocks-storage-s3/distributor
static_configs:
- targets: ['distributor:8001']
- targets: ['distributor-1:8000', 'distributor-2:8001']
labels:
cluster: 'docker-compose'
namespace: 'tsdb-blocks-storage-s3'
@@ -61,4 +61,4 @@ prometheus:
namespace: 'tsdb-blocks-storage-s3'

remote_write:
- url: http://distributor:8001/api/v1/push
- url: http://distributor-1:8000/api/v1/push
1 change: 0 additions & 1 deletion development/tsdb-blocks-storage-s3/config/mimir.yaml
@@ -4,7 +4,6 @@ distributor:
pool:
health_check_ingesters: true
ring:
instance_addr: 127.0.0.1
kvstore:
store: consul
consul:
4 changes: 2 additions & 2 deletions development/tsdb-blocks-storage-s3/config/prometheus.yaml
@@ -6,7 +6,7 @@ global:
scrape_configs:
- job_name: tsdb-blocks-storage-s3/distributor
static_configs:
- targets: ['distributor:8001']
- targets: ['distributor-1:8000', 'distributor-2:8001']
labels:
cluster: 'docker-compose'
namespace: 'tsdb-blocks-storage-s3'
@@ -54,5 +54,5 @@ scrape_configs:
namespace: 'tsdb-blocks-storage-s3'

remote_write:
- url: http://distributor:8001/api/v1/push
- url: http://distributor-1:8000/api/v1/push
send_exemplars: true
7 changes: 6 additions & 1 deletion development/tsdb-blocks-storage-s3/docker-compose.jsonnet
@@ -36,7 +36,12 @@ std.manifestYamlDoc({
{},

distributor:: {
distributor: mimirService({
'distributor-1': mimirService({
target: 'distributor',
httpPort: 8000,
}),

'distributor-2': mimirService({
target: 'distributor',
httpPort: 8001,
}),
25 changes: 24 additions & 1 deletion development/tsdb-blocks-storage-s3/docker-compose.yml
@@ -100,7 +100,30 @@
"image": "consul"
"ports":
- "8500:8500"
"distributor":
"distributor-1":
"build":
"context": "."
"dockerfile": "dev.dockerfile"
"command":
- "sh"
- "-c"
- "sleep 3 && exec ./mimir -config.file=./config/mimir.yaml -target=distributor -server.http-listen-port=8000 -server.grpc-listen-port=9000 -activity-tracker.filepath=/activity/distributor-8000 "
"depends_on":
- "minio"
- "consul"
"environment":
- "JAEGER_AGENT_HOST=jaeger"
- "JAEGER_AGENT_PORT=6831"
- "JAEGER_SAMPLER_PARAM=1"
- "JAEGER_SAMPLER_TYPE=const"
- "JAEGER_TAGS=app=distributor"
"image": "mimir"
"ports":
- "8000:8000"
"volumes":
- "./config:/mimir/config"
- "./activity:/activity"
"distributor-2":
"build":
"context": "."
"dockerfile": "dev.dockerfile"
@@ -3,6 +3,11 @@ multitenancy_enabled: false
distributor:
pool:
health_check_ingesters: true
ring:
kvstore:
store: consul
consul:
host: consul:8500

ingester_client:
grpc_client_config:
125 changes: 82 additions & 43 deletions pkg/distributor/distributor.go
@@ -19,6 +19,7 @@ import (
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/flagext"
"github.com/grafana/dskit/kv"
"github.com/grafana/dskit/limiter"
"github.com/grafana/dskit/ring"
ring_client "github.com/grafana/dskit/ring/client"
@@ -65,8 +66,12 @@
)

const (
// DistributorRingKey is the key under which we store the distributors ring in the KVStore.
DistributorRingKey = "distributor"
// distributorRingKey is the key under which we store the distributors ring in the KVStore.
distributorRingKey = "distributor"

// ringAutoForgetUnhealthyPeriods is how many consecutive timeout periods an unhealthy instance
// in the ring will be automatically removed after.
ringAutoForgetUnhealthyPeriods = 10
)

const (
@@ -87,8 +92,9 @@ type Distributor struct {

// The global rate limiter requires a distributors ring to count
// the number of healthy instances
distributorsLifeCycler *ring.Lifecycler
distributorsLifeCycler *ring.BasicLifecycler
distributorsRing *ring.Ring
healthyInstancesCount *atomic.Uint32

// For handling HA replicas.
HATracker *haTracker
@@ -206,44 +212,16 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Ove
subservices := []services.Service(nil)
subservices = append(subservices, haTracker)

// Create the configured ingestion rate limit strategy (local or global). In case
// it's an internal dependency and can't join the distributors ring, we skip rate
// limiting.
var ingestionRateStrategy, requestRateStrategy limiter.RateLimiterStrategy
var distributorsLifeCycler *ring.Lifecycler
var distributorsRing *ring.Ring

if !canJoinDistributorsRing {
requestRateStrategy = newInfiniteRateStrategy()
ingestionRateStrategy = newInfiniteRateStrategy()
} else {
distributorsLifeCycler, err = ring.NewLifecycler(cfg.DistributorRing.ToLifecyclerConfig(), nil, "distributor", DistributorRingKey, true, log, prometheus.WrapRegistererWithPrefix("cortex_", reg))
if err != nil {
return nil, err
}

distributorsRing, err = ring.New(cfg.DistributorRing.ToRingConfig(), "distributor", DistributorRingKey, log, prometheus.WrapRegistererWithPrefix("cortex_", reg))
if err != nil {
return nil, errors.Wrap(err, "failed to initialize distributors' ring client")
}
subservices = append(subservices, distributorsLifeCycler, distributorsRing)

requestRateStrategy = newGlobalRateStrategy(newRequestRateStrategy(limits), distributorsLifeCycler)
ingestionRateStrategy = newGlobalRateStrategy(newIngestionRateStrategy(limits), distributorsLifeCycler)
}

d := &Distributor{
cfg: cfg,
log: log,
ingestersRing: ingestersRing,
ingesterPool: NewPool(cfg.PoolConfig, ingestersRing, cfg.IngesterClientFactory, log),
distributorsLifeCycler: distributorsLifeCycler,
distributorsRing: distributorsRing,
limits: limits,
requestRateLimiter: limiter.NewRateLimiter(requestRateStrategy, 10*time.Second),
ingestionRateLimiter: limiter.NewRateLimiter(ingestionRateStrategy, 10*time.Second),
HATracker: haTracker,
ingestionRate: util_math.NewEWMARate(0.2, instanceIngestionRateTickInterval),
cfg: cfg,
log: log,
ingestersRing: ingestersRing,
ingesterPool: NewPool(cfg.PoolConfig, ingestersRing, cfg.IngesterClientFactory, log),
healthyInstancesCount: atomic.NewUint32(0),
limits: limits,
forwarder: forwarding.NewForwarder(reg, cfg.Forwarding),
HATracker: haTracker,
ingestionRate: util_math.NewEWMARate(0.2, instanceIngestionRateTickInterval),

queryDuration: instrument.NewHistogramCollector(promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{
Namespace: "cortex",
@@ -351,7 +329,46 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Ove
return d.ingestionRate.Rate()
})

d.forwarder = forwarding.NewForwarder(reg, d.cfg.Forwarding)
// Create the configured ingestion rate limit strategy (local or global). In case
// it's an internal dependency and can't join the distributors ring, we skip rate
// limiting.
var ingestionRateStrategy, requestRateStrategy limiter.RateLimiterStrategy
var distributorsLifeCycler *ring.BasicLifecycler
var distributorsRing *ring.Ring

if !canJoinDistributorsRing {
requestRateStrategy = newInfiniteRateStrategy()
ingestionRateStrategy = newInfiniteRateStrategy()
} else {
kvStore, err := kv.NewClient(cfg.DistributorRing.KVStore, ring.GetCodec(), kv.RegistererWithKVName(reg, "distributor-lifecycler"), log)
if err != nil {
return nil, errors.Wrap(err, "failed to initialize distributors' KV store")
}

delegate := ring.BasicLifecyclerDelegate(ring.NewInstanceRegisterDelegate(ring.ACTIVE, ringNumTokens))
delegate = newHealthyInstanceDelegate(d.healthyInstancesCount, delegate)
delegate = ring.NewLeaveOnStoppingDelegate(delegate, log)
delegate = ring.NewAutoForgetDelegate(ringAutoForgetUnhealthyPeriods*cfg.DistributorRing.HeartbeatTimeout, delegate, log)

distributorsLifeCycler, err = ring.NewBasicLifecycler(cfg.DistributorRing.ToBasicLifecyclerConfig(), "distributor", distributorRingKey, kvStore, delegate, log, prometheus.WrapRegistererWithPrefix("cortex_", reg))
if err != nil {
return nil, errors.Wrap(err, "failed to initialize distributors' lifecycler")
}

distributorsRing, err = ring.New(cfg.DistributorRing.ToRingConfig(), "distributor", distributorRingKey, log, prometheus.WrapRegistererWithPrefix("cortex_", reg))
if err != nil {
return nil, errors.Wrap(err, "failed to initialize distributors' ring client")
}
subservices = append(subservices, distributorsLifeCycler, distributorsRing)

requestRateStrategy = newGlobalRateStrategy(newRequestRateStrategy(limits), d)
ingestionRateStrategy = newGlobalRateStrategy(newIngestionRateStrategy(limits), d)
}

d.requestRateLimiter = limiter.NewRateLimiter(requestRateStrategy, 10*time.Second)
d.ingestionRateLimiter = limiter.NewRateLimiter(ingestionRateStrategy, 10*time.Second)
d.distributorsLifeCycler = distributorsLifeCycler
d.distributorsRing = distributorsRing

d.replicationFactor.Set(float64(ingestersRing.ReplicationFactor()))
d.activeUsers = util.NewActiveUsersCleanupWithDefaultValues(d.cleanupInactiveUser)
@@ -361,6 +378,7 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Ove
if err != nil {
return nil, err
}

d.subservicesWatcher = services.NewFailureWatcher()
d.subservicesWatcher.WatchManager(d.subservices)

@@ -369,8 +387,20 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Ove
}

func (d *Distributor) starting(ctx context.Context) error {
// Only report success if all sub-services start properly
return services.StartManagerAndAwaitHealthy(ctx, d.subservices)
if err := services.StartManagerAndAwaitHealthy(ctx, d.subservices); err != nil {
return errors.Wrap(err, "unable to start distributor subservices")
}

// Distributors get embedded in rulers and queriers to talk to ingesters on the query path. In that
// case they won't have a distributor lifecycler or ring so don't try to join the distributor ring.
if d.distributorsLifeCycler != nil && d.distributorsRing != nil {
level.Info(d.log).Log("msg", "waiting until distributor is ACTIVE in the ring")
if err := ring.WaitInstanceState(ctx, d.distributorsRing, d.distributorsLifeCycler.GetInstanceID(), ring.ACTIVE); err != nil {
return err
}
}

return nil
}

func (d *Distributor) running(ctx context.Context) error {
@@ -1420,3 +1450,12 @@ func (d *Distributor) ServeHTTP(w http.ResponseWriter, req *http.Request) {
util.WriteHTMLResponse(w, ringNotEnabledPage)
}
}

// HealthyInstancesCount implements the ReadLifecycler interface
//
// We use a ring lifecycler delegate to count the number of members of the
// ring. The count is then used to enforce rate limiting correctly for each
// distributor. $EFFECTIVE_RATE_LIMIT = $GLOBAL_RATE_LIMIT / $NUM_INSTANCES
func (d *Distributor) HealthyInstancesCount() int {
return int(d.healthyInstancesCount.Load())
}
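
The healthy-instance count reported by this method is what turns a global rate limit into a per-distributor limit. A minimal sketch of that division is below, assuming a plain numeric limit; the helper is illustrative only and not part of Mimir, whose actual logic lives in its rate limiter strategies.

// effectiveRateLimit sketches the division described in the comment above:
// EFFECTIVE_RATE_LIMIT = GLOBAL_RATE_LIMIT / NUM_INSTANCES.
func effectiveRateLimit(globalLimit float64, healthyInstances int) float64 {
	if healthyInstances <= 0 {
		// No instances registered yet (e.g. during startup): fall back to the global limit.
		return globalLimit
	}
	return globalLimit / float64(healthyInstances)
}
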
52 changes: 17 additions & 35 deletions pkg/distributor/distributor_ring.go
@@ -16,8 +16,13 @@ import (
"github.com/grafana/dskit/kv"
"github.com/grafana/dskit/netutil"
"github.com/grafana/dskit/ring"
)

util_log "github.com/grafana/mimir/pkg/util/log"
const (
// ringNumTokens is how many tokens each distributor should have in the ring.
// Distributors use a ring because they need to know how many distributors there
// are in total for rate limiting.
ringNumTokens = 1
)

// RingConfig masks the ring lifecycler config which contains
Expand All @@ -43,7 +48,7 @@ type RingConfig struct {
func (cfg *RingConfig) RegisterFlags(f *flag.FlagSet, logger log.Logger) {
hostname, err := os.Hostname()
if err != nil {
level.Error(util_log.Logger).Log("msg", "failed to get hostname", "err", err)
level.Error(logger).Log("msg", "failed to get hostname", "err", err)
os.Exit(1)
}

Expand All @@ -61,39 +66,16 @@ func (cfg *RingConfig) RegisterFlags(f *flag.FlagSet, logger log.Logger) {
f.StringVar(&cfg.InstanceID, "distributor.ring.instance-id", hostname, "Instance ID to register in the ring.")
}

// ToLifecyclerConfig returns a LifecyclerConfig based on the distributor
// ring config.
func (cfg *RingConfig) ToLifecyclerConfig() ring.LifecyclerConfig {
// We have to make sure that the ring.LifecyclerConfig and ring.Config
// defaults are preserved
lc := ring.LifecyclerConfig{}
rc := ring.Config{}

flagext.DefaultValues(&lc)
flagext.DefaultValues(&rc)

// Configure ring
rc.KVStore = cfg.KVStore
rc.HeartbeatTimeout = cfg.HeartbeatTimeout
rc.ReplicationFactor = 1

// Configure lifecycler
lc.RingConfig = rc
lc.ListenPort = cfg.ListenPort
lc.Addr = cfg.InstanceAddr
lc.Port = cfg.InstancePort
lc.ID = cfg.InstanceID
lc.InfNames = cfg.InstanceInterfaceNames
lc.UnregisterOnShutdown = true
lc.HeartbeatPeriod = cfg.HeartbeatPeriod
lc.HeartbeatTimeout = cfg.HeartbeatTimeout
lc.ObservePeriod = 0
lc.NumTokens = 1
lc.JoinAfter = 0
lc.MinReadyDuration = 0
lc.FinalSleep = 0

return lc
func (cfg *RingConfig) ToBasicLifecyclerConfig() ring.BasicLifecyclerConfig {
return ring.BasicLifecyclerConfig{
ID: cfg.InstanceID,
Addr: cfg.InstanceAddr,
HeartbeatPeriod: cfg.HeartbeatPeriod,
HeartbeatTimeout: cfg.HeartbeatTimeout,
TokensObservePeriod: 0,
NumTokens: ringNumTokens,
KeepInstanceInTheRingOnShutdown: false,
}
}

func (cfg *RingConfig) ToRingConfig() ring.Config {
