Skip to content

Commit

Permalink
Got to all Doug's comments expect atomic.Pointer
Browse files Browse the repository at this point in the history
  • Loading branch information
zasweq committed Jan 11, 2024
1 parent 27e20ab commit 6ea0d50
Show file tree
Hide file tree
Showing 10 changed files with 135 additions and 31 deletions.
17 changes: 16 additions & 1 deletion internal/testutils/xds/e2e/clientresources.go
Original file line number Diff line number Diff line change
Expand Up @@ -788,8 +788,23 @@ func DefaultServerListenerWithRouteConfigName(host string, port uint32, secLevel
return defaultServerListenerCommon(host, port, secLevel, routeName, false)
}

// RouteConfigNoRouteMatch returns an xDS RouteConfig resource which a route
// with no route match. This will be NACKed by the xDS Client.
func RouteConfigNoRouteMatch(routeName string) *v3routepb.RouteConfiguration {
return &v3routepb.RouteConfiguration{
Name: routeName,
VirtualHosts: []*v3routepb.VirtualHost{{
// This "*" string matches on any incoming authority. This is to ensure any
// incoming RPC matches to Route_NonForwardingAction and will proceed as
// normal.
Domains: []string{"*"},
Routes: []*v3routepb.Route{{
Action: &v3routepb.Route_NonForwardingAction{},
}}}}}
}

// RouteConfigNonForwardingAction returns an xDS RouteConfig resource which
// specifies to route to a route specfying non forwarding action. This is
// specifies to route to a route specifying non forwarding action. This is
// intended to be used on the server side for RDS requests, and corresponds to
// the inline route configuration in DefaultServerListener.
func RouteConfigNonForwardingAction(routeName string) *v3routepb.RouteConfiguration {
Expand Down
6 changes: 5 additions & 1 deletion internal/transport/http2_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1290,7 +1290,11 @@ func (t *http2Server) closeStream(s *Stream, rst bool, rstCode http2.ErrCode, eo

// CallbackConn is a conn with a callback function.
type CallbackConn interface {
Callback(ServerTransport)
// PassServerTransport passes a ServerTransport to the callback Conn. This
// is called in the grpc layer after a ServerTransport for a connection has
// successfully been created, if this method exists on the accepted
// connection.
PassServerTransport(ServerTransport)
}

func (t *http2Server) Drain(debugData string) {
Expand Down
2 changes: 1 addition & 1 deletion server.go
Original file line number Diff line number Diff line change
Expand Up @@ -930,7 +930,7 @@ func (s *Server) handleRawConn(lisAddr string, rawConn net.Conn) {
}

if cc, ok := rawConn.(transport.CallbackConn); ok {
cc.Callback(st)
cc.PassServerTransport(st)
}

if !s.addConn(lisAddr, st) {
Expand Down
63 changes: 63 additions & 0 deletions test/xds/xds_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,69 @@ func waitForFailedRPCWithStatusCode(ctx context.Context, t *testing.T, cc *grpc.
}
}

// TestResourceNotFoundRDS tests the case where an LDS points to an RDS which
// returns an RDS Resource which is NACKed. This should trigger server should
// move to serving, successfully Accept Connections, and fail at the L7 level
// with a certain error message.
func (s) TestRDSNack(t *testing.T) {
managementServer, nodeID, bootstrapContents, _, cleanup := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{})
defer cleanup()
lis, err := testutils.LocalTCPListener()
if err != nil {
t.Fatalf("testutils.LocalTCPListener() failed: %v", err)
}
// Setup the management server to respond with a listener resource that
// specifies a route name to watch, and no RDS resource corresponding to
// this route name.
host, port, err := hostPortFromListener(lis)
if err != nil {
t.Fatalf("failed to retrieve host and port of server: %v", err)
}

listener := e2e.DefaultServerListenerWithRouteConfigName(host, port, e2e.SecurityLevelNone, "routeName")
routeConfig := e2e.RouteConfigNoRouteMatch("routeName")
resources := e2e.UpdateOptions{
NodeID: nodeID,
Listeners: []*v3listenerpb.Listener{listener},
Routes: []*v3routepb.RouteConfiguration{routeConfig},
SkipValidation: true,
}

ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
if err := managementServer.Update(ctx, resources); err != nil {
t.Fatal(err)
}
serving := grpcsync.NewEvent()
modeChangeOpt := xds.ServingModeCallback(func(addr net.Addr, args xds.ServingModeChangeArgs) {
t.Logf("serving mode for listener %q changed to %q, err: %v", addr.String(), args.Mode, args.Err)
if args.Mode == connectivity.ServingModeServing {
serving.Fire()
}
})

server, err := xds.NewGRPCServer(grpc.Creds(insecure.NewCredentials()), modeChangeOpt, xds.BootstrapContentsForTesting(bootstrapContents))
if err != nil {
t.Fatalf("Failed to create an xDS enabled gRPC server: %v", err)
}
defer server.Stop()
testgrpc.RegisterTestServiceServer(server, &testService{})
go func() {
if err := server.Serve(lis); err != nil {
t.Errorf("Serve() failed: %v", err)
}
}()

cc, err := grpc.Dial(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
t.Fatalf("failed to dial local test server: %v", err)
}
defer cc.Close()

<-serving.Done()
waitForFailedRPCWithStatusCode(ctx, t, cc, status.New(codes.Unavailable, "error from xDS configuration for matched route configuration"))
}

// TestResourceNotFoundRDS tests the case where an LDS points to an RDS which
// returns resource not found. Before getting the resource not found, the xDS
// Server has not received all configuration needed, so it should Accept and
Expand Down
8 changes: 6 additions & 2 deletions xds/internal/server/conn_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,9 @@ func (c *connWrapper) XDSHandshakeInfo() (*xdsinternal.HandshakeInfo, error) {
return xdsinternal.NewHandshakeInfo(c.rootProvider, c.identityProvider, nil, secCfg.RequireClientCert), nil
}

func (c *connWrapper) Callback(st transport.ServerTransport) {
// PassServerTransport drains the passed in ServerTransportif draining is set,
// or persists it to be drained once drained is called.
func (c *connWrapper) PassServerTransport(st transport.ServerTransport) {
c.mu.Lock()
defer c.mu.Unlock()
if c.draining {
Expand All @@ -141,7 +143,9 @@ func (c *connWrapper) Callback(st transport.ServerTransport) {
}
}

func (c *connWrapper) drain() {
// Drain drains the associated ServerTransport, or sets draining to true so it
// will be drained after it is created.
func (c *connWrapper) Drain() {
c.mu.Lock()
defer c.mu.Unlock()
if c.st == nil {
Expand Down
24 changes: 15 additions & 9 deletions xds/internal/server/listener_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ func (l *listenerWrapper) handleLDSUpdate(update xdsresource.ListenerUpdate) {

// maybeUpdateFilterChains swaps in the pending filter chain manager to the
// active one if the pending filter chain manager is present. If a swap occurs,
// it also drains (gracefully stops) any connections that were accepted on the
// it also drains (gracefully stops) any connections that were accepted on the
// old active filter chain manager, and puts this listener in state SERVING.
// Must be called within an xDS Client Callback.
func (l *listenerWrapper) maybeUpdateFilterChains() {
Expand All @@ -202,17 +202,20 @@ func (l *listenerWrapper) maybeUpdateFilterChains() {
// gracefully shut down with a grace period of 10 minutes for long-lived
// RPC's, such that clients will reconnect and have the updated
// configuration apply." - A36
var connsToClose []net.Conn
var connsToClose []xdsresource.DrainConn
if l.activeFilterChainManager != nil { // If there is a filter chain manager to clean up.
connsToClose = l.activeFilterChainManager.Conns()
}
l.activeFilterChainManager = l.pendingFilterChainManager
l.pendingFilterChainManager = nil
l.instantiateFilterChainRoutingConfigurationsLocked()
l.mu.Unlock()
for _, conn := range connsToClose {
conn.(*connWrapper).drain()
}
go func() {
for _, conn := range connsToClose {
conn.Drain()
}
}()

}

// handleRDSUpdate rebuilds any routing configuration server side for any filter
Expand All @@ -221,7 +224,7 @@ func (l *listenerWrapper) maybeUpdateFilterChains() {
func (l *listenerWrapper) handleRDSUpdate(routeName string, rcu rdsWatcherUpdate) {
// Update any filter chains that point to this route configuration.
if l.activeFilterChainManager != nil {
for _, fc := range l.activeFilterChainManager.FilterChains() { // v4 and v6 filter chains...why doesn't this update the first time?
for _, fc := range l.activeFilterChainManager.FilterChains() {
if fc.RouteConfigName != routeName {
continue

Check warning on line 229 in xds/internal/server/listener_wrapper.go

View check run for this annotation

Codecov / codecov/patch

xds/internal/server/listener_wrapper.go#L229

Added line #L229 was not covered by tests
}
Expand Down Expand Up @@ -373,9 +376,12 @@ func (l *listenerWrapper) switchModeLocked(newMode connectivity.ServingMode, err
}
l.mode = newMode
if l.mode == connectivity.ServingModeNotServing {
for _, conn := range l.activeFilterChainManager.Conns() {
conn.(*connWrapper).drain()
}
connsToClose := l.activeFilterChainManager.Conns()
go func() {
for _, conn := range connsToClose {
conn.Drain()
}
}()
}
// The XdsServer API will allow applications to register a "serving state"
// callback to be invoked when the server begins serving and when the
Expand Down
22 changes: 9 additions & 13 deletions xds/internal/server/rds_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,10 @@ func (rh *rdsHandler) updateRouteNamesToWatch(routeNamesToWatch map[string]bool)
w := &rdsWatcher{parent: rh, routeName: routeName}
cancel := xdsresource.WatchRouteConfig(rh.xdsC, routeName, w)
// Set bit on cancel function to eat any RouteConfiguration calls
// for this watcher after it has been cancelled.
// for this watcher after it has been canceled.
rh.cancels[routeName] = func() {
w.mu.Lock()
w.cancelled = true
w.canceled = true
w.mu.Unlock()
cancel()
}
Expand Down Expand Up @@ -107,10 +107,7 @@ func (rh *rdsHandler) determineRouteConfigurationReady() bool {

// Must be called from an xDS Client Callback.
func (rh *rdsHandler) handleRouteUpdate(routeName string, update rdsWatcherUpdate) {
rwu, ok := rh.updates[routeName]
if !ok {
rwu = rdsWatcherUpdate{}
}
rwu := rh.updates[routeName]

if update.err != nil {
if xdsresource.ErrType(update.err) == xdsresource.ErrorTypeResourceNotFound {
Expand All @@ -121,8 +118,7 @@ func (rh *rdsHandler) handleRouteUpdate(routeName string, update rdsWatcherUpdat
// Write error.
rwu.err = update.err
} else {
rwu.update = update.update
rwu.err = nil
rwu = update
}
rh.updates[routeName] = rwu
rh.callback(routeName, rwu)
Expand Down Expand Up @@ -151,13 +147,13 @@ type rdsWatcher struct {
logger *igrpclog.PrefixLogger
routeName string

mu sync.Mutex
cancelled bool // eats callbacks if true
mu sync.Mutex
canceled bool // eats callbacks if true
}

func (rw *rdsWatcher) OnUpdate(update *xdsresource.RouteConfigResourceData) {
rw.mu.Lock()
if rw.cancelled {
if rw.canceled {
rw.mu.Unlock()
return
}

Check warning on line 159 in xds/internal/server/rds_handler.go

View check run for this annotation

Codecov / codecov/patch

xds/internal/server/rds_handler.go#L157-L159

Added lines #L157 - L159 were not covered by tests
Expand All @@ -170,7 +166,7 @@ func (rw *rdsWatcher) OnUpdate(update *xdsresource.RouteConfigResourceData) {

func (rw *rdsWatcher) OnError(err error) {
rw.mu.Lock()
if rw.cancelled {
if rw.canceled {
rw.mu.Unlock()
return
}

Check warning on line 172 in xds/internal/server/rds_handler.go

View check run for this annotation

Codecov / codecov/patch

xds/internal/server/rds_handler.go#L170-L172

Added lines #L170 - L172 were not covered by tests
Expand All @@ -183,7 +179,7 @@ func (rw *rdsWatcher) OnError(err error) {

func (rw *rdsWatcher) OnResourceDoesNotExist() {
rw.mu.Lock()
if rw.cancelled {
if rw.canceled {
rw.mu.Unlock()
return
}

Check warning on line 185 in xds/internal/server/rds_handler.go

View check run for this annotation

Codecov / codecov/patch

xds/internal/server/rds_handler.go#L183-L185

Added lines #L183 - L185 were not covered by tests
Expand Down
11 changes: 11 additions & 0 deletions xds/internal/xdsclient/clientimpl_watchers.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,17 @@ func (c *clientImpl) WatchResource(rType xdsresource.Type, resourceName string,
return func() {}
}

// TODO: replace this with the code does the following when we have
// implemented generic watch API on the authority:
// - Parse the resource name and extract the authority.
// - Locate the corresponding authority object and acquire a reference to
// it. If the authority is not found, error out.
// - Call the watchResource() method on the authority.
// - Return a cancel function to cancel the watch on the authority and to
// release the reference.

// TODO: Make ParseName return an error if parsing fails, and
// schedule the OnError callback in that case.
n := xdsresource.ParseName(resourceName)
a, unref, err := c.findAuthority(n)
if err != nil {
Expand Down
12 changes: 9 additions & 3 deletions xds/internal/xdsclient/xdsresource/filter_chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,18 +215,24 @@ type FilterChainManager struct {
RouteConfigNames map[string]bool

// Persisted to gracefully close once filter chain manager no longer active.
conns []net.Conn
conns []DrainConn
}

// DrainConn is an interface which specifies a Drain method.
type DrainConn interface {
// Drain gracefully closes the Connection.
Drain()
}

// AddConn adds the passed connection to the list of Conns in the filter chain
// manager. Must not be called concurrently with Conns().
func (fcm *FilterChainManager) AddConn(conn net.Conn) {
func (fcm *FilterChainManager) AddConn(conn DrainConn) {
fcm.conns = append(fcm.conns, conn)
}

// Conns returns the list of Conns in the filter chain manager. Must not be
// called concurrently with AddConn().
func (fcm *FilterChainManager) Conns() []net.Conn {
func (fcm *FilterChainManager) Conns() []DrainConn {
return fcm.conns
}

Expand Down
1 change: 0 additions & 1 deletion xds/xds.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@ func init() {
v3statusgrpc.RegisterClientStatusDiscoveryServiceServer(grpcServer, csdss)
return csdss.Close, nil
})

}

// NewXDSResolverWithConfigForTesting creates a new xDS resolver builder using
Expand Down

0 comments on commit 6ea0d50

Please sign in to comment.