-
Notifications
You must be signed in to change notification settings - Fork 457
/
gateway_ring.go
165 lines (137 loc) · 8.86 KB
/
gateway_ring.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
// SPDX-License-Identifier: AGPL-3.0-only
// Provenance-includes-location: https://github.com/cortexproject/cortex/blob/master/pkg/storegateway/gateway_ring.go
// Provenance-includes-license: Apache-2.0
// Provenance-includes-copyright: The Cortex Authors.
package storegateway
import (
"flag"
"fmt"
"net"
"os"
"strconv"
"time"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/flagext"
"github.com/grafana/dskit/kv"
"github.com/grafana/dskit/netutil"
"github.com/grafana/dskit/ring"
)
const (
// RingKey is the key under which we store the store gateways ring in the KVStore.
RingKey = "store-gateway"
// RingNameForServer is the name of the ring used by the store gateway server.
RingNameForServer = "store-gateway"
// RingNameForClient is the name of the ring used by the store gateway client (we need
// a different name to avoid clashing Prometheus metrics when running in single-binary).
RingNameForClient = "store-gateway-client"
// sharedOptionWithRingClient is a message appended to all config options that should be also
// set on the components running the store-gateway ring client.
sharedOptionWithRingClient = " This option needs be set both on the store-gateway, querier and ruler when running in microservices mode."
ringFlagsPrefix = "store-gateway.sharding-ring."
ringHeartbeatTimeoutFlag = ringFlagsPrefix + "heartbeat-timeout"
)
var (
// BlocksOwnerSync is the operation used to check the authoritative owners of a block
// (replicas included).
BlocksOwnerSync = ring.NewOp([]ring.InstanceState{ring.JOINING, ring.ACTIVE, ring.LEAVING}, nil)
// BlocksOwnerRead is the operation used to check the authoritative owners of a block
// (replicas included) that are available for queries (a store-gateway is available for
// queries only when ACTIVE).
BlocksOwnerRead = ring.NewOp([]ring.InstanceState{ring.ACTIVE}, nil)
// BlocksRead is the operation run by the querier to query blocks via the store-gateway.
BlocksRead = ring.NewOp([]ring.InstanceState{ring.ACTIVE}, func(s ring.InstanceState) bool {
// Blocks can only be queried from ACTIVE instances. However, if the block belongs to
// a non-active instance, then we should extend the replication set and try to query it
// from the next ACTIVE instance in the ring (which is expected to have it because a
// store-gateway keeps their previously owned blocks until new owners are ACTIVE).
return s != ring.ACTIVE
})
)
// RingConfig masks the ring lifecycler config which contains
// many options not really required by the store gateways ring. This config
// is used to strip down the config to the minimum, and avoid confusion
// to the user.
type RingConfig struct {
KVStore kv.Config `yaml:"kvstore" doc:"description=The key-value store used to share the hash ring across multiple instances. This option needs be set both on the store-gateway, querier and ruler when running in microservices mode."`
HeartbeatPeriod time.Duration `yaml:"heartbeat_period" category:"advanced"`
HeartbeatTimeout time.Duration `yaml:"heartbeat_timeout" category:"advanced"`
ReplicationFactor int `yaml:"replication_factor" category:"advanced"`
TokensFilePath string `yaml:"tokens_file_path"`
NumTokens int `yaml:"num_tokens" category:"advanced"`
ZoneAwarenessEnabled bool `yaml:"zone_awareness_enabled"`
AutoForgetEnabled bool `yaml:"auto_forget_enabled"`
// Wait ring stability.
WaitStabilityMinDuration time.Duration `yaml:"wait_stability_min_duration" category:"advanced"`
WaitStabilityMaxDuration time.Duration `yaml:"wait_stability_max_duration" category:"advanced"`
// Instance details
InstanceID string `yaml:"instance_id" doc:"default=<hostname>" category:"advanced"`
InstanceInterfaceNames []string `yaml:"instance_interface_names" doc:"default=[<private network interfaces>]"`
InstancePort int `yaml:"instance_port" category:"advanced"`
InstanceAddr string `yaml:"instance_addr" category:"advanced"`
EnableIPv6 bool `yaml:"instance_enable_ipv6" category:"advanced"`
InstanceZone string `yaml:"instance_availability_zone"`
UnregisterOnShutdown bool `yaml:"unregister_on_shutdown"`
// Injected internally
ListenPort int `yaml:"-"`
RingCheckPeriod time.Duration `yaml:"-"`
}
// RegisterFlags adds the flags required to config this to the given FlagSet
func (cfg *RingConfig) RegisterFlags(f *flag.FlagSet, logger log.Logger) {
hostname, err := os.Hostname()
if err != nil {
level.Error(logger).Log("msg", "failed to get hostname", "err", err)
os.Exit(1)
}
// Ring flags
cfg.KVStore.Store = "memberlist"
cfg.KVStore.RegisterFlagsWithPrefix(ringFlagsPrefix, "collectors/", f)
f.DurationVar(&cfg.HeartbeatPeriod, ringFlagsPrefix+"heartbeat-period", 15*time.Second, "Period at which to heartbeat to the ring. 0 = disabled.")
f.DurationVar(&cfg.HeartbeatTimeout, ringHeartbeatTimeoutFlag, time.Minute, "The heartbeat timeout after which store gateways are considered unhealthy within the ring. 0 = never (timeout disabled)."+sharedOptionWithRingClient)
f.IntVar(&cfg.ReplicationFactor, ringFlagsPrefix+"replication-factor", 3, "The replication factor to use when sharding blocks."+sharedOptionWithRingClient)
f.StringVar(&cfg.TokensFilePath, ringFlagsPrefix+"tokens-file-path", "", "File path where tokens are stored. If empty, tokens are not stored at shutdown and restored at startup.")
f.BoolVar(&cfg.ZoneAwarenessEnabled, ringFlagsPrefix+"zone-awareness-enabled", false, "True to enable zone-awareness and replicate blocks across different availability zones."+sharedOptionWithRingClient)
f.IntVar(&cfg.NumTokens, ringFlagsPrefix+"num-tokens", ringNumTokensDefault, "Number of tokens for each store-gateway.")
f.BoolVar(&cfg.AutoForgetEnabled, ringFlagsPrefix+"auto-forget-enabled", true, fmt.Sprintf("When enabled, a store-gateway is automatically removed from the ring after failing to heartbeat the ring for a period longer than %d times the configured -%s.", ringAutoForgetUnhealthyPeriods, ringHeartbeatTimeoutFlag))
// Wait stability flags.
f.DurationVar(&cfg.WaitStabilityMinDuration, ringFlagsPrefix+"wait-stability-min-duration", 0, "Minimum time to wait for ring stability at startup, if set to positive value.")
f.DurationVar(&cfg.WaitStabilityMaxDuration, ringFlagsPrefix+"wait-stability-max-duration", 5*time.Minute, "Maximum time to wait for ring stability at startup. If the store-gateway ring keeps changing after this period of time, the store-gateway will start anyway.")
// Instance flags
cfg.InstanceInterfaceNames = netutil.PrivateNetworkInterfacesWithFallback([]string{"eth0", "en0"}, logger)
f.Var((*flagext.StringSlice)(&cfg.InstanceInterfaceNames), ringFlagsPrefix+"instance-interface-names", "List of network interface names to look up when finding the instance IP address.")
f.StringVar(&cfg.InstanceAddr, ringFlagsPrefix+"instance-addr", "", "IP address to advertise in the ring. Default is auto-detected.")
f.IntVar(&cfg.InstancePort, ringFlagsPrefix+"instance-port", 0, "Port to advertise in the ring (defaults to -server.grpc-listen-port).")
f.StringVar(&cfg.InstanceID, ringFlagsPrefix+"instance-id", hostname, "Instance ID to register in the ring.")
f.BoolVar(&cfg.EnableIPv6, ringFlagsPrefix+"instance-enable-ipv6", false, "Enable using a IPv6 instance address. (default false)")
f.StringVar(&cfg.InstanceZone, ringFlagsPrefix+"instance-availability-zone", "", "The availability zone where this instance is running. Required if zone-awareness is enabled.")
f.BoolVar(&cfg.UnregisterOnShutdown, ringFlagsPrefix+"unregister-on-shutdown", true, "Unregister from the ring upon clean shutdown.")
// Defaults for internal settings.
cfg.RingCheckPeriod = 5 * time.Second
}
func (cfg *RingConfig) ToRingConfig() ring.Config {
rc := ring.Config{}
flagext.DefaultValues(&rc)
rc.KVStore = cfg.KVStore
rc.HeartbeatTimeout = cfg.HeartbeatTimeout
rc.ReplicationFactor = cfg.ReplicationFactor
rc.ZoneAwarenessEnabled = cfg.ZoneAwarenessEnabled
rc.SubringCacheDisabled = true
return rc
}
func (cfg *RingConfig) ToLifecyclerConfig(logger log.Logger) (ring.BasicLifecyclerConfig, error) {
instanceAddr, err := ring.GetInstanceAddr(cfg.InstanceAddr, cfg.InstanceInterfaceNames, logger, cfg.EnableIPv6)
if err != nil {
return ring.BasicLifecyclerConfig{}, err
}
instancePort := ring.GetInstancePort(cfg.InstancePort, cfg.ListenPort)
return ring.BasicLifecyclerConfig{
ID: cfg.InstanceID,
Addr: net.JoinHostPort(instanceAddr, strconv.Itoa(instancePort)),
Zone: cfg.InstanceZone,
HeartbeatPeriod: cfg.HeartbeatPeriod,
HeartbeatTimeout: cfg.HeartbeatTimeout,
TokensObservePeriod: 0,
NumTokens: cfg.NumTokens,
KeepInstanceInTheRingOnShutdown: !cfg.UnregisterOnShutdown,
}, nil
}