diff --git a/internal/testutils/xds/e2e/clientresources.go b/internal/testutils/xds/e2e/clientresources.go index ed131d510b8..64accf09cd7 100644 --- a/internal/testutils/xds/e2e/clientresources.go +++ b/internal/testutils/xds/e2e/clientresources.go @@ -788,8 +788,23 @@ func DefaultServerListenerWithRouteConfigName(host string, port uint32, secLevel return defaultServerListenerCommon(host, port, secLevel, routeName, false) } +// RouteConfigNoRouteMatch returns an xDS RouteConfig resource which a route +// with no route match. This will be NACKed by the xDS Client. +func RouteConfigNoRouteMatch(routeName string) *v3routepb.RouteConfiguration { + return &v3routepb.RouteConfiguration{ + Name: routeName, + VirtualHosts: []*v3routepb.VirtualHost{{ + // This "*" string matches on any incoming authority. This is to ensure any + // incoming RPC matches to Route_NonForwardingAction and will proceed as + // normal. + Domains: []string{"*"}, + Routes: []*v3routepb.Route{{ + Action: &v3routepb.Route_NonForwardingAction{}, + }}}}} +} + // RouteConfigNonForwardingAction returns an xDS RouteConfig resource which -// specifies to route to a route specfying non forwarding action. This is +// specifies to route to a route specifying non forwarding action. This is // intended to be used on the server side for RDS requests, and corresponds to // the inline route configuration in DefaultServerListener. func RouteConfigNonForwardingAction(routeName string) *v3routepb.RouteConfiguration { diff --git a/internal/transport/http2_server.go b/internal/transport/http2_server.go index 0d49489b0d6..08a2e34bee7 100644 --- a/internal/transport/http2_server.go +++ b/internal/transport/http2_server.go @@ -1290,7 +1290,11 @@ func (t *http2Server) closeStream(s *Stream, rst bool, rstCode http2.ErrCode, eo // CallbackConn is a conn with a callback function. type CallbackConn interface { - Callback(ServerTransport) + // PassServerTransport passes a ServerTransport to the callback Conn. This + // is called in the grpc layer after a ServerTransport for a connection has + // successfully been created, if this method exists on the accepted + // connection. + PassServerTransport(ServerTransport) } func (t *http2Server) Drain(debugData string) { diff --git a/server.go b/server.go index 547bf6302f1..49180835e82 100644 --- a/server.go +++ b/server.go @@ -930,7 +930,7 @@ func (s *Server) handleRawConn(lisAddr string, rawConn net.Conn) { } if cc, ok := rawConn.(transport.CallbackConn); ok { - cc.Callback(st) + cc.PassServerTransport(st) } if !s.addConn(lisAddr, st) { diff --git a/test/xds/xds_server_test.go b/test/xds/xds_server_test.go index b938be90e89..c8fa195925e 100644 --- a/test/xds/xds_server_test.go +++ b/test/xds/xds_server_test.go @@ -171,6 +171,69 @@ func waitForFailedRPCWithStatusCode(ctx context.Context, t *testing.T, cc *grpc. } } +// TestResourceNotFoundRDS tests the case where an LDS points to an RDS which +// returns an RDS Resource which is NACKed. This should trigger server should +// move to serving, successfully Accept Connections, and fail at the L7 level +// with a certain error message. +func (s) TestRDSNack(t *testing.T) { + managementServer, nodeID, bootstrapContents, _, cleanup := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{}) + defer cleanup() + lis, err := testutils.LocalTCPListener() + if err != nil { + t.Fatalf("testutils.LocalTCPListener() failed: %v", err) + } + // Setup the management server to respond with a listener resource that + // specifies a route name to watch, and no RDS resource corresponding to + // this route name. + host, port, err := hostPortFromListener(lis) + if err != nil { + t.Fatalf("failed to retrieve host and port of server: %v", err) + } + + listener := e2e.DefaultServerListenerWithRouteConfigName(host, port, e2e.SecurityLevelNone, "routeName") + routeConfig := e2e.RouteConfigNoRouteMatch("routeName") + resources := e2e.UpdateOptions{ + NodeID: nodeID, + Listeners: []*v3listenerpb.Listener{listener}, + Routes: []*v3routepb.RouteConfiguration{routeConfig}, + SkipValidation: true, + } + + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + if err := managementServer.Update(ctx, resources); err != nil { + t.Fatal(err) + } + serving := grpcsync.NewEvent() + modeChangeOpt := xds.ServingModeCallback(func(addr net.Addr, args xds.ServingModeChangeArgs) { + t.Logf("serving mode for listener %q changed to %q, err: %v", addr.String(), args.Mode, args.Err) + if args.Mode == connectivity.ServingModeServing { + serving.Fire() + } + }) + + server, err := xds.NewGRPCServer(grpc.Creds(insecure.NewCredentials()), modeChangeOpt, xds.BootstrapContentsForTesting(bootstrapContents)) + if err != nil { + t.Fatalf("Failed to create an xDS enabled gRPC server: %v", err) + } + defer server.Stop() + testgrpc.RegisterTestServiceServer(server, &testService{}) + go func() { + if err := server.Serve(lis); err != nil { + t.Errorf("Serve() failed: %v", err) + } + }() + + cc, err := grpc.Dial(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + t.Fatalf("failed to dial local test server: %v", err) + } + defer cc.Close() + + <-serving.Done() + waitForFailedRPCWithStatusCode(ctx, t, cc, status.New(codes.Unavailable, "error from xDS configuration for matched route configuration")) +} + // TestResourceNotFoundRDS tests the case where an LDS points to an RDS which // returns resource not found. Before getting the resource not found, the xDS // Server has not received all configuration needed, so it should Accept and diff --git a/xds/internal/server/conn_wrapper.go b/xds/internal/server/conn_wrapper.go index 27d4f5e5092..ee766842c5a 100644 --- a/xds/internal/server/conn_wrapper.go +++ b/xds/internal/server/conn_wrapper.go @@ -131,7 +131,9 @@ func (c *connWrapper) XDSHandshakeInfo() (*xdsinternal.HandshakeInfo, error) { return xdsinternal.NewHandshakeInfo(c.rootProvider, c.identityProvider, nil, secCfg.RequireClientCert), nil } -func (c *connWrapper) Callback(st transport.ServerTransport) { +// PassServerTransport drains the passed in ServerTransportif draining is set, +// or persists it to be drained once drained is called. +func (c *connWrapper) PassServerTransport(st transport.ServerTransport) { c.mu.Lock() defer c.mu.Unlock() if c.draining { @@ -141,7 +143,9 @@ func (c *connWrapper) Callback(st transport.ServerTransport) { } } -func (c *connWrapper) drain() { +// Drain drains the associated ServerTransport, or sets draining to true so it +// will be drained after it is created. +func (c *connWrapper) Drain() { c.mu.Lock() defer c.mu.Unlock() if c.st == nil { diff --git a/xds/internal/server/listener_wrapper.go b/xds/internal/server/listener_wrapper.go index b2fd4ccf105..78643ab0bfa 100644 --- a/xds/internal/server/listener_wrapper.go +++ b/xds/internal/server/listener_wrapper.go @@ -187,7 +187,7 @@ func (l *listenerWrapper) handleLDSUpdate(update xdsresource.ListenerUpdate) { // maybeUpdateFilterChains swaps in the pending filter chain manager to the // active one if the pending filter chain manager is present. If a swap occurs, -// it also drains (gracefully stops) any connections that were accepted on the +// it also drains (gracefully stops) any connections that were accepted on the // old active filter chain manager, and puts this listener in state SERVING. // Must be called within an xDS Client Callback. func (l *listenerWrapper) maybeUpdateFilterChains() { @@ -202,7 +202,7 @@ func (l *listenerWrapper) maybeUpdateFilterChains() { // gracefully shut down with a grace period of 10 minutes for long-lived // RPC's, such that clients will reconnect and have the updated // configuration apply." - A36 - var connsToClose []net.Conn + var connsToClose []xdsresource.DrainConn if l.activeFilterChainManager != nil { // If there is a filter chain manager to clean up. connsToClose = l.activeFilterChainManager.Conns() } @@ -210,9 +210,12 @@ func (l *listenerWrapper) maybeUpdateFilterChains() { l.pendingFilterChainManager = nil l.instantiateFilterChainRoutingConfigurationsLocked() l.mu.Unlock() - for _, conn := range connsToClose { - conn.(*connWrapper).drain() - } + go func() { + for _, conn := range connsToClose { + conn.Drain() + } + }() + } // handleRDSUpdate rebuilds any routing configuration server side for any filter @@ -221,7 +224,7 @@ func (l *listenerWrapper) maybeUpdateFilterChains() { func (l *listenerWrapper) handleRDSUpdate(routeName string, rcu rdsWatcherUpdate) { // Update any filter chains that point to this route configuration. if l.activeFilterChainManager != nil { - for _, fc := range l.activeFilterChainManager.FilterChains() { // v4 and v6 filter chains...why doesn't this update the first time? + for _, fc := range l.activeFilterChainManager.FilterChains() { if fc.RouteConfigName != routeName { continue } @@ -373,9 +376,12 @@ func (l *listenerWrapper) switchModeLocked(newMode connectivity.ServingMode, err } l.mode = newMode if l.mode == connectivity.ServingModeNotServing { - for _, conn := range l.activeFilterChainManager.Conns() { - conn.(*connWrapper).drain() - } + connsToClose := l.activeFilterChainManager.Conns() + go func() { + for _, conn := range connsToClose { + conn.Drain() + } + }() } // The XdsServer API will allow applications to register a "serving state" // callback to be invoked when the server begins serving and when the diff --git a/xds/internal/server/rds_handler.go b/xds/internal/server/rds_handler.go index a3084a361b2..1a386e78884 100644 --- a/xds/internal/server/rds_handler.go +++ b/xds/internal/server/rds_handler.go @@ -76,10 +76,10 @@ func (rh *rdsHandler) updateRouteNamesToWatch(routeNamesToWatch map[string]bool) w := &rdsWatcher{parent: rh, routeName: routeName} cancel := xdsresource.WatchRouteConfig(rh.xdsC, routeName, w) // Set bit on cancel function to eat any RouteConfiguration calls - // for this watcher after it has been cancelled. + // for this watcher after it has been canceled. rh.cancels[routeName] = func() { w.mu.Lock() - w.cancelled = true + w.canceled = true w.mu.Unlock() cancel() } @@ -107,10 +107,7 @@ func (rh *rdsHandler) determineRouteConfigurationReady() bool { // Must be called from an xDS Client Callback. func (rh *rdsHandler) handleRouteUpdate(routeName string, update rdsWatcherUpdate) { - rwu, ok := rh.updates[routeName] - if !ok { - rwu = rdsWatcherUpdate{} - } + rwu := rh.updates[routeName] if update.err != nil { if xdsresource.ErrType(update.err) == xdsresource.ErrorTypeResourceNotFound { @@ -121,8 +118,7 @@ func (rh *rdsHandler) handleRouteUpdate(routeName string, update rdsWatcherUpdat // Write error. rwu.err = update.err } else { - rwu.update = update.update - rwu.err = nil + rwu = update } rh.updates[routeName] = rwu rh.callback(routeName, rwu) @@ -151,13 +147,13 @@ type rdsWatcher struct { logger *igrpclog.PrefixLogger routeName string - mu sync.Mutex - cancelled bool // eats callbacks if true + mu sync.Mutex + canceled bool // eats callbacks if true } func (rw *rdsWatcher) OnUpdate(update *xdsresource.RouteConfigResourceData) { rw.mu.Lock() - if rw.cancelled { + if rw.canceled { rw.mu.Unlock() return } @@ -170,7 +166,7 @@ func (rw *rdsWatcher) OnUpdate(update *xdsresource.RouteConfigResourceData) { func (rw *rdsWatcher) OnError(err error) { rw.mu.Lock() - if rw.cancelled { + if rw.canceled { rw.mu.Unlock() return } @@ -183,7 +179,7 @@ func (rw *rdsWatcher) OnError(err error) { func (rw *rdsWatcher) OnResourceDoesNotExist() { rw.mu.Lock() - if rw.cancelled { + if rw.canceled { rw.mu.Unlock() return } diff --git a/xds/internal/xdsclient/clientimpl_watchers.go b/xds/internal/xdsclient/clientimpl_watchers.go index 90345b7469c..045fe77a17f 100644 --- a/xds/internal/xdsclient/clientimpl_watchers.go +++ b/xds/internal/xdsclient/clientimpl_watchers.go @@ -48,6 +48,17 @@ func (c *clientImpl) WatchResource(rType xdsresource.Type, resourceName string, return func() {} } + // TODO: replace this with the code does the following when we have + // implemented generic watch API on the authority: + // - Parse the resource name and extract the authority. + // - Locate the corresponding authority object and acquire a reference to + // it. If the authority is not found, error out. + // - Call the watchResource() method on the authority. + // - Return a cancel function to cancel the watch on the authority and to + // release the reference. + + // TODO: Make ParseName return an error if parsing fails, and + // schedule the OnError callback in that case. n := xdsresource.ParseName(resourceName) a, unref, err := c.findAuthority(n) if err != nil { diff --git a/xds/internal/xdsclient/xdsresource/filter_chain.go b/xds/internal/xdsclient/xdsresource/filter_chain.go index ab1f80acdc7..861c45da5e4 100644 --- a/xds/internal/xdsclient/xdsresource/filter_chain.go +++ b/xds/internal/xdsclient/xdsresource/filter_chain.go @@ -215,18 +215,24 @@ type FilterChainManager struct { RouteConfigNames map[string]bool // Persisted to gracefully close once filter chain manager no longer active. - conns []net.Conn + conns []DrainConn +} + +// DrainConn is an interface which specifies a Drain method. +type DrainConn interface { + // Drain gracefully closes the Connection. + Drain() } // AddConn adds the passed connection to the list of Conns in the filter chain // manager. Must not be called concurrently with Conns(). -func (fcm *FilterChainManager) AddConn(conn net.Conn) { +func (fcm *FilterChainManager) AddConn(conn DrainConn) { fcm.conns = append(fcm.conns, conn) } // Conns returns the list of Conns in the filter chain manager. Must not be // called concurrently with AddConn(). -func (fcm *FilterChainManager) Conns() []net.Conn { +func (fcm *FilterChainManager) Conns() []DrainConn { return fcm.conns } diff --git a/xds/xds.go b/xds/xds.go index beaba41d8c8..943d09f17e0 100644 --- a/xds/xds.go +++ b/xds/xds.go @@ -78,7 +78,6 @@ func init() { v3statusgrpc.RegisterClientStatusDiscoveryServiceServer(grpcServer, csdss) return csdss.Close, nil }) - } // NewXDSResolverWithConfigForTesting creates a new xDS resolver builder using