client: rework resolver and balancer wrappers to avoid deadlock #6804

Merged · 21 commits · Dec 5, 2023
Changes from 5 commits
274 changes: 98 additions & 176 deletions balancer_wrapper.go
@@ -32,15 +32,6 @@
"google.golang.org/grpc/resolver"
)

type ccbMode int

const (
ccbModeActive = iota
ccbModeIdle
ccbModeClosed
ccbModeExitingIdle
)

// ccBalancerWrapper sits between the ClientConn and the Balancer.
//
// ccBalancerWrapper implements methods corresponding to the ones on the
@@ -57,84 +48,89 @@
type ccBalancerWrapper struct {
// The following fields are initialized when the wrapper is created and are
// read-only afterwards, and therefore can be accessed without a mutex.
cc *ClientConn
opts balancer.BuildOptions

// Outgoing (gRPC --> balancer) calls are guaranteed to execute in a
// mutually exclusive manner as they are scheduled in the serializer. Fields
// accessed *only* in these serializer callbacks, can therefore be accessed
// without a mutex.
balancer *gracefulswitch.Balancer
cc *ClientConn
opts balancer.BuildOptions
balancer *gracefulswitch.Balancer
serializer *grpcsync.CallbackSerializer
serializerCancel context.CancelFunc

// The following fields are only accessed within the serializer.
curBalancerName string

// mu guards access to the below fields. Access to the serializer and its
// cancel function needs to be mutex protected because they are overwritten
// when the wrapper exits idle mode.
mu sync.Mutex
serializer *grpcsync.CallbackSerializer // To serialize all outoing calls.
serializerCancel context.CancelFunc // To close the seralizer at close/enterIdle time.
mode ccbMode // Tracks the current mode of the wrapper.
// The following fields are protected by mu. Caller must take cc.mu before
// taking mu.
mu sync.Mutex
closed bool
}
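The lock-ordering rule in the comment above (take cc.mu before ccb.mu) is the convention this rework relies on to avoid deadlock: every path that needs both locks acquires them in the same order, so no two goroutines can each hold one lock while waiting for the other. A minimal sketch of that convention, using hypothetical conn/wrapper types rather than the real ClientConn/ccBalancerWrapper:

```go
package main

import "sync"

// conn and wrapper mirror the ClientConn/ccBalancerWrapper relationship:
// wrapper.mu may only be taken while conn.mu is already held.
type conn struct {
	mu      sync.Mutex
	wrapper *wrapper
}

type wrapper struct {
	mu     sync.Mutex
	closed bool
}

// update follows the documented order: conn.mu first, then wrapper.mu.
func (c *conn) update() {
	c.mu.Lock()
	defer c.mu.Unlock()

	c.wrapper.mu.Lock()
	closed := c.wrapper.closed
	c.wrapper.mu.Unlock()
	_ = closed
}

// If another path locked wrapper.mu and then tried to lock conn.mu, the two
// paths could each hold one lock and wait on the other: a classic lock
// inversion. A single documented order makes that interleaving impossible.
func main() {
	c := &conn{wrapper: &wrapper{}}
	c.update()
}
```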

// newCCBalancerWrapper creates a new balancer wrapper in idle state. The
// underlying balancer is not created until the switchTo() method is invoked.
func newCCBalancerWrapper(cc *ClientConn, bopts balancer.BuildOptions) *ccBalancerWrapper {
func newCCBalancerWrapper(cc *ClientConn) *ccBalancerWrapper {
ctx, cancel := context.WithCancel(cc.ctx)
ccb := &ccBalancerWrapper{
cc: cc,
opts: bopts,
mode: ccbModeIdle,
cc: cc,
opts: balancer.BuildOptions{
DialCreds: cc.dopts.copts.TransportCredentials,
CredsBundle: cc.dopts.copts.CredsBundle,
Dialer: cc.dopts.copts.Dialer,
Authority: cc.authority,
CustomUserAgent: cc.dopts.copts.UserAgent,
ChannelzParentID: cc.channelzID,
Target: cc.parsedTarget,
},
serializer: grpcsync.NewCallbackSerializer(ctx),
serializerCancel: cancel,
}
ccb.balancer = gracefulswitch.NewBalancer(ccb, ccb.opts)
return ccb
}

// updateClientConnState is invoked by grpc to push a ClientConnState update to
// the underlying balancer.
// the underlying balancer. This is always executed from the serializer, so
// it is safe to call into the balancer here.
func (ccb *ccBalancerWrapper) updateClientConnState(ccs *balancer.ClientConnState) error {
ccb.mu.Lock()
errCh := make(chan error, 1)
// Here and everywhere else where Schedule() is called, it is done with the
// lock held. But the lock guards only the scheduling part. The actual
// callback is called asynchronously without the lock being held.
ok := ccb.serializer.Schedule(func(_ context.Context) {
errCh <- ccb.balancer.UpdateClientConnState(*ccs)
errCh := make(chan error)
ok := ccb.serializer.Schedule(func(ctx context.Context) {
defer close(errCh)
if ctx.Err() != nil {
return
}
err := ccb.balancer.UpdateClientConnState(*ccs)
if logger.V(2) && err != nil {
logger.Infof("error from balancer.UpdateClientConnState: %v", err)
}

errCh <- err
})
if !ok {
// If we are unable to schedule a function with the serializer, it
// indicates that it has been closed. A serializer is only closed when
// the wrapper is closed or is in idle.
ccb.mu.Unlock()
return fmt.Errorf("grpc: cannot send state update to a closed or idle balancer")
return nil

}
ccb.mu.Unlock()

// We get here only if the above call to Schedule succeeds, in which case it
// is guaranteed that the scheduled function will run. Therefore it is safe
// to block on this channel.
err := <-errCh
if logger.V(2) && err != nil {
logger.Infof("error from balancer.UpdateClientConnState: %v", err)
}
return err
return <-errCh
}
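All outgoing calls into the balancer are funneled through grpcsync.CallbackSerializer, which is what makes it safe to touch balancer and curBalancerName without a mutex: callbacks run one at a time. A rough sketch of a serializer with that shape (this is not the real implementation and it glosses over shutdown edge cases the real one handles; names are illustrative):

```go
package main

import (
	"context"
	"fmt"
)

// callbackSerializer is a simplified stand-in for grpcsync.CallbackSerializer:
// scheduled callbacks are drained by a single goroutine, so they never run
// concurrently and need no mutex of their own.
type callbackSerializer struct {
	ctx       context.Context
	callbacks chan func(ctx context.Context)
}

func newCallbackSerializer(ctx context.Context) *callbackSerializer {
	s := &callbackSerializer{ctx: ctx, callbacks: make(chan func(context.Context), 16)}
	go s.run()
	return s
}

// Schedule enqueues f and reports whether it was accepted; once the context is
// canceled it returns false, mirroring the !ok branch in updateClientConnState.
func (s *callbackSerializer) Schedule(f func(ctx context.Context)) bool {
	select {
	case <-s.ctx.Done():
		return false
	case s.callbacks <- f:
		return true
	}
}

func (s *callbackSerializer) run() {
	for {
		select {
		case <-s.ctx.Done():
			// Drain callbacks that were already accepted so that a successful
			// Schedule always results in the callback running, even if only to
			// observe the canceled context and return early.
			for {
				select {
				case f := <-s.callbacks:
					f(s.ctx)
				default:
					return
				}
			}
		case f := <-s.callbacks:
			f(s.ctx) // one callback at a time: no concurrent balancer calls
		}
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	s := newCallbackSerializer(ctx)

	errCh := make(chan error)
	s.Schedule(func(ctx context.Context) {
		defer close(errCh)
		if ctx.Err() != nil {
			return
		}
		errCh <- nil // stand-in for ccb.balancer.UpdateClientConnState(*ccs)
	})
	fmt.Println("UpdateClientConnState returned:", <-errCh)
}
```

Blocking on errCh, as updateClientConnState does above, is safe because a successful Schedule guarantees the callback runs, and the deferred close(errCh) unblocks the caller even when the context has already been canceled.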

// updateSubConnState is invoked by grpc to push a subConn state update to the
// underlying balancer.
func (ccb *ccBalancerWrapper) updateSubConnState(sc balancer.SubConn, s connectivity.State, err error) {
ccb.mu.Lock()
ccb.serializer.Schedule(func(_ context.Context) {
ccb.serializer.Schedule(func(ctx context.Context) {
if ctx.Err() != nil {
return
}
// Even though it is optional for balancers, gracefulswitch ensures
// opts.StateListener is set, so this cannot ever be nil.
// TODO: delete this comment when UpdateSubConnState is removed.
sc.(*acBalancerWrapper).stateListener(balancer.SubConnState{ConnectivityState: s, ConnectionError: err})
})
ccb.mu.Unlock()
}

// resolverError is invoked by grpc to push a resolver error to the underlying
// balancer. This is always executed from the serializer, so it is safe to call
// into the balancer here.
func (ccb *ccBalancerWrapper) resolverError(err error) {
ccb.mu.Lock()
ccb.serializer.Schedule(func(_ context.Context) {
ccb.serializer.Schedule(func(ctx context.Context) {
if ctx.Err() != nil {
return
}

ccb.balancer.ResolverError(err)
})
ccb.mu.Unlock()
}

// switchTo is invoked by grpc to instruct the balancer wrapper to switch to the
@@ -148,16 +144,17 @@
// the ccBalancerWrapper keeps track of the current LB policy name, and skips
// the graceful balancer switching process if the name does not change.
func (ccb *ccBalancerWrapper) switchTo(name string) {
ccb.mu.Lock()
ccb.serializer.Schedule(func(_ context.Context) {
ccb.serializer.Schedule(func(ctx context.Context) {
if ctx.Err() != nil {
return
}
// TODO: Other languages use case-sensitive balancer registries. We should
// switch as well. See: https://github.com/grpc/grpc-go/issues/5288.
if strings.EqualFold(ccb.curBalancerName, name) {
return
}
ccb.buildLoadBalancingPolicy(name)
})
ccb.mu.Unlock()
}

// buildLoadBalancingPolicy performs the following:
@@ -186,113 +183,41 @@

func (ccb *ccBalancerWrapper) close() {
channelz.Info(logger, ccb.cc.channelzID, "ccBalancerWrapper: closing")
ccb.closeBalancer(ccbModeClosed)
}

// enterIdleMode is invoked by grpc when the channel enters idle mode upon
// expiry of idle_timeout. This call blocks until the balancer is closed.
func (ccb *ccBalancerWrapper) enterIdleMode() {
channelz.Info(logger, ccb.cc.channelzID, "ccBalancerWrapper: entering idle mode")
ccb.closeBalancer(ccbModeIdle)
}

// closeBalancer is invoked when the channel is being closed or when it enters
// idle mode upon expiry of idle_timeout.
func (ccb *ccBalancerWrapper) closeBalancer(m ccbMode) {
ccb.mu.Lock()
if ccb.mode == ccbModeClosed || ccb.mode == ccbModeIdle {
ccb.mu.Unlock()
return
}

ccb.mode = m
done := ccb.serializer.Done()
b := ccb.balancer
ok := ccb.serializer.Schedule(func(_ context.Context) {
// Close the serializer to ensure that no more calls from gRPC are sent
// to the balancer.
ccb.serializerCancel()
// Empty the current balancer name because we don't have a balancer
// anymore and also so that we act on the next call to switchTo by
// creating a new balancer specified by the new resolver.
ccb.curBalancerName = ""
})
if !ok {
ccb.mu.Unlock()
return
}
ccb.mu.Unlock()

// Give enqueued callbacks a chance to finish before closing the balancer.
<-done
b.Close()
}

// exitIdleMode is invoked by grpc when the channel exits idle mode either
// because of an RPC or because of an invocation of the Connect() API. This
// recreates the balancer that was closed previously when entering idle mode.
//
// If the channel is not in idle mode, we know for a fact that we are here as a
// result of the user calling the Connect() method on the ClientConn. In this
// case, we can simply forward the call to the underlying balancer, instructing
// it to reconnect to the backends.
func (ccb *ccBalancerWrapper) exitIdleMode() {
ccb.mu.Lock()
if ccb.mode == ccbModeClosed {
// Request to exit idle is a no-op when wrapper is already closed.
ccb.mu.Unlock()
return
}

if ccb.mode == ccbModeIdle {
// Recreate the serializer which was closed when we entered idle.
ctx, cancel := context.WithCancel(context.Background())
ccb.serializer = grpcsync.NewCallbackSerializer(ctx)
ccb.serializerCancel = cancel
}

// The ClientConn guarantees that mutual exclusion between close() and
// exitIdleMode(), and since we just created a new serializer, we can be
// sure that the below function will be scheduled.
done := make(chan struct{})
ccb.serializer.Schedule(func(context.Context) {
defer close(done)

ccb.mu.Lock()
defer ccb.mu.Unlock()

if ccb.mode != ccbModeIdle {
ccb.balancer.ExitIdle()
if ccb.balancer == nil {
return
}

// Gracefulswitch balancer does not support a switchTo operation after
// being closed. Hence we need to create a new one here.
ccb.balancer = gracefulswitch.NewBalancer(ccb, ccb.opts)
ccb.mode = ccbModeActive
channelz.Info(logger, ccb.cc.channelzID, "ccBalancerWrapper: exiting idle mode")

ccb.balancer.Close()
ccb.balancer = nil
})
ccb.mu.Unlock()

<-done
ccb.serializerCancel()
}

func (ccb *ccBalancerWrapper) isIdleOrClosed() bool {
ccb.mu.Lock()
defer ccb.mu.Unlock()
return ccb.mode == ccbModeIdle || ccb.mode == ccbModeClosed
// exitIdle invokes the balancer's exitIdle method in the scheduler.
func (ccb *ccBalancerWrapper) exitIdle() {
ccb.serializer.Schedule(func(ctx context.Context) {
if ctx.Err() != nil {
return
}

ccb.balancer.ExitIdle()
})
}

func (ccb *ccBalancerWrapper) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) {
if ccb.isIdleOrClosed() {
return nil, fmt.Errorf("grpc: cannot create SubConn when balancer is closed or idle")
ccb.cc.mu.Lock()
defer ccb.cc.mu.Unlock()

ccb.mu.Lock()
if ccb.closed {
ccb.mu.Unlock()
return nil, errConnIdling

}
ccb.mu.Unlock()

if len(addrs) == 0 {
return nil, fmt.Errorf("grpc: cannot create SubConn with empty address list")
}
ac, err := ccb.cc.newAddrConn(addrs, opts)
ac, err := ccb.cc.newAddrConnLocked(addrs, opts)
if err != nil {
channelz.Warningf(logger, ccb.cc.channelzID, "acBalancerWrapper: NewSubConn: failed to newAddrConn: %v", err)
return nil, err
@@ -313,10 +238,6 @@
}

func (ccb *ccBalancerWrapper) UpdateAddresses(sc balancer.SubConn, addrs []resolver.Address) {
if ccb.isIdleOrClosed() {
return
}

Contributor: Why do we not want to check if the balancer is closed here? UpdateAddresses could lead to transport creation, right?

Member Author: It's more relevant whether the SubConn/addrConn was closed and not whether the balancer was closed. And we do that check here:

if ac.state == connectivity.Shutdown ||

acbw, ok := sc.(*acBalancerWrapper)
if !ok {
return
@@ -325,25 +246,39 @@
}

func (ccb *ccBalancerWrapper) UpdateState(s balancer.State) {
if ccb.isIdleOrClosed() {
ccb.cc.mu.Lock()
defer ccb.cc.mu.Unlock()

ccb.mu.Lock()
if ccb.closed {
ccb.mu.Unlock()

return
}

ccb.mu.Unlock()
// Update picker before updating state. Even though the ordering here does
// not matter, it can lead to multiple calls of Pick in the common start-up
// case where we wait for ready and then perform an RPC. If the picker is
// updated later, we could call the "connecting" picker when the state is
// updated, and then call the "ready" picker after the picker gets updated.

// Note that there is no need to check if the balancer wrapper was closed,
// as we know the graceful switch LB policy will not call cc if it has been
// closed.
ccb.cc.pickerWrapper.updatePicker(s.Picker)
ccb.cc.csMgr.updateState(s.ConnectivityState)
}

func (ccb *ccBalancerWrapper) ResolveNow(o resolver.ResolveNowOptions) {
if ccb.isIdleOrClosed() {
ccb.cc.mu.RLock()
defer ccb.cc.mu.RUnlock()

ccb.mu.Lock()
if ccb.closed {
ccb.mu.Unlock()

return
}

ccb.cc.resolveNow(o)
ccb.mu.Unlock()
ccb.cc.resolveNowLocked(o)
}

func (ccb *ccBalancerWrapper) Target() string {
@@ -374,20 +309,7 @@
}

func (acbw *acBalancerWrapper) Shutdown() {
ccb := acbw.ccb
if ccb.isIdleOrClosed() {
// It it safe to ignore this call when the balancer is closed or in idle
// because the ClientConn takes care of closing the connections.
//
// Not returning early from here when the balancer is closed or in idle
// leads to a deadlock though, because of the following sequence of
// calls when holding cc.mu:
// cc.exitIdleMode --> ccb.enterIdleMode --> gsw.Close -->
// ccb.RemoveAddrConn --> cc.removeAddrConn
return
}

ccb.cc.removeAddrConn(acbw.ac, errConnDrain)
acbw.ccb.cc.removeAddrConn(acbw.ac, errConnDrain)
}
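The comment deleted from Shutdown above spells out the hazard the old code had to dodge: a synchronous call chain (cc.exitIdleMode -> ccb.enterIdleMode -> gsw.Close -> ccb.RemoveAddrConn -> cc.removeAddrConn) that re-acquires cc.mu while the caller already holds it. Go's sync.Mutex is not re-entrant, so that chain blocks forever. A tiny illustration of the pattern, with hypothetical names rather than gRPC code:

```go
package main

import "sync"

// clientConn stands in for grpc.ClientConn; only the mutex matters here.
type clientConn struct {
	mu sync.Mutex
}

// removeAddrConn mirrors cc.removeAddrConn: it acquires cc.mu itself.
func (cc *clientConn) removeAddrConn() {
	cc.mu.Lock()
	defer cc.mu.Unlock()
}

// exitIdleMode mirrors the old synchronous chain that ended up back in
// removeAddrConn while cc.mu was still held by this same goroutine.
func (cc *clientConn) exitIdleMode() {
	cc.mu.Lock()
	defer cc.mu.Unlock()

	// sync.Mutex is not re-entrant: this second Lock never succeeds, so the
	// goroutine blocks on itself forever.
	cc.removeAddrConn()
}

func main() {
	cc := &clientConn{}
	cc.exitIdleMode() // deadlocks; shown only to illustrate the hazard
}
```

The rework removes the need for the early return by pushing balancer-originated work through the serializer and by adding Locked variants such as newAddrConnLocked and resolveNowLocked that assume cc.mu is already held, so no path tries to take it twice.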

// NewStream begins a streaming RPC on the addrConn. If the addrConn is not