From 8fc625b33203a1bae209d97171c7eaf9e374beef Mon Sep 17 00:00:00 2001
From: Doug Fawley
Date: Wed, 17 Jan 2024 08:45:58 -0800
Subject: [PATCH] leave Stop as not blocking on handlers by default for backward compatibility

---
 server.go                 | 29 +++++++++++---
 server_ext_test.go        | 79 ++++++++++++++++++++++++++++++++++++++-
 test/end2end_test.go      |  2 +-
 test/gracefulstop_test.go | 12 ++----
 4 files changed, 105 insertions(+), 17 deletions(-)

diff --git a/server.go b/server.go
index e22c22122fc..3156c98e049 100644
--- a/server.go
+++ b/server.go
@@ -139,7 +139,8 @@ type Server struct {
 	quit               *grpcsync.Event
 	done               *grpcsync.Event
 	channelzRemoveOnce sync.Once
-	serveWG            sync.WaitGroup // counts active Serve goroutines for GracefulStop
+	serveWG            sync.WaitGroup // counts active Serve goroutines for Stop/GracefulStop
+	handlersWG         sync.WaitGroup // counts active method handler goroutines
 	channelzID         *channelz.Identifier
 	czData             *channelzData
 
@@ -176,6 +177,7 @@ type serverOptions struct {
 	headerTableSize       *uint32
 	numServerWorkers      uint32
 	recvBufferPool        SharedBufferPool
+	waitForHandlers       bool
 }
 
 var defaultServerOptions = serverOptions{
@@ -573,6 +575,21 @@ func NumStreamWorkers(numServerWorkers uint32) ServerOption {
 	})
 }
 
+// WaitForHandlers causes Stop to wait until all outstanding method handlers have
+// exited before returning. If false, Stop will return as soon as all
+// connections have closed, but method handlers may still be running. By
+// default, Stop does not wait for method handlers to return.
+//
+// # Experimental
+//
+// Notice: This API is EXPERIMENTAL and may be changed or removed in a
+// later release.
+func WaitForHandlers(w bool) ServerOption {
+	return newFuncServerOption(func(o *serverOptions) {
+		o.waitForHandlers = w
+	})
+}
+
 // RecvBufferPool returns a ServerOption that configures the server
 // to use the provided shared buffer pool for parsing incoming messages. Depending
 // on the application's workload, this could result in reduced memory allocation.
@@ -1009,13 +1026,12 @@ func (s *Server) serveStreams(ctx context.Context, st transport.ServerTransport,
 	}()
 
 	streamQuota := newHandlerQuota(s.opts.maxConcurrentStreams)
-	wg := &sync.WaitGroup{}
 	st.HandleStreams(ctx, func(stream *transport.Stream) {
-		wg.Add(1)
+		s.handlersWG.Add(1)
 		streamQuota.acquire()
 		f := func() {
 			defer streamQuota.release()
-			defer wg.Done()
+			defer s.handlersWG.Done()
 			s.handleStream(st, stream)
 		}
 
@@ -1029,7 +1045,6 @@ func (s *Server) serveStreams(ctx context.Context, st transport.ServerTransport,
 		}
 		go f()
 	})
-	wg.Wait()
 }
 
 var _ http.Handler = (*Server)(nil)
@@ -1915,6 +1930,10 @@ func (s *Server) stop(graceful bool) {
 		s.serverWorkerChannelClose()
 	}
 
+	if graceful || s.opts.waitForHandlers {
+		s.handlersWG.Wait()
+	}
+
 	if s.events != nil {
 		s.events.Finish()
 		s.events = nil
diff --git a/server_ext_test.go b/server_ext_test.go
index d61d431ba4e..7d9f1f5560a 100644
--- a/server_ext_test.go
+++ b/server_ext_test.go
@@ -186,7 +186,9 @@ func (s) TestStreamWorkers_GracefulStopAndStop(t *testing.T) {
 	ss.S.GracefulStop()
 }
 
-func (s) TestHandlersReturnBeforeStop(t *testing.T) {
+// Tests the WaitForHandlers ServerOption by leaving an RPC running while Stop
+// is called, and ensures Stop doesn't return until the handler returns.
+func (s) TestServer_WaitForHandlers(t *testing.T) {
 	started := grpcsync.NewEvent()
 	blockCalls := grpcsync.NewEvent()
 
@@ -199,7 +201,7 @@
 			return nil
 		},
 	}
-	if err := ss.Start([]grpc.ServerOption{grpc.MaxConcurrentStreams(1)}); err != nil {
+	if err := ss.Start([]grpc.ServerOption{grpc.WaitForHandlers(true)}); err != nil {
 		t.Fatal("Error starting server:", err)
 	}
 	defer ss.Stop()
@@ -255,3 +257,76 @@
 		t.Fatalf("Timed out waiting for second RPC to start on server.")
 	}
 }
+
+// Tests that GracefulStop will wait for all method handlers to return by
+// blocking a handler and ensuring GracefulStop doesn't return until after it is
+// unblocked.
+func (s) TestServer_GracefulStopWaits(t *testing.T) {
+	started := grpcsync.NewEvent()
+	blockCalls := grpcsync.NewEvent()
+
+	// This stub server does not properly respect the stream context, so it will
+	// not exit when the context is canceled.
+	ss := stubserver.StubServer{
+		FullDuplexCallF: func(stream testgrpc.TestService_FullDuplexCallServer) error {
+			started.Fire()
+			<-blockCalls.Done()
+			return nil
+		},
+	}
+	if err := ss.Start(nil); err != nil {
+		t.Fatal("Error starting server:", err)
+	}
+	defer ss.Stop()
+
+	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
+	defer cancel()
+
+	// Start one RPC to the server.
+	ctx1, cancel1 := context.WithCancel(ctx)
+	_, err := ss.Client.FullDuplexCall(ctx1)
+	if err != nil {
+		t.Fatal("Error starting call:", err)
+	}
+
+	// Wait for the handler to be invoked.
+	select {
+	case <-started.Done():
+	case <-ctx.Done():
+		t.Fatalf("Timed out waiting for RPC to start on server.")
+	}
+
+	// Cancel it on the client. The server handler will still be running.
+	cancel1()
+
+	// Close the connection. This might be sufficient to allow the server to
+	// return if it doesn't properly wait for outstanding method handlers to
+	// return.
+	ss.CC.Close()
+
+	// Try to GracefulStop() the server, which should block indefinitely (until
+	// blockCalls is fired).
+	stopped := grpcsync.NewEvent()
+	go func() {
+		ss.S.GracefulStop()
+		stopped.Fire()
+	}()
+
+	// Wait 100ms and ensure stopped does not fire.
+	select {
+	case <-stopped.Done():
+		trace := make([]byte, 4096)
+		trace = trace[0:runtime.Stack(trace, true)]
+		blockCalls.Fire()
+		t.Fatalf("Server returned from GracefulStop() illegally. Stack trace:\n%v", string(trace))
+	case <-time.After(100 * time.Millisecond):
+		// Success; unblock the call and wait for stopped.
+		blockCalls.Fire()
+	}
+
+	select {
+	case <-stopped.Done():
+	case <-ctx.Done():
+		t.Fatalf("Timed out waiting for GracefulStop to return.")
+	}
+}
diff --git a/test/end2end_test.go b/test/end2end_test.go
index 93a51914644..97a7f181255 100644
--- a/test/end2end_test.go
+++ b/test/end2end_test.go
@@ -1035,7 +1035,7 @@ func (s) TestDetailedConnectionCloseErrorPropagatesToRpcError(t *testing.T) {
 	// connection for the RPC to go out on initially, and that the TCP connection will shut down strictly after
 	// the RPC has been started on it.
 	<-rpcStartedOnServer
-	go ss.S.Stop()
+	ss.S.Stop()
 	// The precise behavior of this test is subject to raceyness around the timing
 	// of when TCP packets are sent from client to server, and when we tell the
 	// server to stop, so we need to account for both possible error messages.
diff --git a/test/gracefulstop_test.go b/test/gracefulstop_test.go
index 3a767e300b9..ecf07d98435 100644
--- a/test/gracefulstop_test.go
+++ b/test/gracefulstop_test.go
@@ -30,7 +30,6 @@ import (
 	"google.golang.org/grpc"
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/credentials/insecure"
-	"google.golang.org/grpc/internal/grpcsync"
 	"google.golang.org/grpc/internal/stubserver"
 	"google.golang.org/grpc/status"
 
@@ -270,7 +269,7 @@ func (s) TestGracefulStopBlocksUntilGRPCConnectionsTerminate(t *testing.T) {
 // TestStopAbortsBlockingGRPCCall ensures that when Stop() is called while an ongoing RPC
 // is blocking that:
 // - Stop() returns
-// - and the RPC fails with an connection closed error on the client-side
+// - and the RPC fails with a connection closed error on the client-side
 func (s) TestStopAbortsBlockingGRPCCall(t *testing.T) {
 	unblockGRPCCall := make(chan struct{})
 	grpcCallExecuting := make(chan struct{})
@@ -299,13 +298,8 @@ func (s) TestStopAbortsBlockingGRPCCall(t *testing.T) {
 	}()
 
 	<-grpcCallExecuting
-	stopReturned := grpcsync.NewEvent()
-	go func() {
-		ss.S.Stop()
-		stopReturned.Fire()
-	}()
+	ss.S.Stop()
 
-	<-grpcClientCallReturned
 	unblockGRPCCall <- struct{}{}
-	<-stopReturned.Done()
+	<-grpcClientCallReturned
 }
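
Usage note (not part of the patch): a minimal sketch of how a caller might opt
into the new behavior. It assumes only the WaitForHandlers ServerOption added
above plus standard grpc-go API (grpc.NewServer, Server.Serve, Server.Stop);
the listener address is arbitrary.

package main

import (
	"log"
	"net"

	"google.golang.org/grpc"
)

func main() {
	lis, err := net.Listen("tcp", "localhost:50051")
	if err != nil {
		log.Fatalf("failed to listen: %v", err)
	}

	// With WaitForHandlers(true), Stop blocks until every running method
	// handler has returned, instead of returning as soon as all connections
	// have closed (the default, which this patch preserves).
	srv := grpc.NewServer(grpc.WaitForHandlers(true))
	// Service registration would go here, before calling Serve.

	go func() {
		if err := srv.Serve(lis); err != nil {
			log.Printf("Serve returned: %v", err)
		}
	}()

	// ... run until shutdown is requested ...

	// Stop closes listeners and connections; because of the option above it
	// also waits for in-flight handlers to finish before returning.
	srv.Stop()
}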