improve etcd event processing
fspmarshall committed Jul 21, 2021
1 parent 629dc43 commit 530430e
Showing 2 changed files with 165 additions and 17 deletions.
149 changes: 132 additions & 17 deletions lib/backend/etcdbk/etcd.go
@@ -34,6 +34,7 @@ import (
"github.com/gravitational/teleport/lib/backend"
"github.com/gravitational/teleport/lib/tlsca"
"github.com/gravitational/teleport/lib/utils"
cq "github.com/gravitational/teleport/lib/utils/concurrentqueue"

"github.com/coreos/go-semver/semver"
"github.com/gravitational/trace"
@@ -111,6 +112,18 @@ var (
Buckets: prometheus.ExponentialBuckets(0.001, 2, 16),
},
)
eventCount = prometheus.NewCounter(
prometheus.CounterOpts{
Name: "etcd_events",
Help: "Number of etcd events",
},
)
eventBackpressure = prometheus.NewCounter(
prometheus.CounterOpts{
Name: "etcd_event_backpressure",
Help: "Number of etcd events that hit backpressure",
},
)

prometheusCollectors = []prometheus.Collector{
writeLatencies, txLatencies, batchReadLatencies,
@@ -356,35 +369,137 @@ func (b *EtcdBackend) reconnect(ctx context.Context) error {
}

func (b *EtcdBackend) asyncWatch() {
err := b.watchEvents()
b.Debugf("Watch exited: %v.", err)
defer close(b.watchDone)
var err error
WatchEvents:
for b.ctx.Err() == nil {
err = b.watchEvents(b.ctx)

b.Debugf("Watch exited: %v", err)

// pause briefly to prevent excessive watcher creation attempts
select {
case <-time.After(utils.HalfJitter(time.Millisecond * 1500)):
case <-b.ctx.Done():
break WatchEvents
}

// buffer must be reset before recreation in order to avoid duplicate
// and/or missing values in the buffer watcher event stream.
b.buf.Reset()
}
b.Debugf("Watch stopped: %v.", trace.NewAggregate(err, b.ctx.Err()))
}

func (b *EtcdBackend) watchEvents() error {
defer close(b.watchDone)
// eventResult is used to ferry the result of event processing
type eventResult struct {
original clientv3.Event
event backend.Event
err error
}

// watchEvents spawns an etcd watcher and forwards events to the event buffer. The internals of this
// function are complicated somewhat by the fact that we need to make a per-event API call to translate
// lease IDs into expiry times. If events are being created faster than their expiries can be resolved,
// this eventually results in runaway memory usage within the etcd client. To combat this, we use a
// concurrentqueue.Queue to parallelize the event processing logic while preserving event order. While
// effective, this strategy still suffers from a "head of line blocking"-esque issue, since event order
// must be preserved.
func (b *EtcdBackend) watchEvents(ctx context.Context) error {

// etcd watch client relies on context cancellation for cleanup,
// so create a new subscope for this function.
ctx, cancel := context.WithCancel(ctx)
defer cancel()

start:
eventsC := b.client.Watch(b.ctx, b.cfg.Key, clientv3.WithPrefix())
// wrap fromEvent in a closure compatible with the concurrent queue
workfn := func(v interface{}) interface{} {
original := v.(clientv3.Event)
var event backend.Event
e, err := b.fromEvent(ctx, original)
if e != nil {
event = *e
}
return eventResult{
original: original,
event: event,
err: err,
}
}

// Constants here are a bit arbitrary. The goal is to set up the queue such that
// it can handle >100 events per second, assuming an average of 0.2 seconds of processing
// time per event (as seen in tests of under-provisioned etcd instances).
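// With 24 workers and ~0.2 seconds of processing per event, steady-state throughput
// works out to roughly 24 / 0.2 = 120 events per second; Capacity(240) presumably
// bounds in-flight items to about two seconds of backlog before pushes start to block.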
q := cq.New(
workfn,
cq.Workers(24),
cq.Capacity(240),
cq.InputBuf(120),
cq.OutputBuf(48),
)
defer q.Close()

// launch a background goroutine responsible for forwarding events from the queue
// to the buffer.
go func() {
PushToBuf:
for {
select {
case p := <-q.Pop():
r := p.(eventResult)
if r.err != nil {
b.WithError(r.err).Errorf("Failed to unmarshal event: %v.", r.original)
continue PushToBuf
}
b.buf.Push(r.event)
case <-q.Done():
return
}
}
}()

// start watching
eventsC := b.client.Watch(ctx, b.cfg.Key, clientv3.WithPrefix())
b.signalWatchStart()
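// if the watch channel closes, the loop below returns an error and asyncWatch
// recreates the watcher after a short jittered pause, resetting the buffer first.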

var lastBacklogWarning time.Time
for {
select {
case e, ok := <-eventsC:
if e.Canceled || !ok {
b.Debugf("Watch channel has closed.")
goto start
return trace.ConnectionProblem(nil, "etcd watch channel closed")
}
out := make([]backend.Event, 0, len(e.Events))

PushToQueue:
for i := range e.Events {
event, err := b.fromEvent(b.ctx, *e.Events[i])
if err != nil {
b.Errorf("Failed to unmarshal event: %v %v.", err, *e.Events[i])
} else {
out = append(out, *event)
eventCount.Inc()

var event clientv3.Event = *e.Events[i]
// Attempt a non-blocking push. We allocate a large input buffer for the queue, so this
// ought to succeed reliably.
select {
case q.Push() <- event:
continue PushToQueue
default:
}

eventBackpressure.Inc()

// limit backlog warnings to once per minute to prevent log spam.
if now := time.Now(); now.After(lastBacklogWarning.Add(time.Minute)) {
b.Warnf("Etcd event processing backlog; may result in excess memory usage and stale cluster state.")
lastBacklogWarning = now
}

// fall back to a blocking push
select {
case q.Push() <- event:
case <-ctx.Done():
return trace.ConnectionProblem(ctx.Err(), "context is closing")
}
}
b.buf.PushBatch(out)
case <-b.ctx.Done():
return trace.ConnectionProblem(b.ctx.Err(), "context is closing")
case <-ctx.Done():
return trace.ConnectionProblem(ctx.Err(), "context is closing")
}
}
}
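
Since the concurrentqueue package does most of the heavy lifting in watchEvents, a minimal usage sketch may help. It is illustrative only, assuming the calls visible in this diff (cq.New, cq.Workers, cq.Capacity, q.Push, q.Pop, q.Close), sensible defaults for the remaining options, and the ordering guarantee described in the watchEvents comment: results come out of Pop in push order even though multiple workers process items concurrently.

package main

import (
	"fmt"

	cq "github.com/gravitational/teleport/lib/utils/concurrentqueue"
)

func main() {
	// workfn plays the role of fromEvent above: a potentially slow per-item
	// transformation that the queue fans out across multiple workers.
	workfn := func(v interface{}) interface{} {
		return v.(int) * 10
	}

	q := cq.New(workfn, cq.Workers(4), cq.Capacity(16))
	defer q.Close()

	// push from a separate goroutine, mirroring the watch loop above.
	go func() {
		for i := 0; i < 8; i++ {
			q.Push() <- i
		}
	}()

	// results are emitted in push order even though several workers
	// process items concurrently.
	for i := 0; i < 8; i++ {
		fmt.Println(<-q.Pop()) // 0, 10, 20, ...
	}
}

In watchEvents above, the same pattern appears with fromEvent wrapped as the work function and a dedicated goroutine draining Pop into the event buffer.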
33 changes: 33 additions & 0 deletions lib/cache/cache.go
@@ -18,6 +18,7 @@ package cache

import (
"context"
"fmt"
"sync"
"time"

@@ -891,13 +892,45 @@ func (c *Cache) fetchAndWatch(ctx context.Context, retry utils.Retry, timer *tim
retry.Reset()

c.notify(c.ctx, Event{Type: WatcherStarted})

var lastStalenessWarning time.Time
var staleEventCount int
for {
select {
case <-watcher.Done():
return trace.ConnectionProblem(watcher.Error(), "watcher is closed")
case <-c.ctx.Done():
return trace.ConnectionProblem(c.ctx.Err(), "context is closing")
case event := <-watcher.Events():
// check for expired resources in OpPut events and log them periodically. Stale OpPut events
// may be an indicator of poor performance, and can lead to confusing and inconsistent state
// as the cache may prune items that ought to exist.
//
// NOTE: The inconsistent state mentioned above is a symptom of a deeper issue with the cache
// design. The cache should not expire individual items. It should instead rely on OpDelete events
// from backend expiries. As soon as the cache has expired at least one item, it is no longer
// a faithful representation of the real backend state, since it is 'anticipating' a change in
// backend state that may or may not have actually happened. Instead, it ought to serve the
// most recent internally-consistent "view" of the backend, and individual consumers should
// determine if the resources they are handling are sufficiently fresh. Resource-level expiry
// is a convenience/cleanup feature and ought not to be relied upon for meaningful logic anyhow.
// If we need to protect against a stale cache, we ought to invalidate the cache in its entirety, rather
// than pruning the resources that we think *might* have been removed from the real backend.
// TODO(fspmarshall): ^^^
//
if event.Type == types.OpPut && !event.Resource.Expiry().IsZero() {
staleEventCount++
if now := c.Clock.Now(); now.After(event.Resource.Expiry()) && now.After(lastStalenessWarning.Add(time.Minute)) {
kind := event.Resource.GetKind()
if sk := event.Resource.GetSubKind(); sk != "" {
kind = fmt.Sprintf("%s/%s", kind, sk)
}
c.Warningf("Encountered %d stale event(s), may indicate degraded backend or event system performance. last_kind=%q", staleEventCount, kind)
lastStalenessWarning = now
staleEventCount = 0
}
}

err = c.processEvent(ctx, event)
if err != nil {
return trace.Wrap(err)

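For reference, the warning throttle in fetchAndWatch is a small reusable pattern: count occurrences, emit at most one warning per minute, and reset the counter whenever a warning is logged. The sketch below is illustrative only; staleWarner and its method are hypothetical names rather than Teleport API.

package main

import (
	"log"
	"time"
)

// staleWarner mirrors the lastStalenessWarning/staleEventCount pair used in fetchAndWatch.
type staleWarner struct {
	last  time.Time
	count int
}

// observe records one stale event and logs at most once per minute.
func (w *staleWarner) observe(now time.Time, kind string) {
	w.count++
	if now.After(w.last.Add(time.Minute)) {
		log.Printf("Encountered %d stale event(s), may indicate degraded backend or event system performance. last_kind=%q", w.count, kind)
		w.last = now
		w.count = 0
	}
}

func main() {
	w := &staleWarner{}
	start := time.Now()
	for i := 0; i < 5; i++ {
		// five stale events in quick succession produce a single warning.
		w.observe(start.Add(time.Duration(i)*time.Second), "node")
	}
}

Running it produces a single warning for the first event and silently accumulates the remaining four until another minute has passed.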