Skip to content

Commit

Permalink
xds/resolver: fix resource deletion (#4143)
Browse files Browse the repository at this point in the history
  • Loading branch information
dfawley committed Jan 9, 2021
1 parent 85e55dc commit 083393f
Show file tree
Hide file tree
Showing 3 changed files with 247 additions and 50 deletions.
23 changes: 12 additions & 11 deletions xds/internal/resolver/serviceconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,9 @@ type configSelector struct {
var errNoMatchedRouteFound = status.Errorf(codes.Unavailable, "no matched route was found")

func (cs *configSelector) SelectConfig(rpcInfo iresolver.RPCInfo) (*iresolver.RPCConfig, error) {
if cs == nil {
return nil, status.Errorf(codes.Unavailable, "no valid clusters")
}
var rt *route
// Loop through routes in order and select first match.
for _, r := range cs.routes {
Expand Down Expand Up @@ -157,17 +160,8 @@ func (cs *configSelector) SelectConfig(rpcInfo iresolver.RPCInfo) (*iresolver.RP
return config, nil
}

// incRefs increments refs of all clusters referenced by this config selector.
//
// A nil receiver is a no-op: the resolver's current config selector may be
// nil, and ranging over cs.clusters through a nil pointer would panic.
func (cs *configSelector) incRefs() {
	if cs == nil {
		return
	}
	// Loops over cs.clusters, but these are pointers to entries in
	// activeClusters, so the shared counters are what get incremented.
	for _, ci := range cs.clusters {
		atomic.AddInt32(&ci.refCount, 1)
	}
}

// decRefs decrements refs of all clusters referenced by this config selector.
func (cs *configSelector) decRefs() {
// stop decrements refs of all clusters referenced by this config selector.
func (cs *configSelector) stop() {
// The resolver's old configSelector may be nil. Handle that here.
if cs == nil {
return
Expand Down Expand Up @@ -234,6 +228,13 @@ func (r *xdsResolver) newConfigSelector(su serviceUpdate) (*configSelector, erro
}
}

// Account for this config selector's clusters. Do this after no further
// errors may occur. Note: cs.clusters are pointers to entries in
// activeClusters.
for _, ci := range cs.clusters {
atomic.AddInt32(&ci.refCount, 1)
}

return cs, nil
}

Expand Down
102 changes: 63 additions & 39 deletions xds/internal/resolver/xds_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,43 @@ type xdsResolver struct {
curConfigSelector *configSelector
}

// sendNewServiceConfig pushes a fresh resolver state to the ClientConn.
//
// It first prunes the active clusters, then builds a service config from
// whatever clusters remain and sends it together with the supplied config
// selector. When cs is nil and no clusters remain, an empty service config is
// sent instead. The return value is false only when the service config could
// not be generated, in which case the error is reported and no update is sent.
func (r *xdsResolver) sendNewServiceConfig(cs *configSelector) bool {
	// Entries in r.activeClusters with zero references must go first;
	// otherwise serviceConfigJSON would generate a config including them.
	r.pruneActiveClusters()

	// No clusters and a failing (nil) config selector: send an empty config,
	// which picks pick-first, with no address, and puts the ClientConn into
	// transient failure.
	if cs == nil && len(r.activeClusters) == 0 {
		emptyState := resolver.State{ServiceConfig: r.cc.ParseServiceConfig("{}")}
		r.cc.UpdateState(emptyState)
		return true
	}

	// Build the service config from the remaining active clusters.
	scJSON, err := serviceConfigJSON(r.activeClusters)
	if err != nil {
		// JSON marshal error; should never happen.
		r.logger.Errorf("%v", err)
		r.cc.ReportError(err)
		return false
	}
	r.logger.Infof("Received update on resource %v from xds-client %p, generated service config: %v", r.target.Endpoint, r.client, scJSON)

	// Attach the config selector to the state and hand it to the ClientConn.
	base := resolver.State{ServiceConfig: r.cc.ParseServiceConfig(scJSON)}
	r.cc.UpdateState(iresolver.SetConfigSelector(base, cs))
	return true
}

// run is a long running goroutine which blocks on receiving service updates
// and passes it on the ClientConn.
func (r *xdsResolver) run() {
Expand All @@ -155,15 +192,15 @@ func (r *xdsResolver) run() {
if update.err != nil {
r.logger.Warningf("Watch error on resource %v from xds-client %p, %v", r.target.Endpoint, r.client, update.err)
if xdsclient.ErrType(update.err) == xdsclient.ErrorTypeResourceNotFound {
// If error is resource-not-found, it means the LDS resource
// was removed. Send an empty service config, which picks
// pick-first, with no address, and puts the ClientConn into
// transient failure..
r.cc.UpdateState(resolver.State{
ServiceConfig: r.cc.ParseServiceConfig("{}"),
})
// Dereference the active config selector, if one exists.
r.curConfigSelector.decRefs()
// If error is resource-not-found, it means the LDS
// resource was removed. Ultimately send an empty service
// config, which picks pick-first, with no address, and
// puts the ClientConn into transient failure. Before we
// can do that, we may need to send a normal service config
// along with an erroring (nil) config selector.
r.sendNewServiceConfig(nil)
// Stop and dereference the active config selector, if one exists.
r.curConfigSelector.stop()
r.curConfigSelector = nil
continue
}
Expand All @@ -173,43 +210,30 @@ func (r *xdsResolver) run() {
r.cc.ReportError(update.err)
continue
}
var cs *configSelector
if !update.emptyUpdate {
// Create the config selector for this update.
var err error
if cs, err = r.newConfigSelector(update.su); err != nil {
r.logger.Warningf("Error parsing update on resource %v from xds-client %p: %v", r.target.Endpoint, r.client, err)
r.cc.ReportError(err)
continue
}
} else {
// Empty update; use the existing config selector.
cs = r.curConfigSelector
if update.emptyUpdate {
r.sendNewServiceConfig(r.curConfigSelector)
continue
}
// Account for this config selector's clusters.
cs.incRefs()
// Delete entries from r.activeClusters with zero references;
// otherwise serviceConfigJSON will generate a config including
// them.
r.pruneActiveClusters()
// Produce the service config.
sc, err := serviceConfigJSON(r.activeClusters)

// Create the config selector for this update.
cs, err := r.newConfigSelector(update.su)
if err != nil {
// JSON marshal error; should never happen.
r.logger.Errorf("%v", err)
r.logger.Warningf("Error parsing update on resource %v from xds-client %p: %v", r.target.Endpoint, r.client, err)
r.cc.ReportError(err)
cs.decRefs()
continue
}
r.logger.Infof("Received update on resource %v from xds-client %p, generated service config: %v", r.target.Endpoint, r.client, sc)
// Send the update to the ClientConn.
state := iresolver.SetConfigSelector(resolver.State{
ServiceConfig: r.cc.ParseServiceConfig(sc),
}, cs)
r.cc.UpdateState(state)

if !r.sendNewServiceConfig(cs) {
// JSON error creating the service config (unexpected); erase
// this config selector and ignore this update, continuing with
// the previous config selector.
cs.stop()
continue
}

// Decrement references to the old config selector and assign the
// new one as the current one.
r.curConfigSelector.decRefs()
r.curConfigSelector.stop()
r.curConfigSelector = cs
}
}
Expand Down
172 changes: 172 additions & 0 deletions xds/internal/resolver/xds_resolver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"time"

"github.com/google/go-cmp/cmp"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials/insecure"
xdscreds "google.golang.org/grpc/credentials/xds"
"google.golang.org/grpc/internal"
Expand All @@ -36,6 +37,7 @@ import (
"google.golang.org/grpc/internal/wrr"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/serviceconfig"
"google.golang.org/grpc/status"
_ "google.golang.org/grpc/xds/internal/balancer/cdsbalancer" // To parse LB config
"google.golang.org/grpc/xds/internal/balancer/clustermanager"
"google.golang.org/grpc/xds/internal/client"
Expand Down Expand Up @@ -435,6 +437,176 @@ func (s) TestXDSResolverGoodServiceUpdate(t *testing.T) {
}
}

// TestXDSResolverRemovedWithRPCs tests the case where a config selector sends
// an empty update to the resolver after the resource is removed.
//
// Sequence: deliver a good route update, select a config through the resulting
// config selector (an in-flight "RPC"), deliver a resource-not-found error, and
// only then call OnCommitted on the earlier selection. The test passes if none
// of these steps fails or panics.
func (s) TestXDSResolverRemovedWithRPCs(t *testing.T) {
xdsC := fakeclient.NewClient()
xdsR, tcc, cancel := testSetup(t, setupOpts{
xdsClientFunc: func() (xdsClientInterface, error) { return xdsC, nil },
})
// This defer captures testSetup's cancel; the variable is reassigned below.
defer cancel()
defer xdsR.Close()

// ctx bounds every wait/receive in this test by defaultTestTimeout.
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
waitForWatchListener(ctx, t, xdsC, targetStr)
xdsC.InvokeWatchListenerCallback(xdsclient.ListenerUpdate{RouteConfigName: routeStr}, nil)
waitForWatchRouteConfig(ctx, t, xdsC, routeStr)

// Invoke the watchAPI callback with a good service update and wait for the
// UpdateState method to be called on the ClientConn.
xdsC.InvokeWatchRouteConfigCallback(xdsclient.RouteConfigUpdate{
VirtualHosts: []*xdsclient.VirtualHost{
{
Domains: []string{targetStr},
Routes: []*client.Route{{Prefix: newStringP(""), Action: map[string]uint32{"test-cluster-1": 1}}},
},
},
}, nil)

gotState, err := tcc.stateCh.Receive(ctx)
if err != nil {
t.Fatalf("ClientConn.UpdateState returned error: %v", err)
}
rState := gotState.(resolver.State)
if err := rState.ServiceConfig.Err; err != nil {
t.Fatalf("ClientConn.UpdateState received error in service config: %v", rState.ServiceConfig.Err)
}

// "Make an RPC" by invoking the config selector.
cs := iresolver.GetConfigSelector(rState)
if cs == nil {
t.Fatalf("received nil config selector")
}

// res is kept so that OnCommitted can be called after the resource removal.
res, err := cs.SelectConfig(iresolver.RPCInfo{Context: context.Background()})
if err != nil {
t.Fatalf("Unexpected error from cs.SelectConfig(_): %v", err)
}

// Delete the resource
suErr := xdsclient.NewErrorf(xdsclient.ErrorTypeResourceNotFound, "resource removed error")
xdsC.InvokeWatchRouteConfigCallback(xdsclient.RouteConfigUpdate{}, suErr)

// Only the arrival of a state update is checked here; its content is not
// inspected.
if _, err = tcc.stateCh.Receive(ctx); err != nil {
t.Fatalf("ClientConn.UpdateState returned error: %v", err)
}

// "Finish the RPC"; this could cause a panic if the resolver doesn't
// handle it correctly.
res.OnCommitted()
}

// TestXDSResolverRemovedResource tests for proper behavior after a resource is
// removed.
//
// It checks three consecutive state updates on the ClientConn:
//  1. after a good route update: a service config naming test-cluster-1 and a
//     config selector whose SelectConfig succeeds;
//  2. after a resource-not-found error: the same service config, but with a
//     config selector whose SelectConfig fails with codes.Unavailable;
//  3. a final update carrying the empty ("{}") service config.
func (s) TestXDSResolverRemovedResource(t *testing.T) {
xdsC := fakeclient.NewClient()
xdsR, tcc, cancel := testSetup(t, setupOpts{
xdsClientFunc: func() (xdsClientInterface, error) { return xdsC, nil },
})
// This defer captures testSetup's cancel; the variable is reassigned below.
defer cancel()
defer xdsR.Close()

// ctx bounds every wait/receive in this test by defaultTestTimeout.
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
waitForWatchListener(ctx, t, xdsC, targetStr)
xdsC.InvokeWatchListenerCallback(xdsclient.ListenerUpdate{RouteConfigName: routeStr}, nil)
waitForWatchRouteConfig(ctx, t, xdsC, routeStr)

// Invoke the watchAPI callback with a good service update and wait for the
// UpdateState method to be called on the ClientConn.
xdsC.InvokeWatchRouteConfigCallback(xdsclient.RouteConfigUpdate{
VirtualHosts: []*xdsclient.VirtualHost{
{
Domains: []string{targetStr},
Routes: []*client.Route{{Prefix: newStringP(""), Action: map[string]uint32{"test-cluster-1": 1}}},
},
},
}, nil)
// Expected service config while test-cluster-1 is still referenced.
wantJSON := `{"loadBalancingConfig":[{
"xds_cluster_manager_experimental":{
"children":{
"test-cluster-1":{
"childPolicy":[{"cds_experimental":{"cluster":"test-cluster-1"}}]
}
}
}}]}`
wantSCParsed := internal.ParseServiceConfigForTesting.(func(string) *serviceconfig.ParseResult)(wantJSON)

gotState, err := tcc.stateCh.Receive(ctx)
if err != nil {
t.Fatalf("ClientConn.UpdateState returned error: %v", err)
}
rState := gotState.(resolver.State)
if err := rState.ServiceConfig.Err; err != nil {
t.Fatalf("ClientConn.UpdateState received error in service config: %v", rState.ServiceConfig.Err)
}
if !internal.EqualServiceConfigForTesting(rState.ServiceConfig.Config, wantSCParsed.Config) {
t.Errorf("ClientConn.UpdateState received different service config")
// NOTE(review): cmp.Diff against nil appears to be used only to render
// each value in the failure output — confirm.
t.Error("got: ", cmp.Diff(nil, rState.ServiceConfig.Config))
t.Error("want: ", cmp.Diff(nil, wantSCParsed.Config))
}

// "Make an RPC" by invoking the config selector.
cs := iresolver.GetConfigSelector(rState)
if cs == nil {
t.Fatalf("received nil config selector")
}

res, err := cs.SelectConfig(iresolver.RPCInfo{Context: context.Background()})
if err != nil {
t.Fatalf("Unexpected error from cs.SelectConfig(_): %v", err)
}

// "Finish the RPC"; this could cause a panic if the resolver doesn't
// handle it correctly.
res.OnCommitted()

// Delete the resource. The channel should receive a service config with the
// original cluster but with an erroring config selector.
suErr := xdsclient.NewErrorf(xdsclient.ErrorTypeResourceNotFound, "resource removed error")
xdsC.InvokeWatchRouteConfigCallback(xdsclient.RouteConfigUpdate{}, suErr)

if gotState, err = tcc.stateCh.Receive(ctx); err != nil {
t.Fatalf("ClientConn.UpdateState returned error: %v", err)
}
rState = gotState.(resolver.State)
if err := rState.ServiceConfig.Err; err != nil {
t.Fatalf("ClientConn.UpdateState received error in service config: %v", rState.ServiceConfig.Err)
}
// wantSCParsed still holds the test-cluster-1 config at this point.
if !internal.EqualServiceConfigForTesting(rState.ServiceConfig.Config, wantSCParsed.Config) {
t.Errorf("ClientConn.UpdateState received different service config")
t.Error("got: ", cmp.Diff(nil, rState.ServiceConfig.Config))
t.Error("want: ", cmp.Diff(nil, wantSCParsed.Config))
}

// "Make another RPC" by invoking the config selector.
cs = iresolver.GetConfigSelector(rState)
if cs == nil {
t.Fatalf("received nil config selector")
}

// The selector delivered with this update must reject RPCs with
// codes.Unavailable.
res, err = cs.SelectConfig(iresolver.RPCInfo{Context: context.Background()})
if err == nil || status.Code(err) != codes.Unavailable {
t.Fatalf("Expected UNAVAILABLE error from cs.SelectConfig(_); got %v, %v", res, err)
}

// In the meantime, an empty ServiceConfig update should have been sent.
if gotState, err = tcc.stateCh.Receive(ctx); err != nil {
t.Fatalf("ClientConn.UpdateState returned error: %v", err)
}
rState = gotState.(resolver.State)
if err := rState.ServiceConfig.Err; err != nil {
t.Fatalf("ClientConn.UpdateState received error in service config: %v", rState.ServiceConfig.Err)
}
// From here on the expectation is the empty service config.
wantSCParsed = internal.ParseServiceConfigForTesting.(func(string) *serviceconfig.ParseResult)("{}")
if !internal.EqualServiceConfigForTesting(rState.ServiceConfig.Config, wantSCParsed.Config) {
t.Errorf("ClientConn.UpdateState received different service config")
t.Error("got: ", cmp.Diff(nil, rState.ServiceConfig.Config))
t.Error("want: ", cmp.Diff(nil, wantSCParsed.Config))
}
}

func (s) TestXDSResolverWRR(t *testing.T) {
xdsC := fakeclient.NewClient()
xdsR, tcc, cancel := testSetup(t, setupOpts{
Expand Down

0 comments on commit 083393f

Please sign in to comment.