Improve etcd event processing #7414

Merged · 2 commits · Jul 21, 2021
149 changes: 132 additions & 17 deletions lib/backend/etcdbk/etcd.go
@@ -34,6 +34,7 @@ import (
"github.com/gravitational/teleport/lib/backend"
"github.com/gravitational/teleport/lib/tlsca"
"github.com/gravitational/teleport/lib/utils"
cq "github.com/gravitational/teleport/lib/utils/concurrentqueue"

"github.com/coreos/go-semver/semver"
"github.com/gravitational/trace"
@@ -111,6 +112,18 @@ var (
Buckets: prometheus.ExponentialBuckets(0.001, 2, 16),
},
)
eventCount = prometheus.NewCounter(
prometheus.CounterOpts{
Name: "etcd_events",
Help: "Number of etcd events",
},
)
eventBackpressure = prometheus.NewCounter(
prometheus.CounterOpts{
Name: "etcd_event_backpressure",
Help: "Number of etcd events that hit backpressure",
},
)

prometheusCollectors = []prometheus.Collector{
writeLatencies, txLatencies, batchReadLatencies,
@@ -356,35 +369,137 @@ func (b *EtcdBackend) reconnect(ctx context.Context) error {
}

func (b *EtcdBackend) asyncWatch() {
err := b.watchEvents()
b.Debugf("Watch exited: %v.", err)
defer close(b.watchDone)
var err error
WatchEvents:
for b.ctx.Err() == nil {
err = b.watchEvents(b.ctx)

b.Debugf("Watch exited: %v", err)

// pause briefly to prevent excessive watcher creation attempts
select {
case <-time.After(utils.HalfJitter(time.Millisecond * 1500)):
case <-b.ctx.Done():
break WatchEvents
}

// buffer must be reset before recreation in order to avoid duplicate
// and/or missing values in the buffer watcher event stream.
b.buf.Reset()
}
b.Debugf("Watch stopped: %v.", trace.NewAggregate(err, b.ctx.Err()))
}
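For context on the pause above: restarting the watcher behind a randomized ("jittered") delay keeps many instances from reconnecting to etcd in lockstep after a shared failure. A minimal sketch of the pattern, assuming halfJitter(d) returns a uniformly random duration in [d/2, d) (the exact behavior of utils.HalfJitter is not shown in this diff):

```go
package watchexample

import (
	"context"
	"math/rand"
	"time"
)

// halfJitter approximates the assumed behavior of utils.HalfJitter:
// a uniformly random duration in [d/2, d).
func halfJitter(d time.Duration) time.Duration {
	return d/2 + time.Duration(rand.Int63n(int64(d/2)))
}

// watchLoop restarts watch until the context is canceled, sleeping a
// jittered interval between attempts and resetting shared state (e.g.
// an event buffer) before each new watcher is created.
func watchLoop(ctx context.Context, watch func(context.Context) error, reset func()) {
	for ctx.Err() == nil {
		_ = watch(ctx) // returns on watch-channel close or cancellation
		select {
		case <-time.After(halfJitter(1500 * time.Millisecond)):
		case <-ctx.Done():
			return
		}
		reset()
	}
}
```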

func (b *EtcdBackend) watchEvents() error {
defer close(b.watchDone)
// eventResult is used to ferry the result of event processing
type eventResult struct {
original clientv3.Event
event backend.Event
err error
}

// watchEvents spawns an etcd watcher and forwards events to the event buffer. The internals of this
// function are complicated somewhat by the fact that we need to make a per-event API call to translate
// lease IDs into expiry times. If events are being created faster than their expiries can be resolved,
// this eventually results in runaway memory usage within the etcd client. To combat this, we use a
// concurrentqueue.Queue to parallelize the event processing logic while preserving event order. While
// effective, this strategy still suffers from a "head of line blocking"-esque issue since event order
// must be preserved.
func (b *EtcdBackend) watchEvents(ctx context.Context) error {

// etcd watch client relies on context cancellation for cleanup,
// so create a new subscope for this function.
ctx, cancel := context.WithCancel(ctx)
defer cancel()

start:
eventsC := b.client.Watch(b.ctx, b.cfg.Key, clientv3.WithPrefix())
// wrap fromEvent in a closure compatible with the concurrent queue
workfn := func(v interface{}) interface{} {
original := v.(clientv3.Event)
var event backend.Event
e, err := b.fromEvent(ctx, original)
if e != nil {
event = *e
}
return eventResult{
original: original,
event: event,
err: err,
}
}

// The constants here are somewhat arbitrary. The goal is to set up the queue such that
// it can handle >100 events per second, assuming an average of 0.2 seconds of processing
// time per event (as observed in tests of under-provisioned etcd instances).
q := cq.New(
workfn,
cq.Workers(24),
cq.Capacity(240),
cq.InputBuf(120),
cq.OutputBuf(48),
)
defer q.Close()

// launch background process responsible for forwarding events from the queue
// to the buffer.
go func() {
PushToBuf:
for {
select {
case p := <-q.Pop():
r := p.(eventResult)
Contributor: Since nil is a valid output value on the Pop channel, wouldn't this panic if p were actually nil?

Contributor Author: The concurrentqueue abstraction was built to handle work functions that return nil, but the one in use here never does, so the value will never be nil.
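For readers unfamiliar with the package, here is a minimal sketch of the concurrentqueue usage pattern, inferred from the calls appearing in this diff (New, Workers, Push, Pop, Close): work items are processed concurrently by a worker pool, but results are emitted from Pop in push order.

```go
package main

import (
	"fmt"

	cq "github.com/gravitational/teleport/lib/utils/concurrentqueue"
)

func main() {
	// The work function runs concurrently across 4 workers, but Pop
	// yields results in the same order the inputs were pushed.
	q := cq.New(
		func(v interface{}) interface{} { return v.(int) * v.(int) },
		cq.Workers(4),
	)
	defer q.Close()

	go func() {
		for i := 0; i < 10; i++ {
			q.Push() <- i
		}
	}()

	for i := 0; i < 10; i++ {
		fmt.Println(<-q.Pop()) // 0, 1, 4, 9, ... in input order
	}
}
```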

if r.err != nil {
b.WithError(r.err).Errorf("Failed to unmarshal event: %v.", r.original)
continue PushToBuf
Collaborator: Nit: You could probably get rid of the goto label if you just did b.buf.Push(e.event) under an else branch.

Contributor: FWIW, even a plain continue should work as intended here.

Contributor Author: This was deliberate and I'd prefer to leave it as-is. While not technically necessary, I've come to be of the opinion that because break needs labels when inside of a select statement, it's best to just always use loop labels when inside select statements.
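To illustrate the point about break and select (a sketch, not part of the PR): inside a for/select, a bare break terminates only the select statement, so exiting the loop requires a label.

```go
package main

import "fmt"

func main() {
	ch := make(chan int, 2)
	ch <- 1
	ch <- 2
	close(ch)

Loop:
	for {
		select {
		case v, ok := <-ch:
			if !ok {
				// A bare "break" here would only exit the select,
				// leaving the for loop spinning forever.
				break Loop
			}
			fmt.Println(v)
		}
	}
}
```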

}
b.buf.Push(r.event)
case <-q.Done():
return
}
}
}()

// start watching
eventsC := b.client.Watch(ctx, b.cfg.Key, clientv3.WithPrefix())
b.signalWatchStart()

var lastBacklogWarning time.Time
for {
select {
case e, ok := <-eventsC:
if e.Canceled || !ok {
b.Debugf("Watch channel has closed.")
goto start
return trace.ConnectionProblem(nil, "etcd watch channel closed")
}
out := make([]backend.Event, 0, len(e.Events))

PushToQueue:
for i := range e.Events {
event, err := b.fromEvent(b.ctx, *e.Events[i])
if err != nil {
b.Errorf("Failed to unmarshal event: %v %v.", err, *e.Events[i])
} else {
out = append(out, *event)
eventCount.Inc()

var event clientv3.Event = *e.Events[i]
// attempt non-blocking push. We allocate a large input buffer for the queue, so this
// ought to succeed reliably.
select {
case q.Push() <- event:
continue PushToQueue
default:
}

eventBackpressure.Inc()

// limit backlog warnings to once per minute to prevent log spam.
if now := time.Now(); now.After(lastBacklogWarning.Add(time.Minute)) {
b.Warnf("Etcd event processing backlog; may result in excess memory usage and stale cluster state.")
lastBacklogWarning = now
}

// fallback to blocking push
select {
case q.Push() <- event:
case <-ctx.Done():
return trace.ConnectionProblem(ctx.Err(), "context is closing")
}
}
b.buf.PushBatch(out)
case <-b.ctx.Done():
return trace.ConnectionProblem(b.ctx.Err(), "context is closing")
case <-ctx.Done():
return trace.ConnectionProblem(ctx.Err(), "context is closing")
}
}
}
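The push logic above (non-blocking send, then a counted fallback to a blocking send) is a generic backpressure pattern; a self-contained sketch with hypothetical names:

```go
package backpressure

import "context"

type event struct{ key string }

// pushWithBackpressure first attempts a non-blocking send (the fast path
// when the buffer has room). If the buffer is full it invokes the hook
// (e.g. incrementing a metrics counter or logging a rate-limited warning),
// then falls back to a blocking send that still honors cancellation.
func pushWithBackpressure(ctx context.Context, out chan<- event, ev event, onBackpressure func()) error {
	select {
	case out <- ev:
		return nil
	default:
	}
	onBackpressure()
	select {
	case out <- ev:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}
```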
33 changes: 33 additions & 0 deletions lib/cache/cache.go
@@ -18,6 +18,7 @@ package cache

import (
"context"
"fmt"
"sync"
"time"

@@ -891,13 +892,45 @@ func (c *Cache) fetchAndWatch(ctx context.Context, retry utils.Retry, timer *tim
retry.Reset()

c.notify(c.ctx, Event{Type: WatcherStarted})

var lastStalenessWarning time.Time
var staleEventCount int
for {
select {
case <-watcher.Done():
return trace.ConnectionProblem(watcher.Error(), "watcher is closed")
case <-c.ctx.Done():
return trace.ConnectionProblem(c.ctx.Err(), "context is closing")
case event := <-watcher.Events():
// Check for expired resources in OpPut events and log them periodically. Stale OpPut events
// may be an indicator of poor performance, and can lead to confusing and inconsistent state
// as the cache may prune items that ought to exist.
//
// NOTE: The inconsistent state mentioned above is a symptom of a deeper issue with the cache
// design. The cache should not expire individual items. It should instead rely on OpDelete events
// from backend expiries. As soon as the cache has expired at least one item, it is no longer
// a faithful representation of the real backend state, since it is 'anticipating' a change in
// backend state that may or may not have actually happened. Instead, it ought to serve the
// most recent internally-consistent "view" of the backend, and individual consumers should
// determine if the resources they are handling are sufficiently fresh. Resource-level expiry
// is a convenience/cleanup feature and ought not be relied upon for meaningful logic anyhow.
// If we need to protect against a stale cache, we ought to invalidate the cache in its entirety,
// rather than pruning the resources that we think *might* have been removed from the real backend.
// TODO(fspmarshall): ^^^
//
if event.Type == types.OpPut && !event.Resource.Expiry().IsZero() {
	if now := c.Clock.Now(); now.After(event.Resource.Expiry()) {
		staleEventCount++
		if now.After(lastStalenessWarning.Add(time.Minute)) {
			kind := event.Resource.GetKind()
			if sk := event.Resource.GetSubKind(); sk != "" {
				kind = fmt.Sprintf("%s/%s", kind, sk)
			}
			c.Warningf("Encountered %d stale event(s), may indicate degraded backend or event system performance. last_kind=%q", staleEventCount, kind)
			lastStalenessWarning = now
			staleEventCount = 0
		}
	}
}

err = c.processEvent(ctx, event)
if err != nil {
return trace.Wrap(err)
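The staleness warning above also demonstrates a reusable rate-limiting idiom: accumulate a counter per occurrence, emit at most one log line per interval, and reset the counter on emit. A small sketch with hypothetical names:

```go
package ratelimitlog

import (
	"log"
	"time"
)

// aggregatedWarner emits at most one warning per interval, reporting how
// many occurrences accumulated since the previous warning was logged.
type aggregatedWarner struct {
	last     time.Time
	count    int
	interval time.Duration
}

func (w *aggregatedWarner) warn(now time.Time, msg string) {
	w.count++
	if now.After(w.last.Add(w.interval)) {
		log.Printf("%s (%d occurrence(s))", msg, w.count)
		w.last = now
		w.count = 0
	}
}
```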