leave Stop as not blocking on handlers by default for backward compatibility
dfawley committed Jan 17, 2024
1 parent c61a4d7 commit 8fc625b
Showing 4 changed files with 105 additions and 17 deletions.
29 changes: 24 additions & 5 deletions server.go
@@ -139,7 +139,8 @@ type Server struct {
quit *grpcsync.Event
done *grpcsync.Event
channelzRemoveOnce sync.Once
serveWG sync.WaitGroup // counts active Serve goroutines for GracefulStop
serveWG sync.WaitGroup // counts active Serve goroutines for Stop/GracefulStop
handlersWG sync.WaitGroup // counts active method handler goroutines

channelzID *channelz.Identifier
czData *channelzData
@@ -176,6 +177,7 @@ type serverOptions struct {
headerTableSize *uint32
numServerWorkers uint32
recvBufferPool SharedBufferPool
waitForHandlers bool
}

var defaultServerOptions = serverOptions{
Expand Down Expand Up @@ -573,6 +575,21 @@ func NumStreamWorkers(numServerWorkers uint32) ServerOption {
})
}

// WaitForHandlers causes Stop to wait until all outstanding method handlers have
// exited before returning. If false, Stop will return as soon as all
// connections have closed, but method handlers may still be running. By
// default, Stop does not wait for method handlers to return.
//
// # Experimental
//
// Notice: This API is EXPERIMENTAL and may be changed or removed in a
// later release.
func WaitForHandlers(w bool) ServerOption {
return newFuncServerOption(func(o *serverOptions) {
o.waitForHandlers = w
})
}
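
For orientation, here is a minimal usage sketch of the new option. It is not part of this commit's diff, and the Echo registration line and echoServer type are hypothetical placeholders.

package main

import (
    "net"

    "google.golang.org/grpc"
)

func main() {
    lis, err := net.Listen("tcp", "localhost:0")
    if err != nil {
        panic(err)
    }
    // With WaitForHandlers(true), Stop blocks until every in-flight method
    // handler has returned, not merely until all connections have closed.
    srv := grpc.NewServer(grpc.WaitForHandlers(true))
    // pb.RegisterEchoServer(srv, &echoServer{}) // hypothetical service registration
    go func() { _ = srv.Serve(lis) }()

    // ... later, during shutdown:
    srv.Stop()
}

Without the option, Stop keeps the backward-compatible default this commit preserves: it returns once connections are closed, even if handler goroutines are still running, while GracefulStop always waits for them.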

// RecvBufferPool returns a ServerOption that configures the server
// to use the provided shared buffer pool for parsing incoming messages. Depending
// on the application's workload, this could result in reduced memory allocation.
@@ -1009,13 +1026,12 @@ func (s *Server) serveStreams(ctx context.Context, st transport.ServerTransport,
}()

streamQuota := newHandlerQuota(s.opts.maxConcurrentStreams)
wg := &sync.WaitGroup{}
st.HandleStreams(ctx, func(stream *transport.Stream) {
wg.Add(1)
s.handlersWG.Add(1)
streamQuota.acquire()
f := func() {
defer streamQuota.release()
defer wg.Done()
defer s.handlersWG.Done()
s.handleStream(st, stream)
}

@@ -1029,7 +1045,6 @@ func (s *Server) serveStreams(ctx context.Context, st transport.ServerTransport,
}
go f()
})
wg.Wait()
}

var _ http.Handler = (*Server)(nil)
Expand Down Expand Up @@ -1915,6 +1930,10 @@ func (s *Server) stop(graceful bool) {
s.serverWorkerChannelClose()
}

if graceful || s.opts.waitForHandlers {
s.handlersWG.Wait()
}

if s.events != nil {
s.events.Finish()
s.events = nil
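
As a simplified illustration of the mechanism in the server.go changes above (not grpc-go's actual implementation), the pattern is a server-level WaitGroup that counts handler goroutines and is waited on conditionally during shutdown; all names below are stand-ins.

package handlertracking

import "sync"

// server is a stripped-down stand-in keeping only the fields relevant to
// handler tracking.
type server struct {
    handlersWG      sync.WaitGroup // counts active method handler goroutines
    waitForHandlers bool           // set by a WaitForHandlers-style option
}

// handleStream runs one method handler on its own goroutine and records it
// in handlersWG, mirroring the Add/Done placement in serveStreams.
func (s *server) handleStream(run func()) {
    s.handlersWG.Add(1)
    go func() {
        defer s.handlersWG.Done()
        run()
    }()
}

// stop mirrors the conditional wait added to (*Server).stop: a graceful stop
// always waits for handlers, a hard stop waits only when the option is set.
func (s *server) stop(graceful bool) {
    // ... listeners and connections are closed first ...
    if graceful || s.waitForHandlers {
        s.handlersWG.Wait()
    }
}

Moving the WaitGroup from a per-connection variable in serveStreams to the server-level handlersWG is what lets stop decide whether to wait, instead of every connection's serveStreams blocking unconditionally.
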
79 changes: 77 additions & 2 deletions server_ext_test.go
@@ -186,7 +186,9 @@ func (s) TestStreamWorkers_GracefulStopAndStop(t *testing.T) {
ss.S.GracefulStop()
}

func (s) TestHandlersReturnBeforeStop(t *testing.T) {
// Tests the WaitForHandlers ServerOption by leaving an RPC running while Stop
// is called, and ensures Stop doesn't return until the handler returns.
func (s) TestServer_WaitForHandlers(t *testing.T) {
started := grpcsync.NewEvent()
blockCalls := grpcsync.NewEvent()

@@ -199,7 +201,7 @@ func (s) TestHandlersReturnBeforeStop(t *testing.T) {
return nil
},
}
if err := ss.Start([]grpc.ServerOption{grpc.MaxConcurrentStreams(1)}); err != nil {
if err := ss.Start([]grpc.ServerOption{grpc.WaitForHandlers(true)}); err != nil {
t.Fatal("Error starting server:", err)
}
defer ss.Stop()
@@ -255,3 +257,76 @@ func (s) TestHandlersReturnBeforeStop(t *testing.T) {
t.Fatalf("Timed out waiting for second RPC to start on server.")
}
}

// Tests that GracefulStop will wait for all method handlers to return by
// blocking a handler and ensuring GracefulStop doesn't return until after it is
// unblocked.
func (s) TestServer_GracefulStopWaits(t *testing.T) {
started := grpcsync.NewEvent()
blockCalls := grpcsync.NewEvent()

// This stub server does not properly respect the stream context, so it will
// not exit when the context is canceled.
ss := stubserver.StubServer{
FullDuplexCallF: func(stream testgrpc.TestService_FullDuplexCallServer) error {
started.Fire()
<-blockCalls.Done()
return nil
},
}
if err := ss.Start(nil); err != nil {
t.Fatal("Error starting server:", err)
}
defer ss.Stop()

ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()

// Start one RPC to the server.
ctx1, cancel1 := context.WithCancel(ctx)
_, err := ss.Client.FullDuplexCall(ctx1)
if err != nil {
t.Fatal("Error starting call:", err)
}

// Wait for the handler to be invoked.
select {
case <-started.Done():
case <-ctx.Done():
t.Fatalf("Timed out waiting for RPC to start on server.")
}

// Cancel it on the client. The server handler will still be running.
cancel1()

// Close the connection. This might be sufficient to allow the server to
// return if it doesn't properly wait for outstanding method handlers to
// return.
ss.CC.Close()

// Try to GracefulStop() the server, which should block indefinitely (until
// blockCalls is fired).
stopped := grpcsync.NewEvent()
go func() {
ss.S.GracefulStop()
stopped.Fire()
}()

// Wait 100ms and ensure stopped does not fire.
select {
case <-stopped.Done():
trace := make([]byte, 4096)
trace = trace[0:runtime.Stack(trace, true)]
blockCalls.Fire()
t.Fatalf("Server returned from GracefulStop() illegally. Stack trace:\n%v", string(trace))
case <-time.After(100 * time.Millisecond):
// Success; unblock the call and wait for stopped.
blockCalls.Fire()
}

select {
case <-stopped.Done():
case <-ctx.Done():
t.Fatalf("Timed out waiting for GracefulStop to return.")
}
}
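
For contrast with the deliberately misbehaving handler above (which ignores the stream context), here is a hypothetical context-aware variant of the same StubServer literal; it is not part of this commit, but with it the handler would exit once the client cancels, so GracefulStop would not need blockCalls to fire.

ss := stubserver.StubServer{
    FullDuplexCallF: func(stream testgrpc.TestService_FullDuplexCallServer) error {
        started.Fire()
        select {
        case <-blockCalls.Done(): // the test explicitly unblocks the call
            return nil
        case <-stream.Context().Done(): // client canceled or the connection closed
            return stream.Context().Err()
        }
    },
}
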
2 changes: 1 addition & 1 deletion test/end2end_test.go
@@ -1035,7 +1035,7 @@ func (s) TestDetailedConnectionCloseErrorPropagatesToRpcError(t *testing.T) {
// connection for the RPC to go out on initially, and that the TCP connection will shut down strictly after
// the RPC has been started on it.
<-rpcStartedOnServer
go ss.S.Stop()
ss.S.Stop()
// The precise behavior of this test is subject to raciness around the timing
// of when TCP packets are sent from client to server, and when we tell the
// server to stop, so we need to account for both possible error messages.
12 changes: 3 additions & 9 deletions test/gracefulstop_test.go
@@ -30,7 +30,6 @@ import (
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/internal/grpcsync"
"google.golang.org/grpc/internal/stubserver"
"google.golang.org/grpc/status"

@@ -270,7 +269,7 @@ func (s) TestGracefulStopBlocksUntilGRPCConnectionsTerminate(t *testing.T) {
// TestStopAbortsBlockingGRPCCall ensures that when Stop() is called while an ongoing RPC
// is blocking that:
// - Stop() returns
//   - and the RPC fails with a connection closed error on the client-side
func (s) TestStopAbortsBlockingGRPCCall(t *testing.T) {
unblockGRPCCall := make(chan struct{})
grpcCallExecuting := make(chan struct{})
@@ -299,13 +298,8 @@ func (s) TestStopAbortsBlockingGRPCCall(t *testing.T) {
}()

<-grpcCallExecuting
stopReturned := grpcsync.NewEvent()
go func() {
ss.S.Stop()
stopReturned.Fire()
}()
ss.S.Stop()

<-grpcClientCallReturned
unblockGRPCCall <- struct{}{}
<-stopReturned.Done()
<-grpcClientCallReturned
}
