clusterresolver: handle EDS nacks and resource-not-found errors correctly #6436

Merged: 2 commits, Jul 11, 2023
xds/internal/balancer/clusterresolver/clusterresolver.go (2 changes: 1 addition & 1 deletion)
@@ -85,7 +85,7 @@ func (bb) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Bal
b.logger = prefixLogger(b)
b.logger.Infof("Created")

-	b.resourceWatcher = newResourceResolver(b)
+	b.resourceWatcher = newResourceResolver(b, b.logger)
b.cc = &ccWrapper{
ClientConn: cc,
resourceWatcher: b.resourceWatcher,
@@ -27,13 +27,21 @@ import (
"github.com/google/go-cmp/cmp"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/stubserver"
"google.golang.org/grpc/internal/testutils"
"google.golang.org/grpc/internal/testutils/xds/e2e"
"google.golang.org/grpc/peer"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/resolver/manual"
"google.golang.org/grpc/serviceconfig"
"google.golang.org/grpc/status"
xdstestutils "google.golang.org/grpc/xds/internal/testutils"
"google.golang.org/grpc/xds/internal/xdsclient"
"google.golang.org/grpc/xds/internal/xdsclient/bootstrap"
"google.golang.org/grpc/xds/internal/xdsclient/xdsresource/version"
"google.golang.org/protobuf/types/known/wrapperspb"

v3clusterpb "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
v3corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
@@ -771,3 +779,289 @@ func (s) TestAggregateCluster_BadEDS_BadDNS(t *testing.T) {
t.Fatalf("EmptyCall() failed with error %v, want %v", err, dnsErr)
}
}

// TestAggregateCluster_NoFallback_EDSNackedWithPreviousGoodUpdate tests the
// scenario where the top-level cluster is an aggregate cluster that resolves to
// an EDS and LOGICAL_DNS cluster. The management server first sends a good EDS
// response for the EDS cluster and the test verifies that RPCs get routed to
// the EDS cluster. The management server then sends a bad EDS response. The
// test verifies that the cluster_resolver LB policy continues to use the
// previously received good update and that RPCs still get routed to the EDS
// cluster.
func (s) TestAggregateCluster_NoFallback_EDSNackedWithPreviousGoodUpdate(t *testing.T) {
dnsTargetCh, _, _, dnsR, cleanup1 := setupDNS()
defer cleanup1()

// Start an xDS management server.
mgmtServer, nodeID, bootstrapContents, _, cleanup2 := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{AllowResourceSubset: true})
defer cleanup2()

// Start two test backends and extract their host and port. The first
// backend is used for the EDS cluster and the second backend is used for
// the LOGICAL_DNS cluster.
servers, cleanup3 := startTestServiceBackends(t, 2)
defer cleanup3()
addrs, ports := backendAddressesAndPorts(t, servers)

// Configure an aggregate cluster pointing to an EDS and DNS cluster. Also
// configure an endpoints resource for the EDS cluster.
const (
edsClusterName = clusterName + "-eds"
dnsClusterName = clusterName + "-dns"
dnsHostName = "dns_host"
dnsPort = uint32(8080)
)
resources := e2e.UpdateOptions{
NodeID: nodeID,
Clusters: []*v3clusterpb.Cluster{
makeAggregateClusterResource(clusterName, []string{edsClusterName, dnsClusterName}),
e2e.DefaultCluster(edsClusterName, "", e2e.SecurityLevelNone),
makeLogicalDNSClusterResource(dnsClusterName, dnsHostName, dnsPort),
},
Endpoints: []*v3endpointpb.ClusterLoadAssignment{e2e.DefaultEndpoint(edsClusterName, "localhost", []uint32{uint32(ports[0])})},
SkipValidation: true,
}
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
if err := mgmtServer.Update(ctx, resources); err != nil {
t.Fatal(err)
}

// Create xDS client, configure cds_experimental LB policy with a manual
// resolver, and dial the test backends.
cc, cleanup := setupAndDial(t, bootstrapContents)
defer cleanup()

// Ensure that the DNS resolver is started for the expected target.
select {
case <-ctx.Done():
t.Fatal("Timeout when waiting for DNS resolver to be started")
case target := <-dnsTargetCh:
got, want := target.Endpoint(), fmt.Sprintf("%s:%d", dnsHostName, dnsPort)
if got != want {
t.Fatalf("DNS resolution started for target %q, want %q", got, want)
}
}

// Update DNS resolver with test backend addresses.
dnsR.UpdateState(resolver.State{Addresses: addrs[1:]})

// Make an RPC and ensure that it gets routed to the first backend since the
// EDS cluster is of higher priority than the LOGICAL_DNS cluster.
client := testgrpc.NewTestServiceClient(cc)
peer := &peer.Peer{}
if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(peer), grpc.WaitForReady(true)); err != nil {
t.Fatalf("EmptyCall() failed: %v", err)
}
if peer.Addr.String() != addrs[0].Addr {
t.Fatalf("EmptyCall() routed to backend %q, want %q", peer.Addr, addrs[0].Addr)
}

// Push an EDS resource from the management server that is expected to be
// NACKed by the xDS client. Since the cluster_resolver LB policy has a
// previously received good EDS resource, it will continue to use that.
resources.Endpoints[0].Endpoints[0].LbEndpoints[0].LoadBalancingWeight = &wrapperspb.UInt32Value{Value: 0}
if err := mgmtServer.Update(ctx, resources); err != nil {
t.Fatal(err)
}

// Ensure that RPCs continue to get routed to the EDS cluster for the next
// second.
for end := time.Now().Add(time.Second); time.Now().Before(end); <-time.After(defaultTestShortTimeout) {
if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(peer)); err != nil {
t.Fatalf("EmptyCall() failed: %v", err)
}
if peer.Addr.String() != addrs[0].Addr {
t.Fatalf("EmptyCall() routed to backend %q, want %q", peer.Addr, addrs[0].Addr)
}
}
}

// TestAggregateCluster_Fallback_EDSNackedWithoutPreviousGoodUpdate tests the
// scenario where the top-level cluster is an aggregate cluster that resolves to
// an EDS and LOGICAL_DNS cluster. The management server sends a bad EDS
// response. The test verifies that the cluster_resolver LB policy falls back to
// the LOGICAL_DNS cluster, because it is supposed to treat the bad EDS response
// as though it received an update with no endpoints.
func (s) TestAggregateCluster_Fallback_EDSNackedWithoutPreviousGoodUpdate(t *testing.T) {
dnsTargetCh, _, _, dnsR, cleanup1 := setupDNS()
defer cleanup1()

// Start an xDS management server.
mgmtServer, nodeID, bootstrapContents, _, cleanup2 := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{AllowResourceSubset: true})
defer cleanup2()

// Start two test backends and extract their host and port. The first
// backend is used for the EDS cluster and the second backend is used for
// the LOGICAL_DNS cluster.
servers, cleanup3 := startTestServiceBackends(t, 2)
defer cleanup3()
addrs, ports := backendAddressesAndPorts(t, servers)

// Configure an aggregate cluster pointing to an EDS and DNS cluster.
const (
edsClusterName = clusterName + "-eds"
dnsClusterName = clusterName + "-dns"
dnsHostName = "dns_host"
dnsPort = uint32(8080)
)
resources := e2e.UpdateOptions{
NodeID: nodeID,
Clusters: []*v3clusterpb.Cluster{
makeAggregateClusterResource(clusterName, []string{edsClusterName, dnsClusterName}),
e2e.DefaultCluster(edsClusterName, "", e2e.SecurityLevelNone),
makeLogicalDNSClusterResource(dnsClusterName, dnsHostName, dnsPort),
},
Endpoints: []*v3endpointpb.ClusterLoadAssignment{e2e.DefaultEndpoint(edsClusterName, "localhost", []uint32{uint32(ports[0])})},
SkipValidation: true,
}

// Set a load balancing weight of 0 for the backend in the EDS resource.
// This is expected to be NACKed by the xDS client. Since the
// cluster_resolver LB policy has no previously received good EDS resource,
// it will treat this as though it received an update with no endpoints.
resources.Endpoints[0].Endpoints[0].LbEndpoints[0].LoadBalancingWeight = &wrapperspb.UInt32Value{Value: 0}
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
if err := mgmtServer.Update(ctx, resources); err != nil {
t.Fatal(err)
}

// Create xDS client, configure cds_experimental LB policy with a manual
// resolver, and dial the test backends.
cc, cleanup := setupAndDial(t, bootstrapContents)
defer cleanup()

// Ensure that the DNS resolver is started for the expected target.
select {
case <-ctx.Done():
t.Fatal("Timeout when waiting for DNS resolver to be started")
case target := <-dnsTargetCh:
got, want := target.Endpoint(), fmt.Sprintf("%s:%d", dnsHostName, dnsPort)
if got != want {
t.Fatalf("DNS resolution started for target %q, want %q", got, want)
}
}

// Update DNS resolver with test backend addresses.
dnsR.UpdateState(resolver.State{Addresses: addrs[1:]})

// Make an RPC and ensure that it gets routed to the LOGICAL_DNS cluster.
peer := &peer.Peer{}
client := testgrpc.NewTestServiceClient(cc)
if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(peer), grpc.WaitForReady(true)); err != nil {
t.Fatalf("EmptyCall() failed: %v", err)
}
if peer.Addr.String() != addrs[1].Addr {
t.Fatalf("EmptyCall() routed to backend %q, want %q", peer.Addr, addrs[1].Addr)
}
}

// TestAggregateCluster_Fallback_EDS_ResourceNotFound tests the scenario where
// the top-level cluster is an aggregate cluster that resolves to an EDS and
// LOGICAL_DNS cluster. The management server does not respond with the EDS
// cluster. The test verifies that the cluster_resolver LB policy falls back to
// the LOGICAL_DNS cluster in this case.
func (s) TestAggregateCluster_Fallback_EDS_ResourceNotFound(t *testing.T) {
dnsTargetCh, _, _, dnsR, cleanup1 := setupDNS()
defer cleanup1()

// Start an xDS management server.
mgmtServer, nodeID, _, _, cleanup2 := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{AllowResourceSubset: true})
defer cleanup2()

// Start a test backend for the LOGICAL_DNS cluster.
server := stubserver.StartTestService(t, nil)
defer server.Stop()

// Configure an aggregate cluster pointing to an EDS and DNS cluster. No
// endpoints are configured for the EDS cluster.
const (
edsClusterName = clusterName + "-eds"
dnsClusterName = clusterName + "-dns"
dnsHostName = "dns_host"
dnsPort = uint32(8080)
)
resources := e2e.UpdateOptions{
NodeID: nodeID,
Clusters: []*v3clusterpb.Cluster{
makeAggregateClusterResource(clusterName, []string{edsClusterName, dnsClusterName}),
e2e.DefaultCluster(edsClusterName, "", e2e.SecurityLevelNone),
makeLogicalDNSClusterResource(dnsClusterName, dnsHostName, dnsPort),
},
SkipValidation: true,
}
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
if err := mgmtServer.Update(ctx, resources); err != nil {
t.Fatal(err)
}

// Create an xDS client talking to the above management server, configured
// with a short watch expiry timeout.
xdsClient, close, err := xdsclient.NewWithConfigForTesting(&bootstrap.Config{
XDSServer: xdstestutils.ServerConfigForAddress(t, mgmtServer.Address),
NodeProto: &v3corepb.Node{Id: nodeID},
}, defaultTestWatchExpiryTimeout, time.Duration(0))
Contributor:

Optional, mainly musing. The test above polls for a second, and this one requires 500 milliseconds to invoke the resource-not-found error. Is there a way to write this that doesn't introduce testing time as such? I don't see a way around it, but I'm curious about your thoughts.

Contributor Author:

> The test above polls for a second.

I don't understand what you are referring to here.

> Is there a way to write this that doesn't introduce testing time as such?

For RDS/EDS, the only way to force a resource-not-found error is for the watch timer to fire. And since this is an e2e test, there is no way for us to inject such an event other than actually waiting for it to happen. The only thing we can do here is pass a really low value for the watch timeout.

Also, the code towards the end of this test, where it verifies fallback, does not actually depend on any timings. It simply verifies that an RPC gets routed to a backend in the DNS cluster before the test context expires.

> I don't see a way around it but curious your thoughts.

I don't see a way either :)

Contributor:

Wrt the timer... we could do something like https://github.com/grpc/grpc-go/blob/master/xds/internal/balancer/outlierdetection/balancer.go#L50 for time.AfterFunc, or https://github.com/grpc/grpc-go/pull/4270/files#diff-cfb871b425a217deb8602c43a41b39ed1378776d5a40bebee381fec0f231a011R51 for a newTimer. But yeah, probably not worth it in this scenario. I was also thinking of setting the 500 ms to something like 10 ms, since I don't think the length matters.
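For reference, a minimal sketch of the time.AfterFunc indirection pattern the first link points at; the startWatchExpiryTimer helper and all names below are illustrative, not grpc-go's actual code:

```go
package main

import (
	"fmt"
	"time"
)

// afterFunc is a package-level indirection over time.AfterFunc. Production
// code arms timers through it; a test can swap it out so the timer "fires"
// immediately, taking real wall-clock time out of the test.
var afterFunc = time.AfterFunc

// startWatchExpiryTimer is a hypothetical stand-in for the code path that
// arms the resource-not-found timer for an EDS watch.
func startWatchExpiryTimer(timeout time.Duration, onExpiry func()) *time.Timer {
	return afterFunc(timeout, onExpiry)
}

func main() {
	// Test-side override: invoke the callback synchronously and hand back a
	// dummy timer that never fires.
	afterFunc = func(_ time.Duration, f func()) *time.Timer {
		f()
		return time.NewTimer(time.Hour)
	}

	startWatchExpiryTimer(500*time.Millisecond, func() {
		fmt.Println("watch expired: resource not found")
	})
}
```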

if err != nil {
t.Fatalf("failed to create xds client: %v", err)
}
defer close()

// Create a manual resolver and push a service config specifying the use of
// the cds LB policy as the top-level LB policy, and a corresponding config
// with a single cluster.
r := manual.NewBuilderWithScheme("whatever")
jsonSC := fmt.Sprintf(`{
"loadBalancingConfig":[{
"cds_experimental":{
"cluster": "%s"
}
}]
}`, clusterName)
scpr := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(jsonSC)
r.InitialState(xdsclient.SetClient(resolver.State{ServiceConfig: scpr}, xdsClient))

// Create a ClientConn.
cc, err := grpc.Dial(r.Scheme()+":///test.service", grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(r))
if err != nil {
t.Fatalf("failed to dial local test server: %v", err)
}
defer cc.Close()

// Make an RPC with a short deadline. We expect this RPC to not succeed
// because the DNS resolver has not responded with endpoint addresses.
client := testgrpc.NewTestServiceClient(cc)
sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
defer sCancel()
if _, err := client.EmptyCall(sCtx, &testpb.Empty{}); status.Code(err) != codes.DeadlineExceeded {
t.Fatalf("EmptyCall() code %s, want %s", status.Code(err), codes.DeadlineExceeded)
}

// Ensure that the DNS resolver is started for the expected target.
select {
case <-ctx.Done():
t.Fatal("Timeout when waiting for DNS resolver to be started")
case target := <-dnsTargetCh:
got, want := target.Endpoint(), fmt.Sprintf("%s:%d", dnsHostName, dnsPort)
if got != want {
t.Fatalf("DNS resolution started for target %q, want %q", got, want)
}
}

// Update DNS resolver with test backend addresses.
dnsR.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: server.Address}}})

// Make an RPC and ensure that it gets routed to the LOGICAL_DNS cluster.
// Even though the EDS cluster is of higher priority, since the management
// server does not respond with an EDS resource, the cluster_resolver LB
// policy is expected to fall back to the LOGICAL_DNS cluster once the watch
// timeout expires.
peer := &peer.Peer{}
if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(peer), grpc.WaitForReady(true)); err != nil {
t.Fatalf("EmptyCall() failed: %v", err)
}
if peer.Addr.String() != server.Address {
t.Fatalf("EmptyCall() routed to backend %q, want %q", peer.Addr, server.Address)
}
}