Avoid potential proxycfg/xDS deadlock using non-blocking send #9689

Merged
merged 7 commits into from Feb 8, 2021
Changes from 3 commits
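
The title names the failure mode: a two-goroutine deadlock in which each side performs a blocking channel send to the other, so neither ever reaches its receive. Below is a generic Go illustration of that shape only — the channel names and roles are stand-ins for the proxycfg state and xDS server goroutines, not Consul's actual wiring.

```go
// Generic illustration of the deadlock shape hinted at by the PR title:
// two goroutines each do a blocking send to the other, so neither ever
// reaches its receive. Names are illustrative, not Consul's.
package main

import "fmt"

func main() {
	snapCh := make(chan string) // config snapshots: "state" -> "server"
	reqCh := make(chan string)  // snapshot requests: "server" -> "state"

	go func() { // plays the role of the proxycfg state goroutine
		snapCh <- "snapshot" // blocks: the other side is also mid-send
		fmt.Println("state got:", <-reqCh)
	}()

	// plays the role of the xDS server goroutine
	reqCh <- "request" // blocks: the state goroutine never reaches its receive
	fmt.Println("server got:", <-snapCh)

	// Running this ends with the Go runtime's detector:
	// "fatal error: all goroutines are asleep - deadlock!"
}
```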
55 changes: 26 additions & 29 deletions agent/proxycfg/state.go
@@ -583,8 +583,6 @@ func (s *state) initialConfigSnapshot() ConfigSnapshot {
}

func (s *state) run() {
logger := s.logger.Named(logging.ProxyConfig)

// Close the channel we return from Watch when we stop so consumers can stop
// watching and clean up their goroutines. It's important we do this here and
// not in Close since this routine sends on this chan and so might panic if it
@@ -605,12 +603,10 @@ func (s *state) run() {
case <-s.ctx.Done():
return
case u := <-s.ch:
logger.Trace("A blocking query returned; handling snapshot update",
"proxy-id", s.proxyID.String(),
)
s.logger.Trace("A blocking query returned; handling snapshot update")

if err := s.handleUpdate(u, &snap); err != nil {
logger.Error("Failed to handle update from watch",
s.logger.Error("Failed to handle update from watch",
"id", u.CorrelationID, "error", err,
)
continue
@@ -621,53 +617,54 @@ func (s *state) run() {
// etc on future updates.
snapCopy, err := snap.Clone()
if err != nil {
logger.Error("Failed to copy config snapshot for proxy",
"proxy-id", s.proxyID.String(), "error", err,
s.logger.Error("Failed to copy config snapshot for proxy",
"error", err,
)
continue
}

select {
// try to send
case s.snapCh <- *snapCopy:
logger.Trace("Delivered new snapshot to proxy config watchers",
"proxy-id", s.proxyID.String(),
)
s.logger.Trace("Delivered new snapshot to proxy config watchers")

// Allow the next change to trigger a send
coalesceTimer = nil

// avoid blocking if a snapshot is already buffered
// Skip rest of loop - there is nothing to send since nothing changed on
// this iteration
continue

// avoid blocking if a snapshot is already buffered, but queue up a retry with a timer
default:
logger.Trace("Failed to deliver new snapshot to proxy config watchers",
"proxy-id", s.proxyID.String(),
)
}
s.logger.Trace("Failed to deliver new snapshot to proxy config watchers")

// Allow the next change to trigger a send
coalesceTimer = nil
if coalesceTimer == nil {
coalesceTimer = time.AfterFunc(coalesceTimeout, func() {
sendCh <- struct{}{}
})
}

// Skip rest of loop - there is nothing to send since nothing changed on
// this iteration
continue
// Do not reset coalesceTimer since we just queued a timer-based refresh
continue
}

case replyCh := <-s.reqCh:
logger.Trace("A proxy config snapshot was requested",
"proxy-id", s.proxyID.String(),
)
s.logger.Trace("A proxy config snapshot was requested")

if !snap.Valid() {
// Not valid yet just respond with nil and move on to next task.
replyCh <- nil

logger.Trace("The proxy's config snapshot is not valid yet",
"proxy-id", s.proxyID.String(),
)
s.logger.Trace("The proxy's config snapshot is not valid yet")
continue
}
// Make a deep copy of snap so we don't mutate any of the embedded structs
// etc on future updates.
snapCopy, err := snap.Clone()
if err != nil {
logger.Error("Failed to copy config snapshot for proxy",
"proxy-id", s.proxyID.String(), "error", err,
s.logger.Error("Failed to copy config snapshot for proxy",
"error", err,
)
continue
}
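
The fix shown in state.go is a non-blocking send: try the snapshot channel, and if the receiver is not ready, fall through to `default` and arm a one-shot retry timer instead of blocking the event loop. Here is a standalone sketch of that pattern — `producer`, `deliver`, the 200ms `coalesceTimeout`, and the buffered `sendCh` are illustrative stand-ins, not Consul's actual types or values.

```go
package main

import (
	"fmt"
	"time"
)

type snapshot struct{ version int }

// producer mirrors the shape of state.run(): it never blocks on the snapshot
// channel; a failed delivery arms a single retry timer instead.
func producer(updates <-chan int, snapCh chan<- snapshot, done <-chan struct{}) {
	const coalesceTimeout = 200 * time.Millisecond

	var coalesceTimer *time.Timer
	sendCh := make(chan struct{}, 1) // retry signal fired by the timer
	var snap snapshot

	deliver := func() {
		select {
		case snapCh <- snap:
			// Delivered; let the next change trigger a send immediately.
			coalesceTimer = nil
		default:
			// Receiver is busy: do not block the loop. Queue one timer-based
			// retry so the latest snapshot is not lost.
			if coalesceTimer == nil {
				coalesceTimer = time.AfterFunc(coalesceTimeout, func() {
					sendCh <- struct{}{}
				})
			}
		}
	}

	for {
		select {
		case <-done:
			return
		case v := <-updates:
			snap.version = v
			deliver()
		case <-sendCh:
			// The pending retry fired; clear it so a failed retry re-arms.
			coalesceTimer = nil
			deliver()
		}
	}
}

func main() {
	updates := make(chan int)
	snapCh := make(chan snapshot) // unbuffered: the receiver may be slow
	done := make(chan struct{})
	go producer(updates, snapCh, done)

	// Burst of updates while the receiver is still busy.
	for i := 1; i <= 3; i++ {
		updates <- i
	}

	time.Sleep(500 * time.Millisecond) // simulate a busy receiver
	fmt.Println("received snapshot version:", (<-snapCh).version)
	close(done)
}
```

The key property is that the loop always returns to its `select`, so it can keep serving requests (or shut down) even while the snapshot receiver is stalled, which is exactly what closes the deadlock window.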
6 changes: 3 additions & 3 deletions agent/xds/server.go
@@ -328,7 +328,7 @@ func (s *Server) process(stream ADSStream, reqCh <-chan *envoy.DiscoveryRequest)
defer watchCancel()

logger.Trace("watching proxy, pending initial proxycfg snapshot",
"proxy-id", proxyID.String())
"service_id", proxyID.String())

// Now wait for the config so we can check ACL
state = statePendingInitialConfig
@@ -342,7 +342,7 @@ func (s *Server) process(stream ADSStream, reqCh <-chan *envoy.DiscoveryRequest)
state = stateRunning

logger.Trace("Got initial config snapshot",
"proxy-id", cfgSnap.ProxyID.String())
"service_id", cfgSnap.ProxyID.String())

// Let's actually process the config we just got or we'll miss responding
fallthrough
@@ -356,7 +356,7 @@ func (s *Server) process(stream ADSStream, reqCh <-chan *envoy.DiscoveryRequest)
extendAuthTimer()

logger.Trace("Invoking all xDS resource handlers and sending new data if there is any",
"proxy-id", cfgSnap.ProxyID.String())
"service_id", cfgSnap.ProxyID.String())

// See if any handlers need to have the current (possibly new) config
// sent. Note the order here is actually significant so we can't just
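
A secondary change running through the diff is the logging cleanup: the per-call `"proxy-id", s.proxyID.String()` pairs disappear and the local `logger` becomes `s.logger`, which suggests the proxy ID is now bound to the logger once at construction time. A small go-hclog sketch of that bound-fields pattern, assuming that is how `s.logger` is built (the construction site shown is illustrative, not the PR's exact code):

```go
// Sketch of why the per-call "proxy-id" fields can go away: attach the field
// once when the sub-logger is built, and every later call inherits it.
// Assumes github.com/hashicorp/go-hclog; names/values are illustrative.
package main

import hclog "github.com/hashicorp/go-hclog"

func main() {
	root := hclog.New(&hclog.LoggerOptions{
		Name:  "agent",
		Level: hclog.Trace,
	})

	// Equivalent in spirit to building s.logger once with the proxy ID bound.
	logger := root.Named("proxycfg").With("proxy-id", "web-sidecar-proxy")

	// No need to repeat "proxy-id" on every call; it is emitted automatically.
	logger.Trace("A blocking query returned; handling snapshot update")
	logger.Error("Failed to copy config snapshot for proxy", "error", "example error")
}
```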