Use BasicLifecycler for distributors and auto-forget #2154

Merged: 4 commits, Jun 24, 2022
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -40,6 +40,7 @@
* [ENHANCEMENT] Compactor: Run sanity check on blocks storage configuration at startup. #2143
* [ENHANCEMENT] Compactor: Add HTTP API for uploading TSDB blocks. Enabled with `-compactor.block-upload-enabled`. #1694 #2126
* [ENHANCEMENT] Ingester: Enable querying overlapping blocks by default. #2187
* [ENHANCEMENT] Distributor: Auto-forget unhealthy distributors after ten failed ring heartbeats. #2154
* [BUGFIX] Fix regexp parsing panic for regexp label matchers with start/end quantifiers. #1883
* [BUGFIX] Ingester: fixed deceiving error log "failed to update cached shipped blocks after shipper initialisation", occurring for each new tenant in the ingester. #1893
* [BUGFIX] Ring: fix bug where instances may appear unhealthy in the hash ring web UI even though they are not. #1933
@@ -3,6 +3,11 @@ multitenancy_enabled: false
distributor:
pool:
health_check_ingesters: true
ring:
kvstore:
store: consul
consul:
host: consul:8500

ingester_client:
grpc_client_config:
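
For reference, the same ring KV store settings can typically also be supplied as command-line flags instead of YAML. The exact flag names below are an assumption based on the distributor.ring prefix used by the flags elsewhere in this PR, so verify them against mimir -help before relying on them:

    -distributor.ring.store=consul
    -distributor.ring.consul.hostname=consul:8500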
2 changes: 1 addition & 1 deletion development/tsdb-blocks-storage-s3/compose-up.sh
@@ -14,5 +14,5 @@ cd $SCRIPT_DIR && make
# -gcflags "all=-N -l" disables optimizations that allow for better run with combination with Delve debugger.
# GOARCH is not changed.
CGO_ENABLED=0 GOOS=linux go build -mod=vendor -gcflags "all=-N -l" -o ${SCRIPT_DIR}/mimir ${SCRIPT_DIR}/../../cmd/mimir
docker-compose -f ${SCRIPT_DIR}/docker-compose.yml build distributor
docker-compose -f ${SCRIPT_DIR}/docker-compose.yml build distributor-1
docker-compose -f ${SCRIPT_DIR}/docker-compose.yml up $@
@@ -13,7 +13,7 @@ prometheus:
scrape_configs:
- job_name: tsdb-blocks-storage-s3/distributor
static_configs:
- targets: ['distributor:8001']
- targets: ['distributor-1:8000', 'distributor-2:8001']
labels:
cluster: 'docker-compose'
namespace: 'tsdb-blocks-storage-s3'
@@ -61,4 +61,4 @@ prometheus:
namespace: 'tsdb-blocks-storage-s3'

remote_write:
- url: http://distributor:8001/api/v1/push
- url: http://distributor-1:8000/api/v1/push
1 change: 0 additions & 1 deletion development/tsdb-blocks-storage-s3/config/mimir.yaml
@@ -4,7 +4,6 @@ distributor:
pool:
health_check_ingesters: true
ring:
instance_addr: 127.0.0.1
kvstore:
store: consul
consul:
4 changes: 2 additions & 2 deletions development/tsdb-blocks-storage-s3/config/prometheus.yaml
@@ -6,7 +6,7 @@ global:
scrape_configs:
- job_name: tsdb-blocks-storage-s3/distributor
static_configs:
- targets: ['distributor:8001']
- targets: ['distributor-1:8000', 'distributor-2:8001']
labels:
cluster: 'docker-compose'
namespace: 'tsdb-blocks-storage-s3'
@@ -54,5 +54,5 @@ scrape_configs:
namespace: 'tsdb-blocks-storage-s3'

remote_write:
- url: http://distributor:8001/api/v1/push
- url: http://distributor-1:8000/api/v1/push
send_exemplars: true
7 changes: 6 additions & 1 deletion development/tsdb-blocks-storage-s3/docker-compose.jsonnet
@@ -36,7 +36,12 @@ std.manifestYamlDoc({
{},

distributor:: {
distributor: mimirService({
'distributor-1': mimirService({
target: 'distributor',
httpPort: 8000,
}),

'distributor-2': mimirService({
target: 'distributor',
httpPort: 8001,
}),
25 changes: 24 additions & 1 deletion development/tsdb-blocks-storage-s3/docker-compose.yml
@@ -100,7 +100,30 @@
"image": "consul"
"ports":
- "8500:8500"
"distributor":
"distributor-1":
"build":
"context": "."
"dockerfile": "dev.dockerfile"
"command":
- "sh"
- "-c"
- "sleep 3 && exec ./mimir -config.file=./config/mimir.yaml -target=distributor -server.http-listen-port=8000 -server.grpc-listen-port=9000 -activity-tracker.filepath=/activity/distributor-8000 "
"depends_on":
- "minio"
- "consul"
"environment":
- "JAEGER_AGENT_HOST=jaeger"
- "JAEGER_AGENT_PORT=6831"
- "JAEGER_SAMPLER_PARAM=1"
- "JAEGER_SAMPLER_TYPE=const"
- "JAEGER_TAGS=app=distributor"
"image": "mimir"
"ports":
- "8000:8000"
"volumes":
- "./config:/mimir/config"
- "./activity:/activity"
"distributor-2":
"build":
"context": "."
"dockerfile": "dev.dockerfile"
@@ -3,6 +3,11 @@ multitenancy_enabled: false
distributor:
pool:
health_check_ingesters: true
ring:
kvstore:
store: consul
consul:
host: consul:8500

ingester_client:
grpc_client_config:
127 changes: 91 additions & 36 deletions pkg/distributor/distributor.go
@@ -19,6 +19,7 @@ import (
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/flagext"
"github.com/grafana/dskit/kv"
"github.com/grafana/dskit/limiter"
"github.com/grafana/dskit/ring"
ring_client "github.com/grafana/dskit/ring/client"
@@ -65,8 +66,12 @@
)

const (
// DistributorRingKey is the key under which we store the distributors ring in the KVStore.
DistributorRingKey = "distributor"
// distributorRingKey is the key under which we store the distributors ring in the KVStore.
distributorRingKey = "distributor"

// ringAutoForgetUnhealthyPeriods is how many consecutive timeout periods an unhealthy instance
// in the ring will be automatically removed after.
ringAutoForgetUnhealthyPeriods = 10
Review comment from @pracucci (Collaborator), Jul 13, 2022:

Looks very high. I would be more aggressive with distributors, like 2 should be enough.

)
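
To put the value discussed in the review comment above in context: the threshold handed to the auto-forget delegate later in this diff is ringAutoForgetUnhealthyPeriods multiplied by the ring heartbeat timeout. A minimal, illustrative sketch of that arithmetic follows; the 1m heartbeat timeout is an assumed default for -distributor.ring.heartbeat-timeout, not something stated in this diff:

    package main

    import (
        "fmt"
        "time"
    )

    // Same constant as in this PR: an unhealthy distributor is forgotten after
    // this many heartbeat-timeout periods without a successful heartbeat.
    const ringAutoForgetUnhealthyPeriods = 10

    func main() {
        heartbeatTimeout := time.Minute // assumed default, adjust to your configuration
        forgetAfter := ringAutoForgetUnhealthyPeriods * heartbeatTimeout
        fmt.Println(forgetAfter) // 10m0s; the reviewer's suggested value of 2 would give 2m0s
    }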

const (
@@ -87,8 +92,9 @@ type Distributor struct {

// The global rate limiter requires a distributors ring to count
// the number of healthy instances
distributorsLifeCycler *ring.Lifecycler
distributorsLifecycler *ring.BasicLifecycler
distributorsRing *ring.Ring
healthyInstancesCount *atomic.Uint32

// For handling HA replicas.
HATracker *haTracker
@@ -206,42 +212,14 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Ove
subservices := []services.Service(nil)
subservices = append(subservices, haTracker)

// Create the configured ingestion rate limit strategy (local or global). In case
// it's an internal dependency and can't join the distributors ring, we skip rate
// limiting.
var ingestionRateStrategy, requestRateStrategy limiter.RateLimiterStrategy
var distributorsLifeCycler *ring.Lifecycler
var distributorsRing *ring.Ring

if !canJoinDistributorsRing {
requestRateStrategy = newInfiniteRateStrategy()
ingestionRateStrategy = newInfiniteRateStrategy()
} else {
distributorsLifeCycler, err = ring.NewLifecycler(cfg.DistributorRing.ToLifecyclerConfig(), nil, "distributor", DistributorRingKey, true, log, prometheus.WrapRegistererWithPrefix("cortex_", reg))
if err != nil {
return nil, err
}

distributorsRing, err = ring.New(cfg.DistributorRing.ToRingConfig(), "distributor", DistributorRingKey, log, prometheus.WrapRegistererWithPrefix("cortex_", reg))
if err != nil {
return nil, errors.Wrap(err, "failed to initialize distributors' ring client")
}
subservices = append(subservices, distributorsLifeCycler, distributorsRing)

requestRateStrategy = newGlobalRateStrategy(newRequestRateStrategy(limits), distributorsLifeCycler)
ingestionRateStrategy = newGlobalRateStrategy(newIngestionRateStrategy(limits), distributorsLifeCycler)
}

d := &Distributor{
cfg: cfg,
log: log,
ingestersRing: ingestersRing,
ingesterPool: NewPool(cfg.PoolConfig, ingestersRing, cfg.IngesterClientFactory, log),
distributorsLifeCycler: distributorsLifeCycler,
distributorsRing: distributorsRing,
healthyInstancesCount: atomic.NewUint32(0),
limits: limits,
requestRateLimiter: limiter.NewRateLimiter(requestRateStrategy, 10*time.Second),
ingestionRateLimiter: limiter.NewRateLimiter(ingestionRateStrategy, 10*time.Second),
forwarder: forwarding.NewForwarder(reg, cfg.Forwarding),
HATracker: haTracker,
ingestionRate: util_math.NewEWMARate(0.2, instanceIngestionRateTickInterval),

@@ -351,7 +329,31 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Ove
return d.ingestionRate.Rate()
})

d.forwarder = forwarding.NewForwarder(reg, d.cfg.Forwarding)
// Create the configured ingestion rate limit strategy (local or global). In case
// it's an internal dependency and we can't join the distributors ring, we skip rate
// limiting.
var ingestionRateStrategy, requestRateStrategy limiter.RateLimiterStrategy
var distributorsLifecycler *ring.BasicLifecycler
var distributorsRing *ring.Ring

if !canJoinDistributorsRing {
requestRateStrategy = newInfiniteRateStrategy()
ingestionRateStrategy = newInfiniteRateStrategy()
} else {
distributorsRing, distributorsLifecycler, err = newRingAndLifecycler(cfg.DistributorRing, d.healthyInstancesCount, log, reg)
if err != nil {
return nil, err
}

subservices = append(subservices, distributorsLifecycler, distributorsRing)
requestRateStrategy = newGlobalRateStrategy(newRequestRateStrategy(limits), d)
ingestionRateStrategy = newGlobalRateStrategy(newIngestionRateStrategy(limits), d)
}

d.requestRateLimiter = limiter.NewRateLimiter(requestRateStrategy, 10*time.Second)
d.ingestionRateLimiter = limiter.NewRateLimiter(ingestionRateStrategy, 10*time.Second)
d.distributorsLifecycler = distributorsLifecycler
d.distributorsRing = distributorsRing

d.replicationFactor.Set(float64(ingestersRing.ReplicationFactor()))
d.activeUsers = util.NewActiveUsersCleanupWithDefaultValues(d.cleanupInactiveUser)
@@ -361,16 +363,60 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Ove
if err != nil {
return nil, err
}

d.subservicesWatcher = services.NewFailureWatcher()
d.subservicesWatcher.WatchManager(d.subservices)

d.Service = services.NewBasicService(d.starting, d.running, d.stopping)
return d, nil
}

// newRingAndLifecycler creates a new distributor ring and lifecycler with all required lifecycler delegates
func newRingAndLifecycler(cfg RingConfig, instanceCount *atomic.Uint32, logger log.Logger, reg prometheus.Registerer) (*ring.Ring, *ring.BasicLifecycler, error) {
kvStore, err := kv.NewClient(cfg.KVStore, ring.GetCodec(), kv.RegistererWithKVName(reg, "distributor-lifecycler"), logger)
if err != nil {
return nil, nil, errors.Wrap(err, "failed to initialize distributors' KV store")
}

lifecyclerCfg, err := cfg.ToBasicLifecyclerConfig(logger)
if err != nil {
return nil, nil, errors.Wrap(err, "failed to build distributors' lifecycler config")
}

var delegate ring.BasicLifecyclerDelegate
delegate = ring.NewInstanceRegisterDelegate(ring.ACTIVE, ringNumTokens)
delegate = newHealthyInstanceDelegate(instanceCount, cfg.HeartbeatTimeout, delegate)
delegate = ring.NewLeaveOnStoppingDelegate(delegate, logger)
delegate = ring.NewAutoForgetDelegate(ringAutoForgetUnhealthyPeriods*cfg.HeartbeatTimeout, delegate, logger)

distributorsLifecycler, err := ring.NewBasicLifecycler(lifecyclerCfg, "distributor", distributorRingKey, kvStore, delegate, logger, prometheus.WrapRegistererWithPrefix("cortex_", reg))
if err != nil {
return nil, nil, errors.Wrap(err, "failed to initialize distributors' lifecycler")
}

distributorsRing, err := ring.New(cfg.ToRingConfig(), "distributor", distributorRingKey, logger, prometheus.WrapRegistererWithPrefix("cortex_", reg))
if err != nil {
return nil, nil, errors.Wrap(err, "failed to initialize distributors' ring client")
}

return distributorsRing, distributorsLifecycler, nil
}
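
newHealthyInstanceDelegate itself is not part of this diff. As a rough, hypothetical sketch of what such a delegate could do on each received heartbeat (it may well differ from the real Mimir implementation), using the dskit ring, time, and go.uber.org/atomic packages already imported in this file:

    // Hypothetical sketch only: recount healthy ring members and publish the result,
    // which HealthyInstancesCount() then exposes to the rate limit strategies.
    func updateHealthyCount(desc *ring.Desc, heartbeatTimeout time.Duration, now time.Time, out *atomic.Uint32) {
        healthy := uint32(0)
        for _, instance := range desc.GetIngesters() {
            // IsHealthy checks both the instance state and the age of its last heartbeat.
            if instance.IsHealthy(ring.Reporting, heartbeatTimeout, now) {
                healthy++
            }
        }
        out.Store(healthy)
    }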

func (d *Distributor) starting(ctx context.Context) error {
// Only report success if all sub-services start properly
return services.StartManagerAndAwaitHealthy(ctx, d.subservices)
if err := services.StartManagerAndAwaitHealthy(ctx, d.subservices); err != nil {
return errors.Wrap(err, "unable to start distributor subservices")
}

// Distributors get embedded in rulers and queriers to talk to ingesters on the query path. In that
// case they won't have a distributor lifecycler or ring so don't try to join the distributor ring.
if d.distributorsLifecycler != nil && d.distributorsRing != nil {
level.Info(d.log).Log("msg", "waiting until distributor is ACTIVE in the ring")
if err := ring.WaitInstanceState(ctx, d.distributorsRing, d.distributorsLifecycler.GetInstanceID(), ring.ACTIVE); err != nil {
return err
}
}

return nil
}

func (d *Distributor) running(ctx context.Context) error {
@@ -1420,3 +1466,12 @@ func (d *Distributor) ServeHTTP(w http.ResponseWriter, req *http.Request) {
util.WriteHTMLResponse(w, ringNotEnabledPage)
}
}

// HealthyInstancesCount implements the ReadLifecycler interface
//
// We use a ring lifecycler delegate to count the number of members of the
// ring. The count is then used to enforce rate limiting correctly for each
// distributor. $EFFECTIVE_RATE_LIMIT = $GLOBAL_RATE_LIMIT / $NUM_INSTANCES
func (d *Distributor) HealthyInstancesCount() int {
return int(d.healthyInstancesCount.Load())
}
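
To make the formula in that comment concrete, here is a small illustrative helper, not Mimir's actual enforcement code (that lives in the limiter strategies wired up in New), showing how a global limit splits across healthy distributors:

    // Sketch: each distributor enforces its share of the tenant's global limit.
    func effectivePerDistributorLimit(globalLimit float64, healthyInstances int) float64 {
        if healthyInstances <= 0 {
            // Assumption for this sketch: fall back to the full global limit
            // when no healthy-instance count is available yet.
            return globalLimit
        }
        return globalLimit / float64(healthyInstances)
    }

For example, a global ingestion limit of 100,000 samples/s shared by 4 healthy distributors means each distributor enforces 25,000 samples/s locally.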
58 changes: 24 additions & 34 deletions pkg/distributor/distributor_ring.go
@@ -7,6 +7,7 @@ package distributor

import (
"flag"
"fmt"
"os"
"time"

@@ -16,8 +17,13 @@ import (
"github.com/grafana/dskit/kv"
"github.com/grafana/dskit/netutil"
"github.com/grafana/dskit/ring"
)

util_log "github.com/grafana/mimir/pkg/util/log"
const (
// ringNumTokens is how many tokens each distributor should have in the ring.
// Distributors use a ring because they need to know how many distributors there
// are in total for rate limiting.
ringNumTokens = 1
)

// RingConfig masks the ring lifecycler config which contains
@@ -43,7 +49,7 @@ type RingConfig struct {
func (cfg *RingConfig) RegisterFlags(f *flag.FlagSet, logger log.Logger) {
hostname, err := os.Hostname()
if err != nil {
level.Error(util_log.Logger).Log("msg", "failed to get hostname", "err", err)
level.Error(logger).Log("msg", "failed to get hostname", "err", err)
os.Exit(1)
}

@@ -61,39 +67,23 @@ func (cfg *RingConfig) RegisterFlags(f *flag.FlagSet, logger log.Logger) {
f.StringVar(&cfg.InstanceID, "distributor.ring.instance-id", hostname, "Instance ID to register in the ring.")
}

// ToLifecyclerConfig returns a LifecyclerConfig based on the distributor
// ring config.
func (cfg *RingConfig) ToLifecyclerConfig() ring.LifecyclerConfig {
// We have to make sure that the ring.LifecyclerConfig and ring.Config
// defaults are preserved
lc := ring.LifecyclerConfig{}
rc := ring.Config{}

flagext.DefaultValues(&lc)
flagext.DefaultValues(&rc)

// Configure ring
rc.KVStore = cfg.KVStore
rc.HeartbeatTimeout = cfg.HeartbeatTimeout
rc.ReplicationFactor = 1
func (cfg *RingConfig) ToBasicLifecyclerConfig(logger log.Logger) (ring.BasicLifecyclerConfig, error) {
instanceAddr, err := ring.GetInstanceAddr(cfg.InstanceAddr, cfg.InstanceInterfaceNames, logger)
if err != nil {
return ring.BasicLifecyclerConfig{}, err
}

// Configure lifecycler
lc.RingConfig = rc
lc.ListenPort = cfg.ListenPort
lc.Addr = cfg.InstanceAddr
lc.Port = cfg.InstancePort
lc.ID = cfg.InstanceID
lc.InfNames = cfg.InstanceInterfaceNames
lc.UnregisterOnShutdown = true
lc.HeartbeatPeriod = cfg.HeartbeatPeriod
lc.HeartbeatTimeout = cfg.HeartbeatTimeout
lc.ObservePeriod = 0
lc.NumTokens = 1
lc.JoinAfter = 0
lc.MinReadyDuration = 0
lc.FinalSleep = 0

return lc
instancePort := ring.GetInstancePort(cfg.InstancePort, cfg.ListenPort)

return ring.BasicLifecyclerConfig{
ID: cfg.InstanceID,
Addr: fmt.Sprintf("%s:%d", instanceAddr, instancePort),
HeartbeatPeriod: cfg.HeartbeatPeriod,
HeartbeatTimeout: cfg.HeartbeatTimeout,
TokensObservePeriod: 0,
NumTokens: ringNumTokens,
KeepInstanceInTheRingOnShutdown: false,
}, nil
}

func (cfg *RingConfig) ToRingConfig() ring.Config {