Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

leastrequest: fix data race in leastrequest picker #6587

Merged
merged 3 commits into from
Aug 31, 2023
Merged
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 9 additions & 7 deletions balancer/leastrequest/leastrequest.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func (bb) Name() string {
}

func (bb) Build(cc balancer.ClientConn, bOpts balancer.BuildOptions) balancer.Balancer {
b := &leastRequestBalancer{scRPCCounts: make(map[balancer.SubConn]*int32)}
b := &leastRequestBalancer{scRPCCounts: make(map[balancer.SubConn]*atomic.Int32)}
baseBuilder := base.NewBalancerBuilder(Name, b, base.Config{HealthCheck: true})
b.Balancer = baseBuilder.Build(cc, bOpts)
return b
Expand All @@ -92,7 +92,7 @@ type leastRequestBalancer struct {
balancer.Balancer

choiceCount uint32
scRPCCounts map[balancer.SubConn]*int32 // Hold onto RPC counts to keep track for subsequent picker updates.
scRPCCounts map[balancer.SubConn]*atomic.Int32 // Hold onto RPC counts to keep track for subsequent picker updates.
}

func (lrb *leastRequestBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
Expand All @@ -108,7 +108,7 @@ func (lrb *leastRequestBalancer) UpdateClientConnState(s balancer.ClientConnStat

type scWithRPCCount struct {
sc balancer.SubConn
numRPCs *int32
numRPCs *atomic.Int32
}

func (lrb *leastRequestBalancer) Build(info base.PickerBuildInfo) balancer.Picker {
Expand All @@ -126,7 +126,7 @@ func (lrb *leastRequestBalancer) Build(info base.PickerBuildInfo) balancer.Picke
// Create new refs if needed.
for sc := range info.ReadySCs {
if _, ok := lrb.scRPCCounts[sc]; !ok {
lrb.scRPCCounts[sc] = new(int32)
lrb.scRPCCounts[sc] = new(atomic.Int32)
}
}

Expand Down Expand Up @@ -154,6 +154,8 @@ type picker struct {
}

func (p *picker) Pick(balancer.PickInfo) (balancer.PickResult, error) {
// There is some degree of raciness in Pick, this is allowed: "it may be argued that
// it's better to accept some degree of raciness in the picker instead." - A48
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Delete this comment, I don't think you need to state it. If you do, post the full quote and wrap to 60 characters.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

deleted

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

60 characters

It's irrelevant now, but note that we wrap at 80 columns, not 60.

var pickedSC *scWithRPCCount
for i := 0; i < int(p.choiceCount); i++ {
index := grpcranduint32() % uint32(len(p.subConns))
Expand All @@ -162,18 +164,18 @@ func (p *picker) Pick(balancer.PickInfo) (balancer.PickResult, error) {
pickedSC = &sc
continue
}
if *sc.numRPCs < *pickedSC.numRPCs {
if sc.numRPCs.Load() < pickedSC.numRPCs.Load() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Q for @zasweq: should we cache pickedSC.numRPCs, or reload it on every iteration as we are currently doing?

for ... {
	index := ...
	sc := ...
	scRPCs := sc.numRPCs.Load()
	if pickedSC == nil || scRPCs < pickedSCRPCs{
		pickedSC = &sc
		pickedSCRPCs = scRPCs
	}
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see what you're saying. I think both caching it and the way it is fall within the language described here: https://github.com/grpc/proposal/blob/master/A48-xds-least-request-lb-policy.md#outstanding-request-counter-raciness. Caching it before the iteration would get rid of the race where other threads and picks can add or subtract between iterations. "While reading the outstanding request counters of samples in the picker, previously read values may become outdated" - specifically, this language I think encompasses a. scs that have already been checked and b. the picked sc. As Eric mentioned offline, I think eventually this algorithm goes towards consistency even if the checks aren't perfect.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, just talked to Eric about this and he agrees with your comment. a. loads are expensive, particularly if you get a subsequent write in another Pick call, since it requires exclusive access similar to a rw lock. Thus, it is more performant to perform the operation once. b. Wrt atomics, Eric has the philosophy of only Loading once, as it makes the code easier to reason about and also prevents concurrent operations on that data breaking assumptions and logic about the code. After this is merged I'll change it and write a test for concurrent RPC's.

pickedSC = &sc
}
}
// "The counter for a subchannel should be atomically incremented by one
// after it has been successfully picked by the picker." - A48
atomic.AddInt32(pickedSC.numRPCs, 1)
pickedSC.numRPCs.Add(1)
// "the picker should add a callback for atomically decrementing the
// subchannel counter once the RPC finishes (regardless of Status code)." -
// A48.
done := func(balancer.DoneInfo) {
atomic.AddInt32(pickedSC.numRPCs, -1)
pickedSC.numRPCs.Add(-1)
}
return balancer.PickResult{
SubConn: pickedSC.sc,
Expand Down