Allow setting ring heartbeat timeout to zero to disable timeout check. #4342

Merged 2 commits on Jul 9, 2021.
7 changes: 7 additions & 0 deletions CHANGELOG.md
@@ -5,6 +5,13 @@
* [CHANGE] Querier / ruler: Change `-querier.max-fetched-chunks-per-query` configuration to limit to maximum number of chunks that can be fetched in a single query. The number of chunks fetched by ingesters AND long-term storage combined should not exceed the value configured on `-querier.max-fetched-chunks-per-query`. #4260
* [ENHANCEMENT] Add timeout for waiting on compactor to become ACTIVE in the ring. #4262
* [ENHANCEMENT] Reduce memory used by streaming queries, particularly in ruler. #4341
+ * [ENHANCEMENT] Ring: allow experimental configuration of disabling of heartbeat timeouts by setting the relevant configuration value to zero. Applies to the following: #4342
+   * `-distributor.ring.heartbeat-timeout`
+   * `-ring.heartbeat-timeout`
+   * `-ruler.ring.heartbeat-timeout`
+   * `-alertmanager.sharding-ring.heartbeat-timeout`
+   * `-compactor.ring.heartbeat-timeout`
+   * `-store-gateway.sharding-ring.heartbeat-timeout`
* [BUGFIX] HA Tracker: when cleaning up obsolete elected replicas from KV store, tracker didn't update number of cluster per user correctly. #4336

## 1.10.0-rc.0 / 2021-06-28
2 changes: 1 addition & 1 deletion docs/blocks-storage/compactor.md
@@ -214,7 +214,7 @@ compactor:
[heartbeat_period: <duration> | default = 5s]

# The heartbeat timeout after which compactors are considered unhealthy
- # within the ring.
+ # within the ring. 0 = never (timeout disabled).
# CLI flag: -compactor.ring.heartbeat-timeout
[heartbeat_timeout: <duration> | default = 1m]

4 changes: 2 additions & 2 deletions docs/blocks-storage/store-gateway.md
@@ -237,8 +237,8 @@ store_gateway:
[heartbeat_period: <duration> | default = 15s]

# The heartbeat timeout after which store gateways are considered unhealthy
- # within the ring. This option needs be set both on the store-gateway and
- # querier when running in microservices mode.
+ # within the ring. 0 = never (timeout disabled). This option needs be set
+ # both on the store-gateway and querier when running in microservices mode.
# CLI flag: -store-gateway.sharding-ring.heartbeat-timeout
[heartbeat_timeout: <duration> | default = 1m]

13 changes: 7 additions & 6 deletions docs/configuration/config-file-reference.md
@@ -568,7 +568,7 @@ ring:
[heartbeat_period: <duration> | default = 5s]

# The heartbeat timeout after which distributors are considered unhealthy
- # within the ring.
+ # within the ring. 0 = never (timeout disabled).
# CLI flag: -distributor.ring.heartbeat-timeout
[heartbeat_timeout: <duration> | default = 1m]

@@ -662,6 +662,7 @@ lifecycler:
[mirror_timeout: <duration> | default = 2s]

# The heartbeat timeout after which ingesters are skipped for reads/writes.
+ # 0 = never (timeout disabled).
# CLI flag: -ring.heartbeat-timeout
[heartbeat_timeout: <duration> | default = 1m]

@@ -1585,7 +1586,7 @@ ring:
[heartbeat_period: <duration> | default = 5s]

# The heartbeat timeout after which rulers are considered unhealthy within the
- # ring.
+ # ring. 0 = never (timeout disabled).
# CLI flag: -ruler.ring.heartbeat-timeout
[heartbeat_timeout: <duration> | default = 1m]

@@ -1906,7 +1907,7 @@ sharding_ring:
[heartbeat_period: <duration> | default = 15s]

# The heartbeat timeout after which alertmanagers are considered unhealthy
- # within the ring.
+ # within the ring. 0 = never (timeout disabled).
# CLI flag: -alertmanager.sharding-ring.heartbeat-timeout
[heartbeat_timeout: <duration> | default = 1m]

@@ -5179,7 +5180,7 @@ sharding_ring:
[heartbeat_period: <duration> | default = 5s]

# The heartbeat timeout after which compactors are considered unhealthy within
- # the ring.
+ # the ring. 0 = never (timeout disabled).
# CLI flag: -compactor.ring.heartbeat-timeout
[heartbeat_timeout: <duration> | default = 1m]

@@ -5257,8 +5258,8 @@ sharding_ring:
[heartbeat_period: <duration> | default = 15s]

# The heartbeat timeout after which store gateways are considered unhealthy
- # within the ring. This option needs be set both on the store-gateway and
- # querier when running in microservices mode.
+ # within the ring. 0 = never (timeout disabled). This option needs be set both
+ # on the store-gateway and querier when running in microservices mode.
# CLI flag: -store-gateway.sharding-ring.heartbeat-timeout
[heartbeat_timeout: <duration> | default = 1m]

2 changes: 1 addition & 1 deletion pkg/alertmanager/alertmanager_ring.go
@@ -77,7 +77,7 @@ func (cfg *RingConfig) RegisterFlags(f *flag.FlagSet) {
// Ring flags
cfg.KVStore.RegisterFlagsWithPrefix(rfprefix, "alertmanagers/", f)
f.DurationVar(&cfg.HeartbeatPeriod, rfprefix+"heartbeat-period", 15*time.Second, "Period at which to heartbeat to the ring.")
- f.DurationVar(&cfg.HeartbeatTimeout, rfprefix+"heartbeat-timeout", time.Minute, "The heartbeat timeout after which alertmanagers are considered unhealthy within the ring.")
+ f.DurationVar(&cfg.HeartbeatTimeout, rfprefix+"heartbeat-timeout", time.Minute, "The heartbeat timeout after which alertmanagers are considered unhealthy within the ring. 0 = never (timeout disabled).")
f.IntVar(&cfg.ReplicationFactor, rfprefix+"replication-factor", 3, "The replication factor to use when sharding the alertmanager.")
f.BoolVar(&cfg.ZoneAwarenessEnabled, rfprefix+"zone-awareness-enabled", false, "True to enable zone-awareness and replicate alerts across different availability zones.")

2 changes: 1 addition & 1 deletion pkg/compactor/compactor_ring.go
@@ -51,7 +51,7 @@ func (cfg *RingConfig) RegisterFlags(f *flag.FlagSet) {
// Ring flags
cfg.KVStore.RegisterFlagsWithPrefix("compactor.ring.", "collectors/", f)
f.DurationVar(&cfg.HeartbeatPeriod, "compactor.ring.heartbeat-period", 5*time.Second, "Period at which to heartbeat to the ring.")
- f.DurationVar(&cfg.HeartbeatTimeout, "compactor.ring.heartbeat-timeout", time.Minute, "The heartbeat timeout after which compactors are considered unhealthy within the ring.")
+ f.DurationVar(&cfg.HeartbeatTimeout, "compactor.ring.heartbeat-timeout", time.Minute, "The heartbeat timeout after which compactors are considered unhealthy within the ring. 0 = never (timeout disabled).")

// Wait stability flags.
f.DurationVar(&cfg.WaitStabilityMinDuration, "compactor.ring.wait-stability-min-duration", time.Minute, "Minimum time to wait for ring stability at startup. 0 to disable.")
2 changes: 1 addition & 1 deletion pkg/distributor/distributor_ring.go
@@ -43,7 +43,7 @@ func (cfg *RingConfig) RegisterFlags(f *flag.FlagSet) {
// Ring flags
cfg.KVStore.RegisterFlagsWithPrefix("distributor.ring.", "collectors/", f)
f.DurationVar(&cfg.HeartbeatPeriod, "distributor.ring.heartbeat-period", 5*time.Second, "Period at which to heartbeat to the ring.")
- f.DurationVar(&cfg.HeartbeatTimeout, "distributor.ring.heartbeat-timeout", time.Minute, "The heartbeat timeout after which distributors are considered unhealthy within the ring.")
+ f.DurationVar(&cfg.HeartbeatTimeout, "distributor.ring.heartbeat-timeout", time.Minute, "The heartbeat timeout after which distributors are considered unhealthy within the ring. 0 = never (timeout disabled).")

// Instance flags
cfg.InstanceInterfaceNames = []string{"eth0", "en0"}
13 changes: 11 additions & 2 deletions pkg/ring/model.go
@@ -101,7 +101,7 @@ func (d *Desc) FindIngestersByState(state InstanceState) []InstanceDesc {
func (d *Desc) Ready(now time.Time, heartbeatTimeout time.Duration) error {
numTokens := 0
for id, ingester := range d.Ingesters {
- if now.Sub(time.Unix(ingester.Timestamp, 0)) > heartbeatTimeout {
+ if !ingester.IsHeartbeatHealthy(heartbeatTimeout, now) {
return fmt.Errorf("instance %s past heartbeat timeout", id)
} else if ingester.State != ACTIVE {
return fmt.Errorf("instance %s in state %v", id, ingester.State)
@@ -136,7 +136,16 @@ func (i *InstanceDesc) GetRegisteredAt() time.Time {
func (i *InstanceDesc) IsHealthy(op Operation, heartbeatTimeout time.Duration, now time.Time) bool {
healthy := op.IsInstanceInStateHealthy(i.State)

- return healthy && now.Unix()-i.Timestamp <= heartbeatTimeout.Milliseconds()/1000
+ return healthy && i.IsHeartbeatHealthy(heartbeatTimeout, now)
+ }
+
+ // IsHeartbeatHealthy returns whether the heartbeat timestamp for the ingester is within the
+ // specified timeout period. A timeout of zero disables the timeout; the heartbeat is ignored.
+ func (i *InstanceDesc) IsHeartbeatHealthy(heartbeatTimeout time.Duration, now time.Time) bool {
+ if heartbeatTimeout == 0 {
+ return true
+ }
+ return now.Sub(time.Unix(i.Timestamp, 0)) <= heartbeatTimeout
}

// Merge merges other ring into this one. Returns sub-ring that represents the change,
8 changes: 8 additions & 0 deletions pkg/ring/model_test.go
@@ -136,10 +136,18 @@ func TestDesc_Ready(t *testing.T) {
t.Fatal("expected ready, got", err)
}

+ if err := r.Ready(now, 0); err != nil {
+ t.Fatal("expected ready, got", err)
+ }
+
if err := r.Ready(now.Add(5*time.Minute), 10*time.Second); err == nil {
t.Fatal("expected !ready (no heartbeat from active ingester), but got no error")
}

+ if err := r.Ready(now.Add(5*time.Minute), 0); err != nil {
+ t.Fatal("expected ready (no heartbeat but timeout disabled), got", err)
+ }
+
r = &Desc{
Ingesters: map[string]InstanceDesc{
"ing1": {
2 changes: 1 addition & 1 deletion pkg/ring/ring.go
@@ -147,7 +147,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
cfg.KVStore.RegisterFlagsWithPrefix(prefix, "collectors/", f)

- f.DurationVar(&cfg.HeartbeatTimeout, prefix+"ring.heartbeat-timeout", time.Minute, "The heartbeat timeout after which ingesters are skipped for reads/writes.")
+ f.DurationVar(&cfg.HeartbeatTimeout, prefix+"ring.heartbeat-timeout", time.Minute, "The heartbeat timeout after which ingesters are skipped for reads/writes. 0 = never (timeout disabled).")
f.IntVar(&cfg.ReplicationFactor, prefix+"distributor.replication-factor", 3, "The number of ingesters to write to and read from.")
f.BoolVar(&cfg.ZoneAwarenessEnabled, prefix+"distributor.zone-awareness-enabled", false, "True to enable the zone-awareness and replicate ingested samples across different availability zones.")
}
38 changes: 36 additions & 2 deletions pkg/ring/ring_test.go
@@ -390,11 +390,11 @@ func TestRing_GetAllHealthy(t *testing.T) {
}

func TestRing_GetReplicationSetForOperation(t *testing.T) {
- const heartbeatTimeout = time.Minute
now := time.Now()

tests := map[string]struct {
ringInstances map[string]InstanceDesc
+ ringHeartbeatTimeout time.Duration
ringReplicationFactor int
expectedErrForRead error
expectedSetForRead []string
@@ -405,6 +405,7 @@ func TestRing_GetReplicationSetForOperation(t *testing.T) {
}{
"should return error on empty ring": {
ringInstances: nil,
+ ringHeartbeatTimeout: time.Minute,
ringReplicationFactor: 1,
expectedErrForRead: ErrEmptyRing,
expectedErrForWrite: ErrEmptyRing,
@@ -418,11 +419,41 @@ func TestRing_GetReplicationSetForOperation(t *testing.T) {
"instance-4": {Addr: "127.0.0.4", State: ACTIVE, Timestamp: now.Add(-30 * time.Second).Unix(), Tokens: GenerateTokens(128, nil)},
"instance-5": {Addr: "127.0.0.5", State: ACTIVE, Timestamp: now.Add(-40 * time.Second).Unix(), Tokens: GenerateTokens(128, nil)},
},
+ ringHeartbeatTimeout: time.Minute,
ringReplicationFactor: 1,
expectedSetForRead: []string{"127.0.0.1", "127.0.0.2", "127.0.0.3", "127.0.0.4", "127.0.0.5"},
expectedSetForWrite: []string{"127.0.0.1", "127.0.0.2", "127.0.0.3", "127.0.0.4", "127.0.0.5"},
expectedSetForReporting: []string{"127.0.0.1", "127.0.0.2", "127.0.0.3", "127.0.0.4", "127.0.0.5"},
},
+ "should succeed on instances with old timestamps but heartbeat timeout disabled": {
+ ringInstances: map[string]InstanceDesc{
+ "instance-1": {Addr: "127.0.0.1", State: ACTIVE, Timestamp: now.Add(-2 * time.Minute).Unix(), Tokens: GenerateTokens(128, nil)},
+ "instance-2": {Addr: "127.0.0.2", State: ACTIVE, Timestamp: now.Add(-2 * time.Minute).Unix(), Tokens: GenerateTokens(128, nil)},
+ "instance-3": {Addr: "127.0.0.3", State: ACTIVE, Timestamp: now.Add(-2 * time.Minute).Unix(), Tokens: GenerateTokens(128, nil)},
+ "instance-4": {Addr: "127.0.0.4", State: ACTIVE, Timestamp: now.Add(-2 * time.Minute).Unix(), Tokens: GenerateTokens(128, nil)},
+ "instance-5": {Addr: "127.0.0.5", State: ACTIVE, Timestamp: now.Add(-2 * time.Minute).Unix(), Tokens: GenerateTokens(128, nil)},
+ },
+ ringHeartbeatTimeout: 0,
+ ringReplicationFactor: 1,
+ expectedSetForRead: []string{"127.0.0.1", "127.0.0.2", "127.0.0.3", "127.0.0.4", "127.0.0.5"},
+ expectedSetForWrite: []string{"127.0.0.1", "127.0.0.2", "127.0.0.3", "127.0.0.4", "127.0.0.5"},
+ expectedSetForReporting: []string{"127.0.0.1", "127.0.0.2", "127.0.0.3", "127.0.0.4", "127.0.0.5"},
+ },
+ "should succeed on instances with zero timestamp but heartbeat timeout disabled": {
+ ringInstances: map[string]InstanceDesc{
+ "instance-1": {Addr: "127.0.0.1", State: ACTIVE, Timestamp: 0, Tokens: GenerateTokens(128, nil)},
+ "instance-2": {Addr: "127.0.0.2", State: ACTIVE, Timestamp: 0, Tokens: GenerateTokens(128, nil)},
+ "instance-3": {Addr: "127.0.0.3", State: ACTIVE, Timestamp: 0, Tokens: GenerateTokens(128, nil)},
+ "instance-4": {Addr: "127.0.0.4", State: ACTIVE, Timestamp: 0, Tokens: GenerateTokens(128, nil)},
+ "instance-5": {Addr: "127.0.0.5", State: ACTIVE, Timestamp: 0, Tokens: GenerateTokens(128, nil)},
+ },
+ ringHeartbeatTimeout: 0,
+ ringReplicationFactor: 1,
+ expectedSetForRead: []string{"127.0.0.1", "127.0.0.2", "127.0.0.3", "127.0.0.4", "127.0.0.5"},
+ expectedSetForWrite: []string{"127.0.0.1", "127.0.0.2", "127.0.0.3", "127.0.0.4", "127.0.0.5"},
+ expectedSetForReporting: []string{"127.0.0.1", "127.0.0.2", "127.0.0.3", "127.0.0.4", "127.0.0.5"},
+ },
+
"should fail on 1 unhealthy instance and RF=1": {
ringInstances: map[string]InstanceDesc{
"instance-1": {Addr: "127.0.0.1", State: ACTIVE, Timestamp: now.Unix(), Tokens: GenerateTokens(128, nil)},
@@ -431,6 +462,7 @@ func TestRing_GetReplicationSetForOperation(t *testing.T) {
"instance-4": {Addr: "127.0.0.4", State: ACTIVE, Timestamp: now.Add(-30 * time.Second).Unix(), Tokens: GenerateTokens(128, nil)},
"instance-5": {Addr: "127.0.0.5", State: ACTIVE, Timestamp: now.Add(-2 * time.Minute).Unix(), Tokens: GenerateTokens(128, nil)},
},
+ ringHeartbeatTimeout: time.Minute,
ringReplicationFactor: 1,
expectedErrForRead: ErrTooManyUnhealthyInstances,
expectedErrForWrite: ErrTooManyUnhealthyInstances,
@@ -444,6 +476,7 @@ func TestRing_GetReplicationSetForOperation(t *testing.T) {
"instance-4": {Addr: "127.0.0.4", State: ACTIVE, Timestamp: now.Add(-30 * time.Second).Unix(), Tokens: GenerateTokens(128, nil)},
"instance-5": {Addr: "127.0.0.5", State: ACTIVE, Timestamp: now.Add(-2 * time.Minute).Unix(), Tokens: GenerateTokens(128, nil)},
},
+ ringHeartbeatTimeout: time.Minute,
ringReplicationFactor: 3,
expectedSetForRead: []string{"127.0.0.1", "127.0.0.2", "127.0.0.3", "127.0.0.4"},
expectedSetForWrite: []string{"127.0.0.1", "127.0.0.2", "127.0.0.3", "127.0.0.4"},
@@ -457,6 +490,7 @@ func TestRing_GetReplicationSetForOperation(t *testing.T) {
"instance-4": {Addr: "127.0.0.4", State: ACTIVE, Timestamp: now.Add(-2 * time.Minute).Unix(), Tokens: GenerateTokens(128, nil)},
"instance-5": {Addr: "127.0.0.5", State: ACTIVE, Timestamp: now.Add(-2 * time.Minute).Unix(), Tokens: GenerateTokens(128, nil)},
},
+ ringHeartbeatTimeout: time.Minute,
ringReplicationFactor: 3,
expectedErrForRead: ErrTooManyUnhealthyInstances,
expectedErrForWrite: ErrTooManyUnhealthyInstances,
@@ -474,7 +508,7 @@ func TestRing_GetReplicationSetForOperation(t *testing.T) {

ring := Ring{
cfg: Config{
- HeartbeatTimeout: heartbeatTimeout,
+ HeartbeatTimeout: testData.ringHeartbeatTimeout,
ReplicationFactor: testData.ringReplicationFactor,
},
ringDesc: ringDesc,
2 changes: 1 addition & 1 deletion pkg/ruler/ruler_ring.go
@@ -57,7 +57,7 @@ func (cfg *RingConfig) RegisterFlags(f *flag.FlagSet) {
// Ring flags
cfg.KVStore.RegisterFlagsWithPrefix("ruler.ring.", "rulers/", f)
f.DurationVar(&cfg.HeartbeatPeriod, "ruler.ring.heartbeat-period", 5*time.Second, "Period at which to heartbeat to the ring.")
- f.DurationVar(&cfg.HeartbeatTimeout, "ruler.ring.heartbeat-timeout", time.Minute, "The heartbeat timeout after which rulers are considered unhealthy within the ring.")
+ f.DurationVar(&cfg.HeartbeatTimeout, "ruler.ring.heartbeat-timeout", time.Minute, "The heartbeat timeout after which rulers are considered unhealthy within the ring. 0 = never (timeout disabled).")

// Instance flags
cfg.InstanceInterfaceNames = []string{"eth0", "en0"}
2 changes: 1 addition & 1 deletion pkg/storegateway/gateway_ring.go
@@ -95,7 +95,7 @@ func (cfg *RingConfig) RegisterFlags(f *flag.FlagSet) {
// Ring flags
cfg.KVStore.RegisterFlagsWithPrefix(ringFlagsPrefix, "collectors/", f)
f.DurationVar(&cfg.HeartbeatPeriod, ringFlagsPrefix+"heartbeat-period", 15*time.Second, "Period at which to heartbeat to the ring.")
- f.DurationVar(&cfg.HeartbeatTimeout, ringFlagsPrefix+"heartbeat-timeout", time.Minute, "The heartbeat timeout after which store gateways are considered unhealthy within the ring."+sharedOptionWithQuerier)
+ f.DurationVar(&cfg.HeartbeatTimeout, ringFlagsPrefix+"heartbeat-timeout", time.Minute, "The heartbeat timeout after which store gateways are considered unhealthy within the ring. 0 = never (timeout disabled)."+sharedOptionWithQuerier)
f.IntVar(&cfg.ReplicationFactor, ringFlagsPrefix+"replication-factor", 3, "The replication factor to use when sharding blocks."+sharedOptionWithQuerier)
f.StringVar(&cfg.TokensFilePath, ringFlagsPrefix+"tokens-file-path", "", "File path where tokens are stored. If empty, tokens are not stored at shutdown and restored at startup.")
f.BoolVar(&cfg.ZoneAwarenessEnabled, ringFlagsPrefix+"zone-awareness-enabled", false, "True to enable zone-awareness and replicate blocks across different availability zones.")