Skip to content

Commit

Permalink
Responded to Doug's testing comments
Browse files Browse the repository at this point in the history
  • Loading branch information
zasweq committed Jan 16, 2024
1 parent eb521e5 commit 3db6c1b
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 38 deletions.
2 changes: 1 addition & 1 deletion test/xds/xds_client_integration_test.go
Expand Up @@ -45,7 +45,7 @@ func Test(t *testing.T) {

const (
defaultTestTimeout = 10 * time.Second
defaultTestShortTimeout = 10 * time.Millisecond // For events expdefaultTected to *not* happen.
defaultTestShortTimeout = 10 * time.Millisecond // For events expected to *not* happen.
)

func (s) TestClientSideXDS(t *testing.T) {
Expand Down
4 changes: 2 additions & 2 deletions test/xds/xds_server_certificate_providers_test.go
Expand Up @@ -231,7 +231,7 @@ func (s) TestServerSideXDS_WithNoCertificateProvidersInBootstrap_Failure(t *test
}
defer cc.Close()

waitForFailedRPCWithStatusCode(ctx, t, cc, errAcceptAndClose...)
waitForFailedRPCWithStatus(ctx, t, cc, errAcceptAndClose)
}

// Tests the case where the bootstrap configuration contains one certificate
Expand Down Expand Up @@ -477,5 +477,5 @@ func (s) TestServerSideXDS_WithValidAndInvalidSecurityConfiguration(t *testing.T
}
defer cc2.Close()

waitForFailedRPCWithStatusCode(ctx, t, cc2, errAcceptAndClose...)
waitForFailedRPCWithStatus(ctx, t, cc2, errAcceptAndClose)
}
53 changes: 20 additions & 33 deletions test/xds/xds_server_test.go
Expand Up @@ -44,11 +44,7 @@ import (
)

var (
errAcceptAndClose = []*status.Status{
status.New(codes.Unavailable, "connection error: desc = \"error reading server preface: EOF\""),
status.New(codes.Unavailable, "write: broken pipe"),
status.New(codes.Unavailable, "read: connection reset by peer"),
}
errAcceptAndClose = status.New(codes.Unavailable, "")
)

// TestServeLDSRDS tests the case where a server receives LDS resource which
Expand Down Expand Up @@ -138,13 +134,13 @@ func (s) TestServeLDSRDS(t *testing.T) {
// "NonForwardingAction is expected for all Routes used on server-side; a
// route with an inappropriate action causes RPCs matching that route to
// fail with UNAVAILABLE." - A36
waitForFailedRPCWithStatusCode(ctx, t, cc, status.New(codes.Unavailable, "the incoming RPC matched to a route that was not of action type non forwarding"))
waitForFailedRPCWithStatus(ctx, t, cc, status.New(codes.Unavailable, "the incoming RPC matched to a route that was not of action type non forwarding"))
}

// waitForFailedRPCWithStatusCode makes unary RPC's until it receives the
// expected status in a polling manner. Fails if the RPC made does not return
// the expected status before the context expires.
func waitForFailedRPCWithStatusCode(ctx context.Context, t *testing.T, cc *grpc.ClientConn, sts ...*status.Status) {
// waitForFailedRPCWithStatus makes unary RPC's until it receives the expected
// status in a polling manner. Fails if the RPC made does not return the
// expected status before the context expires.
func waitForFailedRPCWithStatus(ctx context.Context, t *testing.T, cc *grpc.ClientConn, st *status.Status) {
t.Helper()

c := testgrpc.NewTestServiceClient(cc)
Expand All @@ -154,18 +150,12 @@ func waitForFailedRPCWithStatusCode(ctx context.Context, t *testing.T, cc *grpc.
for {
select {
case <-ctx.Done():
var errString string
if err != nil {
errString = err.Error()
}
t.Fatalf("failure when waiting for RPCs to fail with certain status %v: %v. most recent error received from RPC: %v", sts, ctx.Err(), errString)
t.Fatalf("failure when waiting for RPCs to fail with certain status %v: %v. most recent error received from RPC: %v", st, ctx.Err(), err)
case <-ticker.C:
_, err = c.EmptyCall(ctx, &testpb.Empty{})
for _, st := range sts {
if status.Code(err) == st.Code() && strings.Contains(err.Error(), st.Message()) {
t.Logf("most recent error happy case: %v", err.Error())
return
}
if status.Code(err) == st.Code() && strings.Contains(err.Error(), st.Message()) {
t.Logf("most recent error happy case: %v", err.Error())
return
}
}
}
Expand Down Expand Up @@ -231,7 +221,7 @@ func (s) TestRDSNack(t *testing.T) {
defer cc.Close()

<-serving.Done()
waitForFailedRPCWithStatusCode(ctx, t, cc, status.New(codes.Unavailable, "error from xDS configuration for matched route configuration"))
waitForFailedRPCWithStatus(ctx, t, cc, status.New(codes.Unavailable, "error from xDS configuration for matched route configuration"))
}

// TestResourceNotFoundRDS tests the case where an LDS points to an RDS which
Expand Down Expand Up @@ -293,29 +283,26 @@ func (s) TestResourceNotFoundRDS(t *testing.T) {
}
defer cc.Close()

waitForFailedRPCWithStatusCode(ctx, t, cc, errAcceptAndClose...)
waitForFailedRPCWithStatus(ctx, t, cc, errAcceptAndClose)

// Invoke resource not found - this should result in L7 RPC error with
// unavailable receive on serving as a result, should trigger it to go
// serving. Poll as watch might not be started yet to trigger resource not
// found.
loop:
for {
if err := internal.TriggerXDSResourceNameNotFoundClient.(func(string, string) error)("RouteConfigResource", "routeName"); err != nil {
t.Fatalf("Failed to trigger resource name not found for testing: %v", err)
}
var servingDone bool
select {
case <-serving.Done():
servingDone = true
break loop
case <-ctx.Done():
t.Fatalf("timed out waiting for serving mode to go serving")
default:
}
if servingDone {
break
case <-time.After(time.Millisecond):
}
}
waitForFailedRPCWithStatusCode(ctx, t, cc, status.New(codes.Unavailable, "error from xDS configuration for matched route configuration"))
waitForFailedRPCWithStatus(ctx, t, cc, status.New(codes.Unavailable, "error from xDS configuration for matched route configuration"))
}

// TestServingModeChanges tests the Server's logic as it transitions from Not
Expand Down Expand Up @@ -377,7 +364,7 @@ func (s) TestServingModeChanges(t *testing.T) {
}
defer cc.Close()

waitForFailedRPCWithStatusCode(ctx, t, cc, errAcceptAndClose...)
waitForFailedRPCWithStatus(ctx, t, cc, errAcceptAndClose)
routeConfig := e2e.RouteConfigNonForwardingAction("routeName")
resources = e2e.UpdateOptions{
NodeID: nodeID,
Expand Down Expand Up @@ -432,7 +419,7 @@ func (s) TestServingModeChanges(t *testing.T) {
}

// New RPCs on that connection should eventually start failing.
waitForFailedRPCWithStatusCode(ctx, t, cc, errAcceptAndClose...)
waitForFailedRPCWithStatus(ctx, t, cc, errAcceptAndClose)
}

// TestMultipleUpdatesImmediatelySwitch tests the case where you get an LDS
Expand Down Expand Up @@ -490,7 +477,7 @@ func (s) TestMultipleUpdatesImmediatelySwitch(t *testing.T) {
}
defer cc.Close()

waitForFailedRPCWithStatusCode(ctx, t, cc, errAcceptAndClose...)
waitForFailedRPCWithStatus(ctx, t, cc, errAcceptAndClose)

routeConfig1 := e2e.RouteConfigNonForwardingAction("routeName")
routeConfig2 := e2e.RouteConfigFilterAction("routeName2")
Expand Down Expand Up @@ -533,7 +520,7 @@ func (s) TestMultipleUpdatesImmediatelySwitch(t *testing.T) {
// "NonForwardingAction is expected for all Routes used on server-side; a
// route with an inappropriate action causes RPCs matching that route to
// fail with UNAVAILABLE." - A36
waitForFailedRPCWithStatusCode(ctx, t, cc, status.New(codes.Unavailable, "the incoming RPC matched to a route that was not of action type non forwarding"))
waitForFailedRPCWithStatus(ctx, t, cc, status.New(codes.Unavailable, "the incoming RPC matched to a route that was not of action type non forwarding"))

// Stream should be allowed to continue on the old working configuration -
// as it on a connection that is gracefully closed (old FCM/LDS
Expand Down
18 changes: 16 additions & 2 deletions xds/internal/server/listener_wrapper_test.go
Expand Up @@ -26,6 +26,7 @@ import (
"testing"

"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/testutils"
"google.golang.org/grpc/internal/testutils/xds/e2e"

Expand Down Expand Up @@ -94,6 +95,7 @@ func (s) TestListenerWrapper(t *testing.T) {
waitForResourceNames(ctx, t, ldsResourceNamesCh, []string{lisResourceName})
// Configure the management server with a listener resource that specifies
// the name of RDS resources that need to be resolved.
listener := e2e.DefaultServerListenerWithRouteConfigName(host, port, e2e.SecurityLevelNone, route1)
resources := e2e.UpdateOptions{
NodeID: nodeID,
Listeners: []*v3listenerpb.Listener{e2e.DefaultServerListenerWithRouteConfigName(host, port, e2e.SecurityLevelNone, route1)},
Expand Down Expand Up @@ -121,7 +123,7 @@ func (s) TestListenerWrapper(t *testing.T) {
t.Fatal(err)
}

// Mode should go ready.
// Mode should go serving.
select {
case <-ctx.Done():
t.Fatalf("timeout waiting for mode change")
Expand All @@ -131,5 +133,17 @@ func (s) TestListenerWrapper(t *testing.T) {
}
}

// TODO: invoke lds resource not found - should go back to non serving.
// Invoke lds resource not found - should go back to non serving.
if err := internal.TriggerXDSResourceNameNotFoundClient.(func(string, string) error)("ListenerResource", listener.GetName()); err != nil {
t.Fatalf("Failed to trigger resource name not found for testing: %v", err)
}
select {
case <-ctx.Done():
t.Fatalf("timeout waiting for mode change")
case mode := <-modeCh:
if mode != connectivity.ServingModeNotServing {
t.Fatalf("mode change received: %v, want: %v", mode, connectivity.ServingModeNotServing)
}
}

}

0 comments on commit 3db6c1b

Please sign in to comment.