improve etcd event processing

fspmarshall committed Jun 28, 2021
1 parent a23f3d3 commit c0fe5ae

Showing 2 changed files with 152 additions and 14 deletions.
141 changes: 127 additions & 14 deletions lib/backend/etcdbk/etcd.go
@@ -33,6 +33,7 @@ import (
"github.com/gravitational/teleport/lib/backend"
"github.com/gravitational/teleport/lib/tlsca"
"github.com/gravitational/teleport/lib/utils"
cq "github.com/gravitational/teleport/lib/utils/concurrentqueue"

"github.com/coreos/go-semver/semver"
"github.com/gravitational/trace"
@@ -110,6 +111,18 @@ var (
Buckets: prometheus.ExponentialBuckets(0.001, 2, 16),
},
)
eventCount = prometheus.NewCounter(
prometheus.CounterOpts{
Name: "etcd_events",
Help: "Number of etcd events",
},
)
eventBackpressure = prometheus.NewCounter(
prometheus.CounterOpts{
Name: "etcd_event_backpressure",
Help: "Number of etcd events that hit backpressure",
},
)

prometheusCollectors = []prometheus.Collector{
writeLatencies, txLatencies, batchReadLatencies,
@@ -355,34 +368,134 @@ func (b *EtcdBackend) reconnect(ctx context.Context) error {
}

func (b *EtcdBackend) asyncWatch() {
err := b.watchEvents()
defer close(b.watchDone)
var err error
WatchEvents:
for b.ctx.Err() == nil {
err = b.watchEvents(b.ctx)

// pause briefly to prevent excessive watcher creation attempts
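// (assuming the usual half-jitter behavior, the wait lands somewhere between ~0.75s
// and 1.5s, so restarted watchers don't all retry in lockstep)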
select {
case <-time.After(utils.HalfJitter(time.Millisecond * 1500)):
case <-b.ctx.Done():
break WatchEvents
}

// buffer must be reset before recreation in order to avoid duplicate
// and/or missing values in the buffer watcher event stream.
b.buf.Reset()
}
if err == nil {
err = b.ctx.Err()
}
b.Debugf("Watch exited: %v.", err)
}

func (b *EtcdBackend) watchEvents() error {
defer close(b.watchDone)
// eventResult is used to ferry the result of event processing
type eventResult struct {
original clientv3.Event
event backend.Event
err error
}

start:
eventsC := b.client.Watch(b.ctx, b.cfg.Key, clientv3.WithPrefix())
// watchEvents spawns an etcd watcher and forwards events to the event buffer. the internals of this
// function are complicated somewhat by the fact that we need to make a per-event API call to translate
// lease IDs into expiry times. if events are being created faster than their expiries can be resolved,
// this eventually results in runaway memory usage within the etcd client. To combat this, we use a
// concurrentqueue.Queue to parallelize the event processing logic while preserving event order. While
// effective, this strategy still suffers from a "head of line blocking"-esque issue since event order
// must be preserved.
func (b *EtcdBackend) watchEvents(ctx context.Context) error {

// etcd watch client relies on context cancellation for cleanup,
// so create a new subscope for this function.
ctx, cancel := context.WithCancel(ctx)
defer cancel()

// wrap fromEvent in a closure compatible with the concurrent queue
workfn := func(v interface{}) interface{} {
original := v.(clientv3.Event)
event, err := b.fromEvent(b.ctx, original)
return eventResult{
original: original,
event: *event,
err: err,
}
}

// constants here are a bit arbitrary. the goal is to set up the queue s.t.
// it could handle >100 events per second assuming an avg of .2 seconds of processing
// time per event (as seen in tests of under-provisioned etcd instances).
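// e.g. 24 workers at ~0.2s per event works out to roughly 24/0.2 = 120 events/sec
// of sustained throughput, comfortably above the >100/sec target.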
q := cq.New(
workfn,
cq.Workers(24),
cq.Capacity(240),
cq.InputBuf(120),
cq.OutputBuf(48),
)
defer q.Close()

// launch background process responsible for forwarding events from the queue
// to the buffer.
go func() {
PushToBuf:
for {
select {
case p := <-q.Pop():
r := p.(eventResult)
if r.err != nil {
b.Errorf("Failed to unmarshal event: %v %v.", r.err, r.original)
continue PushToBuf
}
b.buf.Push(r.event)
case <-q.Done():
return
}
}
}()

// start watching
eventsC := b.client.Watch(ctx, b.cfg.Key, clientv3.WithPrefix())
b.signalWatchStart()

var lastBacklogWarning time.Time
for {
select {
case e, ok := <-eventsC:
if e.Canceled || !ok {
b.Debugf("Watch channel has closed.")
goto start
return trace.ConnectionProblem(nil, "etcd watch channel closed")
}
out := make([]backend.Event, 0, len(e.Events))

PushToQueue:
for i := range e.Events {
event, err := b.fromEvent(b.ctx, *e.Events[i])
if err != nil {
b.Errorf("Failed to unmarshal event: %v %v.", err, *e.Events[i])
} else {
out = append(out, *event)
eventCount.Inc()

var event clientv3.Event = *e.Events[i]
// attempt non-blocking push. We allocate a large input buffer for the queue, so this
// ought to succeed reliably.
select {
case q.Push() <- event:
continue PushToQueue
default:
}

eventBackpressure.Inc()

// limit backlog warnings to once per minute to prevent log spam.
if now := time.Now(); now.After(lastBacklogWarning.Add(time.Minute)) {
b.Warnf("Etcd event processing backlog; may result in excess memory usage and stale cluster state.")
lastBacklogWarning = now
}

// fall back to blocking push
select {
case q.Push() <- event:
case <-ctx.Done():
return trace.ConnectionProblem(b.ctx.Err(), "context is closing")
}
}
b.buf.PushBatch(out)
case <-b.ctx.Done():
case <-ctx.Done():
return trace.ConnectionProblem(b.ctx.Err(), "context is closing")
}
}
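The order-preserving worker pool used above comes from the new lib/utils/concurrentqueue package. Below is a minimal, illustrative sketch of the pattern as it is exercised in this diff (cq.New with a work function plus the Workers/Capacity/InputBuf/OutputBuf options, and the Push/Pop/Close methods); it is not part of the commit, and the option values and semantics are assumed from their usage above.

package main

import (
	"fmt"

	cq "github.com/gravitational/teleport/lib/utils/concurrentqueue"
)

func main() {
	// the work function runs concurrently across workers, but results are
	// popped in the same order their inputs were pushed.
	double := func(v interface{}) interface{} {
		return v.(int) * 2
	}

	q := cq.New(
		double,
		cq.Workers(4),   // concurrent workers
		cq.Capacity(16), // items allowed in flight
		cq.InputBuf(8),  // input channel buffer
		cq.OutputBuf(8), // output channel buffer
	)
	defer q.Close()

	go func() {
		for i := 0; i < 10; i++ {
			q.Push() <- i
		}
	}()

	// despite parallel processing, outputs arrive in input order: 0, 2, 4, ...
	for i := 0; i < 10; i++ {
		fmt.Println(<-q.Pop())
	}
}

In watchEvents the same shape is used with fromEvent as the work function, so the per-event lease-to-expiry lookups run in parallel without reordering the event stream.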
25 changes: 25 additions & 0 deletions lib/cache/cache.go
@@ -882,13 +882,38 @@ func (c *Cache) fetchAndWatch(ctx context.Context, retry utils.Retry, timer *tim
retry.Reset()

c.notify(c.ctx, Event{Type: WatcherStarted})

var lastStalenessWarning time.Time
for {
select {
case <-watcher.Done():
return trace.ConnectionProblem(watcher.Error(), "watcher is closed")
case <-c.ctx.Done():
return trace.ConnectionProblem(c.ctx.Err(), "context is closing")
case event := <-watcher.Events():
// check for expired resources in OpPut events and log them periodically. stale OpPut events
// may be an indicator of poor performance, and can lead to confusing and inconsistent state
// as the cache may prune items that ought to exist.
//
// NOTE: The inconsistent state mentioned above is a symptom of a deeper issue with the cache
// design. The cache should not expire individual items. It should instead rely on OpDelete events
// from backend expiries. As soon as the cache has expired at least one item, it is no longer
// a faithful representation of a real backend state, since it is 'anticipating' a change in
// backend state that may or may not have actually happened. Instead, it ought to serve the
// most recent internally-consistent "view" of the backend, and individual consumers should
// determine if the resources they are handling are sufficiently fresh. Resource-level expiry
// is a convenience/cleanup feature and ought not to be relied upon for meaningful logic anyhow.
// If we need to protect against a stale cache, we ought to invalidate the cache in its entirety,
// rather than pruning the resources that we think *might* have been removed from the real backend.
// TODO(fspmarshall): ^^^
//
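// As a concrete (hypothetical) illustration: a resource written with a one-minute TTL
// whose OpPut is not processed until ninety seconds later arrives already expired; the
// check below flags that condition at most once per minute.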
if event.Type == types.OpPut && !event.Resource.Expiry().IsZero() {
if now := c.Clock.Now(); now.After(event.Resource.Expiry()) && now.After(lastStalenessWarning.Add(time.Minute)) {
c.Warning("Encountered stale event(s), may indicate degraded backend or event system performance.")
lastStalenessWarning = now
}
}

err = c.processEvent(ctx, event)
if err != nil {
return trace.Wrap(err)
Expand Down
