Commit

Enabling availability zone awareness in metric R/W with ingesters. (#2317)

* adding Zone information to Token and Ingester description structs. Added distinct zone check in ring Get ingesters function

Signed-off-by: Ken Haines <khaines@microsoft.com>

* updating AddIngester to include zone

Signed-off-by: Ken Haines <khaines@microsoft.com>

* updating lifecycler config and tests to enable availability zone

Signed-off-by: Ken Haines <khaines@microsoft.com>

* adding zone info to ring's HTTP method

Signed-off-by: Ken Haines <khaines@microsoft.com>

* adding positive & negative zone aware replica set tests

Signed-off-by: Ken Haines <khaines@microsoft.com>

* updating config ref doc

Signed-off-by: Ken Haines <khaines@microsoft.com>

* updating changelog

Signed-off-by: Ken Haines <khaines@microsoft.com>

* correcting misspell caught in linting

Signed-off-by: Ken Haines <khaines@microsoft.com>

* updating lifecycler config for availability zone so that the docs are generated in a consistent/verifiable way but still have a decent default value

Signed-off-by: Ken Haines <khaines@microsoft.com>

* Adding some zone based replication docs

Signed-off-by: Ken Haines <khaines@microsoft.com>
khaines committed Mar 29, 2020
1 parent 6684bff commit b3112b3
Showing 11 changed files with 277 additions and 45 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -59,6 +59,7 @@
* `-flusher.wal-dir` for the WAL directory to recover from.
* `-flusher.concurrent-flushes` for number of concurrent flushes.
* `-flusher.flush-op-timeout` is duration after which a flush should timeout.
* [FEATURE] Ingesters can now have an optional availability zone set, to ensure metric replication is distributed across zones. This is set via the `-ingester.availability-zone` flag or the `availability_zone` field in the config file. #2317
* [ENHANCEMENT] Better re-use of connections to DynamoDB and S3. #2268
* [ENHANCEMENT] Experimental TSDB: Add support for local `filesystem` backend. #2245
* [ENHANCEMENT] Experimental TSDB: Added memcached support for the TSDB index cache. #2290
5 changes: 5 additions & 0 deletions docs/configuration/config-file-reference.md
@@ -539,6 +539,11 @@ lifecycler:
# CLI flag: -ingester.tokens-file-path
[tokens_file_path: <string> | default = ""]

# The availability zone of the host this instance is running on. Default is
# the lifecycler ID.
# CLI flag: -ingester.availability-zone
[availability_zone: <string> | default = ""]

# Number of times to try and transfer chunks before falling back to flushing.
# Negative value or zero disables hand-over.
# CLI flag: -ingester.max-transfer-retries
4 changes: 3 additions & 1 deletion docs/guides/running.md
@@ -51,7 +51,9 @@ Memcached is not essential but highly recommended.
The standard replication factor is three, so that we can drop one
replica and be unconcerned, as we still have two copies of the data
left for redundancy. This is configurable: you can run with more
redundancy or less, depending on your risk appetite.
redundancy or less, depending on your risk appetite. By default,
ingesters are not aware of availability zones. See [zone aware replication](zone-replication.md)
to change this.

### Schema

30 changes: 30 additions & 0 deletions docs/guides/zone-replication.md
@@ -0,0 +1,30 @@
---
title: "Ingester Hand-over"
linkTitle: "Ingester Hand-over"
weight: 5
slug: ingester-handover
---

In a default configuration, time-series written to ingesters are replicated based on the container/pod name of the ingester instances. It is entirely possible that all the replicas for a given time-series are held within the same availability zone, even if the Cortex infrastructure spans multiple zones within the region. Storing multiple replicas of a given time-series within a single zone poses a risk of data loss if there is an outage affecting several nodes within that zone, or a full zone outage.

## Configuration

Cortex can be configured to take an availability zone value into account in its replication system. Doing so mitigates the risk associated with losing multiple nodes within the same availability zone. The availability zone for an ingester can be set on the command line using the `-ingester.availability-zone` flag or in the YAML configuration:

```yaml
ingester:
  lifecycler:
    availability_zone: "zone-3"
```
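
If no zone is set, the lifecycler falls back to its own ID (see the `pkg/ring/lifecycler.go` changes further down), so each instance effectively occupies its own zone and the previous replication behaviour is preserved. The following is a minimal sketch of that fallback using hypothetical names, not the actual Cortex code:

```go
package main

import "fmt"

// resolveZone is a hypothetical helper mirroring the fallback added in
// NewLifecycler: an empty availability zone defaults to the lifecycler ID.
func resolveZone(configuredZone, lifecyclerID string) string {
	if configuredZone == "" {
		return lifecyclerID
	}
	return configuredZone
}

func main() {
	fmt.Println(resolveZone("", "ingester-0"))       // "ingester-0" (default behaviour)
	fmt.Println(resolveZone("zone-3", "ingester-0")) // "zone-3"
}
```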

## Zone Replication Considerations

Enabling availability zone awareness helps mitigate the risk of data loss within a single zone, but there are some items an operator should consider before enabling this feature.

### Minimum number of Zones

For Cortex to function correctly, there must be at least as many availability zones as the replication factor. The default replication factor is 3, so by default a Cortex cluster should be spread over at least 3 zones. It is safe to have more zones than the replication factor, but not fewer: with fewer availability zones than replicas, a replica write is missed, and in some cases the write fails entirely if the availability zone count is too low.
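
The sketch below (assumed types and names, not the actual `Ring.Get` implementation shown in the diff further down) illustrates why this constraint exists: replicas are chosen by walking the ring and skipping any ingester or zone that has already been used, so with fewer zones than the replication factor the selection comes up short.

```go
package main

import "fmt"

// tokenDesc is a simplified stand-in for a ring token entry.
type tokenDesc struct {
	ingester string
	zone     string
}

// pickReplicas walks the ring from start and selects up to n ingesters,
// requiring both a distinct ingester and a distinct zone for each replica.
func pickReplicas(ring []tokenDesc, start, n int) []string {
	distinctHosts := map[string]struct{}{}
	distinctZones := map[string]struct{}{}
	var replicas []string
	for i := 0; i < len(ring) && len(replicas) < n; i++ {
		t := ring[(start+i)%len(ring)]
		if _, ok := distinctHosts[t.ingester]; ok {
			continue
		}
		if _, ok := distinctZones[t.zone]; ok {
			continue
		}
		distinctHosts[t.ingester] = struct{}{}
		distinctZones[t.zone] = struct{}{}
		replicas = append(replicas, t.ingester)
	}
	return replicas
}

func main() {
	// Three ingesters spread over only two zones with a replication factor
	// of 3: only two replicas can be placed, so one replica write is missed.
	ring := []tokenDesc{
		{"ingester-0", "zone-a"},
		{"ingester-1", "zone-b"},
		{"ingester-2", "zone-a"},
	}
	fmt.Println(pickReplicas(ring, 0, 3)) // [ingester-0 ingester-1]
}
```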

### Cost

Depending on the existing Cortex infrastructure in use, this may increase running costs, as most cloud providers charge for cross-availability-zone traffic. The most significant change would be for a Cortex cluster currently running in a single zone.
11 changes: 7 additions & 4 deletions pkg/ring/http.go
@@ -30,6 +30,7 @@ const pageContent = `
<thead>
<tr>
<th>Instance ID</th>
<th>Availability Zone</th>
<th>State</th>
<th>Address</th>
<th>Last Heartbeat</th>
@@ -46,6 +47,7 @@ const pageContent = `
<tr bgcolor="#BEBEBE">
{{ end }}
<td>{{ .ID }}</td>
<td>{{ .Zone }}</td>
<td>{{ .State }}</td>
<td>{{ .Address }}</td>
<td>{{ .Timestamp }}</td>
@@ -138,16 +140,17 @@ func (r *Ring) ServeHTTP(w http.ResponseWriter, req *http.Request) {
}

ingesters = append(ingesters, struct {
ID, State, Address, Timestamp string
Tokens []uint32
NumTokens int
Ownership float64
ID, State, Address, Timestamp, Zone string
Tokens []uint32
NumTokens int
Ownership float64
}{
ID: id,
State: state,
Address: ing.Addr,
Timestamp: timestamp.String(),
Tokens: ing.Tokens,
Zone: ing.Zone,
NumTokens: len(ing.Tokens),
Ownership: (float64(owned[id]) / float64(math.MaxUint32)) * 100,
})
19 changes: 14 additions & 5 deletions pkg/ring/lifecycler.go
@@ -55,6 +55,7 @@ type LifecyclerConfig struct {
InfNames []string `yaml:"interface_names"`
FinalSleep time.Duration `yaml:"final_sleep"`
TokensFilePath string `yaml:"tokens_file_path"`
Zone string `yaml:"availability_zone"`

// For testing, you can override the address and ID of this ingester
Addr string `yaml:"address" doc:"hidden"`
@@ -97,6 +98,7 @@ func (cfg *LifecyclerConfig) RegisterFlagsWithPrefix(prefix string, f *flag.Flag
f.StringVar(&cfg.Addr, prefix+"lifecycler.addr", "", "IP address to advertise in consul.")
f.IntVar(&cfg.Port, prefix+"lifecycler.port", 0, "port to advertise in consul (defaults to server.grpc-listen-port).")
f.StringVar(&cfg.ID, prefix+"lifecycler.ID", hostname, "ID to register into consul.")
f.StringVar(&cfg.Zone, prefix+"availability-zone", "", "The availability zone of the host this instance is running on. Default is the lifecycler ID.")
}

// Lifecycler is responsible for managing the lifecycle of entries in the ring.
@@ -114,6 +116,7 @@ type Lifecycler struct {
Addr string
RingName string
RingKey string
Zone string

// Whether to flush if transfer fails on shutdown.
flushOnShutdown bool
@@ -154,6 +157,11 @@ func NewLifecycler(cfg LifecyclerConfig, flushTransferer FlushTransferer, ringNa
return nil, err
}

zone := cfg.Zone
if zone == "" {
zone = cfg.ID
}

// We do allow a nil FlushTransferer, but to keep the ring logic easier we assume
// it's always set, so we use a noop FlushTransferer
if flushTransferer == nil {
@@ -170,6 +178,7 @@ func NewLifecycler(cfg LifecyclerConfig, flushTransferer FlushTransferer, ringNa
RingName: ringName,
RingKey: ringKey,
flushOnShutdown: flushOnShutdown,
Zone: zone,

actorChan: make(chan func()),

@@ -496,14 +505,14 @@ func (i *Lifecycler) initRing(ctx context.Context) error {
if len(tokensFromFile) >= i.cfg.NumTokens {
i.setState(ACTIVE)
}
ringDesc.AddIngester(i.ID, i.Addr, tokensFromFile, i.GetState())
ringDesc.AddIngester(i.ID, i.Addr, i.Zone, tokensFromFile, i.GetState())
i.setTokens(tokensFromFile)
return ringDesc, true, nil
}

// Either we are a new ingester, or consul must have restarted
level.Info(util.Logger).Log("msg", "instance not found in ring, adding with no tokens", "ring", i.RingName)
ringDesc.AddIngester(i.ID, i.Addr, []uint32{}, i.GetState())
ringDesc.AddIngester(i.ID, i.Addr, i.Zone, []uint32{}, i.GetState())
return ringDesc, true, nil
}

@@ -558,7 +567,7 @@ func (i *Lifecycler) verifyTokens(ctx context.Context) bool {
ringTokens = append(ringTokens, newTokens...)
sort.Sort(ringTokens)

ringDesc.AddIngester(i.ID, i.Addr, ringTokens, i.GetState())
ringDesc.AddIngester(i.ID, i.Addr, i.Zone, ringTokens, i.GetState())

i.setTokens(ringTokens)

@@ -620,7 +629,7 @@ func (i *Lifecycler) autoJoin(ctx context.Context, targetState IngesterState) er
sort.Sort(myTokens)
i.setTokens(myTokens)

ringDesc.AddIngester(i.ID, i.Addr, i.getTokens(), i.GetState())
ringDesc.AddIngester(i.ID, i.Addr, i.Zone, i.getTokens(), i.GetState())

return ringDesc, true, nil
})
@@ -649,7 +658,7 @@ func (i *Lifecycler) updateConsul(ctx context.Context) error {
if !ok {
// consul must have restarted
level.Info(util.Logger).Log("msg", "found empty ring, inserting tokens", "ring", i.RingName)
ringDesc.AddIngester(i.ID, i.Addr, i.getTokens(), i.GetState())
ringDesc.AddIngester(i.ID, i.Addr, i.Zone, i.getTokens(), i.GetState())
} else {
ingesterDesc.Timestamp = time.Now().Unix()
ingesterDesc.State = i.GetState()
6 changes: 4 additions & 2 deletions pkg/ring/model.go
@@ -37,7 +37,7 @@ func NewDesc() *Desc {

// AddIngester adds the given ingester to the ring. Ingester will only use supplied tokens,
// any other tokens are removed.
func (d *Desc) AddIngester(id, addr string, tokens []uint32, state IngesterState) {
func (d *Desc) AddIngester(id, addr, zone string, tokens []uint32, state IngesterState) {
if d.Ingesters == nil {
d.Ingesters = map[string]IngesterDesc{}
}
@@ -47,6 +47,7 @@ func (d *Desc) AddIngester(id, addr, zone string, tokens []uint32, state IngesterState
Timestamp: time.Now().Unix(),
State: state,
Tokens: tokens,
Zone: zone,
}

d.Ingesters[id] = ingester
@@ -377,6 +378,7 @@ func (d *Desc) RemoveTombstones(limit time.Time) {
type TokenDesc struct {
Token uint32
Ingester string
Zone string
}

// Returns sorted list of tokens with ingester names.
@@ -388,7 +390,7 @@ func (d *Desc) getTokens() []TokenDesc {
tokens := make([]TokenDesc, 0, numTokens)
for key, ing := range d.Ingesters {
for _, token := range ing.Tokens {
tokens = append(tokens, TokenDesc{Token: token, Ingester: key})
tokens = append(tokens, TokenDesc{Token: token, Ingester: key, Zone: ing.GetZone()})
}
}

7 changes: 6 additions & 1 deletion pkg/ring/ring.go
@@ -185,6 +185,7 @@ func (r *Ring) Get(key uint32, op Operation, buf []IngesterDesc) (ReplicationSet
n = r.cfg.ReplicationFactor
ingesters = buf[:0]
distinctHosts = map[string]struct{}{}
distinctZones = map[string]struct{}{}
start = r.search(key)
iterations = 0
)
@@ -193,12 +194,16 @@
// Wrap i around in the ring.
i %= len(r.ringTokens)

// We want n *distinct* ingesters.
// We want n *distinct* ingesters, each in a *distinct* zone.
token := r.ringTokens[i]
if _, ok := distinctHosts[token.Ingester]; ok {
continue
}
if _, ok := distinctZones[token.Zone]; ok {
continue
}
distinctHosts[token.Ingester] = struct{}{}
distinctZones[token.Zone] = struct{}{}
ingester := r.ringDesc.Ingesters[token.Ingester]

// We do not want to Write to Ingesters that are not ACTIVE, but we do want
