Improve etcd event processing #7414
Conversation
Force-pushed 35a325b to c0fe5ae
Force-pushed c0fe5ae to 2e12019
Just a few nits/questions.
```go
input:  make(chan interface{}, cfg.inputBuf),
output: make(chan interface{}, cfg.outputBuf),
```
Check that `inputBuf` and `outputBuf` are >= 0?
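The validation being suggested matters because `make(chan T, n)` panics at runtime if `n` is negative. A minimal sketch of the check, using hypothetical names mirroring the excerpt above:

```go
package main

import "fmt"

// queue is an illustrative stand-in for the type under review.
type queue struct {
	input  chan interface{}
	output chan interface{}
}

// newQueue rejects negative buffer sizes up front, turning a runtime
// panic from make(chan ...) into a clear configuration error.
func newQueue(inputBuf, outputBuf int) (*queue, error) {
	if inputBuf < 0 || outputBuf < 0 {
		return nil, fmt.Errorf("buffer sizes must be >= 0, got input=%d output=%d", inputBuf, outputBuf)
	}
	return &queue{
		input:  make(chan interface{}, inputBuf),
		output: make(chan interface{}, outputBuf),
	}, nil
}

func main() {
	if _, err := newQueue(-1, 4); err != nil {
		fmt.Println("rejected:", err)
	}
	q, _ := newQueue(2, 2)
	fmt.Println("input capacity:", cap(q.input)) // input capacity: 2
}
```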
lib/utils/concurrentqueue/queue.go
Outdated
```go
select {
case itm := <-workerOut:
	if itm.nonce == nonce {
		// item matches current nonce, send it immediately
```
The way it's currently implemented, it might not be sent immediately, unless I'm missing something? It will go to the next `for` loop iteration, and if there are other items in `workerOut`, this `select` may just keep accumulating them in `queue` (IIRC `select` case order is not defined).
You're right, it isn't sent immediately; the loop just goes immediately into a sending state rather than enqueuing the value. I've updated the comments to be clearer.
IMO output channel starvation isn't a serious risk here. Any non-trivial work function will leave this select statement idle (i.e. able to emit events) the majority of the time. Any flow of events arriving so fast as to be a serious hindrance to event emission will hit the semaphore limit almost immediately, at which point emission will succeed anyhow.
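For context, the nonce-based reordering under discussion can be sketched roughly as follows (an illustrative model, not the actual `queue.go` code; all names here are hypothetical):

```go
package main

import "fmt"

// result pairs a worker's output with the nonce (sequence number) the
// input was tagged with when it entered the queue.
type result struct {
	nonce int
	val   string
}

// reorder receives n out-of-order worker results and returns them in
// nonce order: results that arrive early are parked in a map until the
// expected nonce shows up, then the contiguous run is flushed.
func reorder(results <-chan result, n int) []string {
	pending := make(map[int]string)
	out := make([]string, 0, n)
	next := 0
	for len(out) < n {
		r := <-results
		pending[r.nonce] = r.val
		// flush any contiguous run starting at the expected nonce
		for v, ok := pending[next]; ok; v, ok = pending[next] {
			out = append(out, v)
			delete(pending, next)
			next++
		}
	}
	return out
}

func main() {
	ch := make(chan result, 3)
	// worker results arriving out of order
	ch <- result{nonce: 2, val: "c"}
	ch <- result{nonce: 0, val: "a"}
	ch <- result{nonce: 1, val: "b"}
	fmt.Println(reorder(ch, 3)) // [a b c]
}
```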
```go
r := p.(eventResult)
if r.err != nil {
	b.Errorf("Failed to unmarshal event: %v %v.", r.err, r.original)
	continue PushToBuf
```
Nit: You could probably get rid of the goto label if you just did `b.buf.Push(e.event)` under the `else` branch.
FWIW, even a plain `continue` should work as intended here.
This was deliberate and I'd prefer to leave it as-is. While not technically necessary, I've come to be of the opinion that because `break` needs labels when inside of a `select` statement, it's best to just always use loop labels when inside `select` statements.
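As a minimal illustration of the point above (hypothetical code, not from this PR): a bare `break` inside a `select` exits only the `select`, so leaving the enclosing loop requires a label, and the same labeling style can be applied to `continue` for consistency:

```go
package main

import "fmt"

// drain consumes a channel until it is closed, skipping negative values.
func drain(in <-chan int) []int {
	var out []int
Loop:
	for {
		select {
		case v, ok := <-in:
			if !ok {
				break Loop // a bare break would only exit the select
			}
			if v < 0 {
				continue Loop // label not required here, used for consistency
			}
			out = append(out, v)
		}
	}
	return out
}

func main() {
	ch := make(chan int, 3)
	ch <- 1
	ch <- -2
	ch <- 3
	close(ch)
	fmt.Println(drain(ch)) // [1 3]
}
```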
lib/backend/etcdbk/etcd.go
Outdated
```go
event, err := b.fromEvent(b.ctx, original)
return eventResult{
	original: original,
	event:    *event,
```
Wouldn't this panic if there was an error and the returned `event` was nil?
Oof, yes. Good catch!
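The shape of the fix is presumably along these lines (a hedged sketch with made-up names mirroring the excerpt, not the actual patch): check the error before dereferencing the returned pointer.

```go
package main

import (
	"errors"
	"fmt"
)

// event and eventResult are illustrative stand-ins for the real types.
type event struct{ key string }

type eventResult struct {
	original string
	event    event
	err      error
}

// fromEvent returns a nil event alongside any error, like the code
// under review.
func fromEvent(original string) (*event, error) {
	if original == "" {
		return nil, errors.New("empty event")
	}
	return &event{key: original}, nil
}

// toResult checks err before dereferencing ev; dereferencing *ev on the
// error path would panic, which is the bug caught in review.
func toResult(original string) eventResult {
	ev, err := fromEvent(original)
	if err != nil {
		return eventResult{original: original, err: err}
	}
	return eventResult{original: original, event: *ev}
}

func main() {
	fmt.Println(toResult("").err != nil)  // true
	fmt.Println(toResult("k").event.key)  // k
}
```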
```go
for {
	select {
	case p := <-q.Pop():
		r := p.(eventResult)
```
Since nil is a valid output value on the `Pop` channel, wouldn't this panic if `p` were actually nil?
The `concurrentqueue` abstraction was built to handle work functions that return `nil`, but the one in use here never does, so the value will never be nil.
```go
r := p.(eventResult)
if r.err != nil {
	b.Errorf("Failed to unmarshal event: %v %v.", r.err, r.original)
	continue PushToBuf
```
FWIW, even a plain `continue` should work as intended here.
@fspmarshall Can you apply @andrejtokarcik's changes then ping him to re-review?
Force-pushed 2e12019 to 7c14b01
Force-pushed b76e24d to 990d781
Force-pushed 990d781 to 96d84bf
Fixes an issue where slow etcd event processing in very large Teleport clusters could lead to serious problems, including uncapped growth in memory usage and disappearing nodes.
The etcd backend implementation must make per-event API calls against the etcd server in order to determine the expiry of items (translating a lease ID to a TTL). Under very specific circumstances, it is possible for events to be created at a faster rate than their expirations can be looked up, causing backpressure against the etcd client. Not only does this cause teleport's event stream to get stale (eventually causing expiring resources such as nodes to disappear from caches), but it also causes uncapped memory growth within the etcd client itself.
To address this problem while preserving correct event order in the final output (required for cache validity), a new `utils/concurrentqueue` package was added which provides a helper for concurrently processing values while preserving correct result ordering.

The stale event stream created a lot of "red herring" issues that complicated the debugging process, so caches now emit warnings when they see stale events.
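The general technique, bounded concurrent processing with results delivered in submission order, can be sketched like this (an illustrative sketch, not the actual `concurrentqueue` implementation):

```go
package main

import "fmt"

// mapOrdered applies f to each input concurrently, with at most
// `workers` goroutines in flight, and returns results in submission
// order by reading through a channel of per-item result channels.
func mapOrdered(in []int, workers int, f func(int) int) []int {
	sem := make(chan struct{}, workers)   // semaphore bounding concurrency
	results := make(chan chan int, len(in))
	for _, v := range in {
		v := v
		ch := make(chan int, 1)
		results <- ch     // result slots are enqueued in submission order
		sem <- struct{}{} // blocks while `workers` goroutines are busy
		go func() {
			defer func() { <-sem }()
			ch <- f(v)
		}()
	}
	close(results)
	out := make([]int, 0, len(in))
	for ch := range results {
		out = append(out, <-ch) // blocks until that item's worker finishes
	}
	return out
}

func main() {
	double := func(x int) int { return 2 * x }
	fmt.Println(mapOrdered([]int{1, 2, 3, 4}, 2, double)) // [2 4 6 8]
}
```

This mirrors the trade-off discussed in the review thread: throughput is bounded by the semaphore, while ordering is enforced at the consumption side rather than by serializing the work itself.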
Example of auth server memory usage before and after deployment of this change:
Fixes #6474