Skip to content

xdsclient: do not process updates from closed server channels #8389

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 34 additions & 7 deletions xds/internal/clients/xdsclient/authority.go
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,9 @@
//
// Only executed in the context of a serializer callback.
func (a *authority) handleADSResourceUpdate(serverConfig *ServerConfig, rType ResourceType, updates map[string]dataAndErrTuple, md xdsresource.UpdateMetadata, onDone func()) {
a.handleRevertingToPrimaryOnUpdate(serverConfig)
if !a.handleRevertingToPrimaryOnUpdate(serverConfig) {
return
}

// We build a list of callback funcs to invoke, and invoke them at the end
// of this method instead of inline (when handling the update for a
Expand Down Expand Up @@ -548,23 +550,47 @@
// lower priority servers are closed and the active server is reverted to the
// highest priority server that sent the update.
//
// The return value indicates whether subsequent processing of the resource
// update should continue or not.
//
// This method is only executed in the context of a serializer callback.
func (a *authority) handleRevertingToPrimaryOnUpdate(serverConfig *ServerConfig) {
if a.activeXDSChannel != nil && isServerConfigEqual(serverConfig, a.activeXDSChannel.serverConfig) {
func (a *authority) handleRevertingToPrimaryOnUpdate(serverConfig *ServerConfig) bool {
if a.activeXDSChannel == nil {
// This can happen only when all watches on this authority have been
// removed, and the xdsChannels have been closed. This update should
// have been received prior to closing of the channel, and therefore
// must be ignored.
return false
}

if isServerConfigEqual(serverConfig, a.activeXDSChannel.serverConfig) {
// If the resource update is from the current active server, nothing
// needs to be done from fallback point of view.
return
return true
}

if a.logger.V(2) {
a.logger.Infof("Received update from non-active server %q", serverConfig)
}

// If the resource update is not from the current active server, it means
// that we have received an update from a higher priority server and we need
// to revert back to it. This method guarantees that when an update is
// received from a server, all lower priority servers are closed.
// that we have received an update either from:
// - a server that has a higher priority than the current active server and
// therefore we need to revert back to it and close all lower priority
// servers, or,
// - a server that has a lower priority than the current active server. This
// can happen when the server close and the response race against each
// other. We can safely ignore this update, since we have already reverted
// to the higher priority server, and closed all lower priority servers.
serverIdx := a.serverIndexForConfig(serverConfig)
activeServerIdx := a.serverIndexForConfig(a.activeXDSChannel.serverConfig)
if activeServerIdx < serverIdx {
return false
}

Check warning on line 589 in xds/internal/clients/xdsclient/authority.go

View check run for this annotation

Codecov / codecov/patch

xds/internal/clients/xdsclient/authority.go#L586-L589

Added lines #L586 - L589 were not covered by tests

// At this point, we are guaranteed that we have received a response from a
// higher priority server compared to the current active server. So, we
// revert to the higher priorty server and close all lower priority ones.
a.activeXDSChannel = a.xdsChannelConfigs[serverIdx]

// Close all lower priority channels.
Expand Down Expand Up @@ -602,6 +628,7 @@
}
cfg.channel = nil
}
return true

Check warning on line 631 in xds/internal/clients/xdsclient/authority.go

View check run for this annotation

Codecov / codecov/patch

xds/internal/clients/xdsclient/authority.go#L631

Added line #L631 was not covered by tests
}

// watchResource registers a new watcher for the specified resource type and
Expand Down
41 changes: 34 additions & 7 deletions xds/internal/xdsclient/authority.go
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,9 @@
//
// Only executed in the context of a serializer callback.
func (a *authority) handleADSResourceUpdate(serverConfig *bootstrap.ServerConfig, rType xdsresource.Type, updates map[string]ads.DataAndErrTuple, md xdsresource.UpdateMetadata, onDone func()) {
a.handleRevertingToPrimaryOnUpdate(serverConfig)
if !a.handleRevertingToPrimaryOnUpdate(serverConfig) {
return
}

Check warning on line 335 in xds/internal/xdsclient/authority.go

View check run for this annotation

Codecov / codecov/patch

xds/internal/xdsclient/authority.go#L334-L335

Added lines #L334 - L335 were not covered by tests

// We build a list of callback funcs to invoke, and invoke them at the end
// of this method instead of inline (when handling the update for a
Expand Down Expand Up @@ -540,23 +542,47 @@
// lower priority servers are closed and the active server is reverted to the
// highest priority server that sent the update.
//
// The return value indicates whether subsequent processing of the resource
// update should continue or not.
//
// This method is only executed in the context of a serializer callback.
func (a *authority) handleRevertingToPrimaryOnUpdate(serverConfig *bootstrap.ServerConfig) {
if a.activeXDSChannel != nil && a.activeXDSChannel.serverConfig.Equal(serverConfig) {
func (a *authority) handleRevertingToPrimaryOnUpdate(serverConfig *bootstrap.ServerConfig) bool {
if a.activeXDSChannel == nil {
// This can happen only when all watches on this authority have been
// removed, and the xdsChannels have been closed. This update should
// have been received prior to closing of the channel, and therefore
// must be ignored.
return false
}

Check warning on line 556 in xds/internal/xdsclient/authority.go

View check run for this annotation

Codecov / codecov/patch

xds/internal/xdsclient/authority.go#L551-L556

Added lines #L551 - L556 were not covered by tests

if a.activeXDSChannel.serverConfig.Equal(serverConfig) {
// If the resource update is from the current active server, nothing
// needs to be done from fallback point of view.
return
return true
}

if a.logger.V(2) {
a.logger.Infof("Received update from non-active server %q", serverConfig)
}

// If the resource update is not from the current active server, it means
// that we have received an update from a higher priority server and we need
// to revert back to it. This method guarantees that when an update is
// received from a server, all lower priority servers are closed.
// that we have received an update either from:
// - a server that has a higher priority than the current active server and
// therefore we need to revert back to it and close all lower priority
// servers, or,
// - a server that has a lower priority than the current active server. This
// can happen when the server close and the response race against each
// other. We can safely ignore this update, since we have already reverted
// to the higher priority server, and closed all lower priority servers.
serverIdx := a.serverIndexForConfig(serverConfig)
activeServerIdx := a.serverIndexForConfig(a.activeXDSChannel.serverConfig)
if activeServerIdx < serverIdx {
return false
}

Check warning on line 581 in xds/internal/xdsclient/authority.go

View check run for this annotation

Codecov / codecov/patch

xds/internal/xdsclient/authority.go#L580-L581

Added lines #L580 - L581 were not covered by tests

// At this point, we are guaranteed that we have received a response from a
// higher priority server compared to the current active server. So, we
// revert to the higher priorty server and close all lower priority ones.
a.activeXDSChannel = a.xdsChannelConfigs[serverIdx]

// Close all lower priority channels.
Expand Down Expand Up @@ -594,6 +620,7 @@
}
cfg.channel = nil
}
return true
}

// watchResource registers a new watcher for the specified resource type and
Expand Down
Loading