Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

xds: Implement circuit breaking support. #4050

Merged
merged 32 commits into from
Dec 8, 2020
Merged
Show file tree
Hide file tree
Changes from 29 commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
f002b7e
1
GarrettGutierrez1 Nov 5, 2020
81840c4
implementation
GarrettGutierrez1 Nov 18, 2020
027f3b9
Refactored to counter struct
GarrettGutierrez1 Nov 18, 2020
45960e9
Added ClusterUpdate test
GarrettGutierrez1 Nov 18, 2020
7ce9958
testing counter
GarrettGutierrez1 Nov 18, 2020
6efeb41
Testing edsconfig
GarrettGutierrez1 Nov 19, 2020
4021d5d
added marshall test
GarrettGutierrez1 Nov 19, 2020
a60e3b6
Added picker test
GarrettGutierrez1 Nov 19, 2020
a53d4b4
fixed conflict
GarrettGutierrez1 Nov 19, 2020
ceb64d4
working this time
GarrettGutierrez1 Nov 19, 2020
8bc9b02
added comment
GarrettGutierrez1 Nov 19, 2020
63ca25b
removed rand
GarrettGutierrez1 Nov 19, 2020
23e96d4
using waitgroups in counter test
GarrettGutierrez1 Nov 19, 2020
f3beb84
Using *uint32 as max requests in various confis
GarrettGutierrez1 Nov 30, 2020
eff8dfa
Removed error return value
GarrettGutierrez1 Nov 30, 2020
4cd2b4f
fixed error
GarrettGutierrez1 Nov 30, 2020
3308919
Reworked counter
GarrettGutierrez1 Nov 30, 2020
13c8c41
vet fix
GarrettGutierrez1 Dec 1, 2020
b74f32e
Merge branch 'master' into xds-circuit-breaking
GarrettGutierrez1 Dec 1, 2020
96282cb
Fixed error from merge master
GarrettGutierrez1 Dec 1, 2020
dbe85ec
Removed edsconfig change, counter updated in CDS balancer
GarrettGutierrez1 Dec 2, 2020
bb4e674
Fixed drop picker bug
GarrettGutierrez1 Dec 2, 2020
d00cc36
Using atomics in counter
GarrettGutierrez1 Dec 2, 2020
62648b9
added atomic to EndRequest
GarrettGutierrez1 Dec 2, 2020
abd41f8
Added testing
GarrettGutierrez1 Dec 4, 2020
249fc60
Merge branch 'master' into xds-circuit-breaking
GarrettGutierrez1 Dec 4, 2020
fad40e7
merge fix
GarrettGutierrez1 Dec 4, 2020
67e9563
merge fix 2
GarrettGutierrez1 Dec 4, 2020
3406b64
test fix
GarrettGutierrez1 Dec 4, 2020
1c16e68
Reworking testing
GarrettGutierrez1 Dec 4, 2020
3d4130d
Added counter update test
GarrettGutierrez1 Dec 8, 2020
8cbb4f6
Merge branch 'master' into xds-circuit-breaking
GarrettGutierrez1 Dec 8, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 3 additions & 0 deletions xds/internal/balancer/cdsbalancer/cdsbalancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/serviceconfig"
"google.golang.org/grpc/xds/internal/balancer/edsbalancer"
"google.golang.org/grpc/xds/internal/client"
"google.golang.org/grpc/xds/internal/client/bootstrap"

xdsclient "google.golang.org/grpc/xds/internal/client"
Expand Down Expand Up @@ -328,6 +329,8 @@ func (b *cdsBalancer) handleWatchUpdate(update *watchUpdate) {
return
}

client.SetMaxRequests(update.cds.ServiceName, update.cds.MaxRequests)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add a CDS test to check that counter is updated.

(Since you cannot read counter's fields, update max to small values, and add, and check return values)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Made this change.


// The first good update from the watch API leads to the instantiation of an
// edsBalancer. Further updates/errors are propagated to the existing
// edsBalancer.
Expand Down
37 changes: 37 additions & 0 deletions xds/internal/balancer/cdsbalancer/cdsbalancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/serviceconfig"
"google.golang.org/grpc/xds/internal/balancer/edsbalancer"
"google.golang.org/grpc/xds/internal/client"
xdsclient "google.golang.org/grpc/xds/internal/client"
xdstestutils "google.golang.org/grpc/xds/internal/testutils"
"google.golang.org/grpc/xds/internal/testutils/fakeclient"
Expand Down Expand Up @@ -574,6 +575,42 @@ func (s) TestUpdateSubConnState(t *testing.T) {
}
}

// TestCircuitBreaking verifies that the CDS balancer correctly updates a
// service's counter on watch updates.
func (s) TestCircuitBreaking(t *testing.T) {
// This creates a CDS balancer, pushes a ClientConnState update with a fake
// xdsClient, and makes sure that the CDS balancer registers a watch on the
// provided xdsClient.
xdsC, cdsB, edsB, _, cancel := setupWithXDSCreds(t)
defer func() {
cancel()
cdsB.Close()
}()

// Here we invoke the watch callback registered on the fake xdsClient. This
// will trigger the watch handler on the CDS balancer, which will update
// the service's counter with the new max requests.
var maxRequests uint32 = 1
cdsUpdate := xdsclient.ClusterUpdate{ServiceName: serviceName, MaxRequests: &maxRequests}
wantCCS := edsCCS(serviceName, false)
ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer ctxCancel()
if err := invokeWatchCbAndWait(ctx, xdsC, cdsWatchInfo{cdsUpdate, nil}, wantCCS, edsB); err != nil {
t.Fatal(err)
}

// Since the counter's max requests was set to 1, the first request should
// succeed and the second should fail.
counter := client.GetServiceRequestsCounter(serviceName)
if err := counter.StartRequest(); err != nil {
t.Fatal(err)
}
if err := counter.StartRequest(); err == nil {
t.Fatal("unexpected success on start request over max")
}
counter.EndRequest()
}

// TestClose verifies the Close() method in the the CDS balancer.
func (s) TestClose(t *testing.T) {
// This creates a CDS balancer, pushes a ClientConnState update with a fake
Expand Down
4 changes: 4 additions & 0 deletions xds/internal/balancer/edsbalancer/eds.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,8 @@ type edsBalancerImplInterface interface {
handleSubConnStateChange(sc balancer.SubConn, state connectivity.State)
// updateState handle a balancer state update from the priority.
updateState(priority priorityType, s balancer.State)
// updateServiceRequestsCounter handles an update to the service name.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updates the service requests counter to the one for the given service name.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Made this change.

updateServiceRequestsCounter(serviceName string)
// close closes the eds balancer.
close()
}
Expand Down Expand Up @@ -212,6 +214,8 @@ func (x *edsBalancer) handleGRPCUpdate(update interface{}) {
x.logger.Warningf("failed to update xDS client: %v", err)
}

x.edsImpl.updateServiceRequestsCounter(cfg.EDSServiceName)

// We will update the edsImpl with the new child policy, if we got a
// different one.
if !cmp.Equal(cfg.ChildPolicy, x.config.ChildPolicy, cmpopts.EquateEmpty()) {
Expand Down
47 changes: 40 additions & 7 deletions xds/internal/balancer/edsbalancer/eds_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,10 @@ import (
"google.golang.org/grpc/xds/internal"
"google.golang.org/grpc/xds/internal/balancer/balancergroup"
"google.golang.org/grpc/xds/internal/balancer/weightedtarget/weightedaggregator"
"google.golang.org/grpc/xds/internal/client"
xdsclient "google.golang.org/grpc/xds/internal/client"
"google.golang.org/grpc/xds/internal/client/load"
"google.golang.org/grpc/xds/internal/env"
)

// TODO: make this a environment variable?
Expand Down Expand Up @@ -92,10 +94,11 @@ type edsBalancerImpl struct {
subConnMu sync.Mutex
subConnToPriority map[balancer.SubConn]priorityType

pickerMu sync.Mutex
dropConfig []xdsclient.OverloadDropConfig
drops []*dropper
innerState balancer.State // The state of the picker without drop support.
pickerMu sync.Mutex
dropConfig []xdsclient.OverloadDropConfig
drops []*dropper
innerState balancer.State // The state of the picker without drop support.
serviceRequestsCounter *client.ServiceRequestsCounter
}

// newEDSBalancerImpl create a new edsBalancerImpl.
Expand Down Expand Up @@ -170,7 +173,7 @@ func (edsImpl *edsBalancerImpl) updateDrops(dropConfig []xdsclient.OverloadDropC
// Update picker with old inner picker, new drops.
edsImpl.cc.UpdateState(balancer.State{
ConnectivityState: edsImpl.innerState.ConnectivityState,
Picker: newDropPicker(edsImpl.innerState.Picker, newDrops, edsImpl.loadReporter)},
Picker: newDropPicker(edsImpl.innerState.Picker, newDrops, edsImpl.loadReporter, edsImpl.serviceRequestsCounter)},
)
}
edsImpl.pickerMu.Unlock()
Expand Down Expand Up @@ -389,6 +392,16 @@ func (edsImpl *edsBalancerImpl) handleSubConnStateChange(sc balancer.SubConn, s
}
}

// updateConfig handles changes to the circuit breaking configuration.
func (edsImpl *edsBalancerImpl) updateServiceRequestsCounter(serviceName string) {
if !env.CircuitBreakingSupport {
return
}
if edsImpl.serviceRequestsCounter == nil || edsImpl.serviceRequestsCounter.ServiceName != serviceName {
edsImpl.serviceRequestsCounter = client.GetServiceRequestsCounter(serviceName)
}
}

// updateState first handles priority, and then wraps picker in a drop picker
// before forwarding the update.
func (edsImpl *edsBalancerImpl) updateState(priority priorityType, s balancer.State) {
Expand All @@ -403,7 +416,7 @@ func (edsImpl *edsBalancerImpl) updateState(priority priorityType, s balancer.St
defer edsImpl.pickerMu.Unlock()
edsImpl.innerState = s
// Don't reset drops when it's a state change.
edsImpl.cc.UpdateState(balancer.State{ConnectivityState: s.ConnectivityState, Picker: newDropPicker(s.Picker, edsImpl.drops, edsImpl.loadReporter)})
edsImpl.cc.UpdateState(balancer.State{ConnectivityState: s.ConnectivityState, Picker: newDropPicker(s.Picker, edsImpl.drops, edsImpl.loadReporter, edsImpl.serviceRequestsCounter)})
}
}

Expand Down Expand Up @@ -455,13 +468,15 @@ type dropPicker struct {
drops []*dropper
p balancer.Picker
loadStore load.PerClusterReporter
counter *client.ServiceRequestsCounter
}

func newDropPicker(p balancer.Picker, drops []*dropper, loadStore load.PerClusterReporter) *dropPicker {
func newDropPicker(p balancer.Picker, drops []*dropper, loadStore load.PerClusterReporter, counter *client.ServiceRequestsCounter) *dropPicker {
return &dropPicker{
drops: drops,
p: p,
loadStore: loadStore,
counter: counter,
}
}

Expand All @@ -483,6 +498,24 @@ func (d *dropPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
}
return balancer.PickResult{}, status.Errorf(codes.Unavailable, "RPC is dropped")
}
if d.counter != nil {
if err := d.counter.StartRequest(); err != nil {
menghanl marked this conversation as resolved.
Show resolved Hide resolved
return balancer.PickResult{}, status.Errorf(codes.Unavailable, err.Error())
}
pr, err := d.p.Pick(info)
if err != nil {
d.counter.EndRequest()
return pr, err
}
oldDone := pr.Done
pr.Done = func(doneInfo balancer.DoneInfo) {
d.counter.EndRequest()
if oldDone != nil {
oldDone(doneInfo)
}
}
return pr, err
}
// TODO: (eds) don't drop unless the inner picker is READY. Similar to
// https://github.com/grpc/grpc-go/issues/2622.
return d.p.Pick(info)
Expand Down
65 changes: 64 additions & 1 deletion xds/internal/balancer/edsbalancer/eds_impl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,10 @@ import (
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/xds/internal"
"google.golang.org/grpc/xds/internal/balancer/balancergroup"
"google.golang.org/grpc/xds/internal/client"
xdsclient "google.golang.org/grpc/xds/internal/client"
"google.golang.org/grpc/xds/internal/client/load"
"google.golang.org/grpc/xds/internal/env"
"google.golang.org/grpc/xds/internal/testutils"
)

Expand Down Expand Up @@ -550,6 +552,67 @@ func (s) TestEDS_UpdateSubBalancerName(t *testing.T) {
}
}

func (s) TestEDS_CircuitBreaking(t *testing.T) {
origCircuitBreakingSupport := env.CircuitBreakingSupport
env.CircuitBreakingSupport = true
defer func() { env.CircuitBreakingSupport = origCircuitBreakingSupport }()

cc := testutils.NewTestClientConn(t)
edsb := newEDSBalancerImpl(cc, nil, nil, nil)
edsb.enqueueChildBalancerStateUpdate = edsb.updateState
edsb.updateServiceRequestsCounter("test")
var maxRequests uint32 = 50
client.SetMaxRequests("test", &maxRequests)

// One locality with one backend.
clab1 := testutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab1.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil)
edsb.handleEDSResponse(parseEDSRespProtoForTesting(clab1.Build()))
sc1 := <-cc.NewSubConnCh
edsb.handleSubConnStateChange(sc1, connectivity.Connecting)
edsb.handleSubConnStateChange(sc1, connectivity.Ready)

// Picks with drops.
dones := []func(){}
p := <-cc.NewPickerCh
for i := 0; i < 100; i++ {
pr, err := p.Pick(balancer.PickInfo{})
if i < 50 && err != nil {
t.Errorf("The first 50%% picks should be non-drops, got error %v", err)
} else if i > 50 && err == nil {
t.Errorf("The second 50%% picks should be drops, got error <nil>")
}
dones = append(dones, func() {
if pr.Done != nil {
pr.Done(balancer.DoneInfo{})
}
})
}

for _, done := range dones {
done()
}
dones = []func(){}

// Pick without drops.
for i := 0; i < 50; i++ {
pr, err := p.Pick(balancer.PickInfo{})
if err != nil {
t.Errorf("The third 50%% picks should be non-drops, got error %v", err)
}
dones = append(dones, func() {
if pr.Done != nil {
pr.Done(balancer.DoneInfo{})
}
})
}

// Without this, future tests with the same service name will fail.
for _, done := range dones {
done()
}
}

func init() {
balancer.Register(&testInlineUpdateBalancerBuilder{})
}
Expand Down Expand Up @@ -656,7 +719,7 @@ func (s) TestDropPicker(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {

p := newDropPicker(constPicker, tt.drops, nil)
p := newDropPicker(constPicker, tt.drops, nil, nil)

// scCount is the number of sc's returned by pick. The opposite of
// drop-count.
Expand Down
6 changes: 6 additions & 0 deletions xds/internal/balancer/edsbalancer/eds_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ type fakeEDSBalancer struct {
childPolicy *testutils.Channel
subconnStateChange *testutils.Channel
edsUpdate *testutils.Channel
serviceName *testutils.Channel
}

func (f *fakeEDSBalancer) handleSubConnStateChange(sc balancer.SubConn, state connectivity.State) {
Expand All @@ -131,6 +132,10 @@ func (f *fakeEDSBalancer) handleEDSResponse(edsResp xdsclient.EndpointsUpdate) {

func (f *fakeEDSBalancer) updateState(priority priorityType, s balancer.State) {}

func (f *fakeEDSBalancer) updateServiceRequestsCounter(serviceName string) {
f.serviceName.Send(serviceName)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where should that be verified? I'm guessing added to one of the waitFor functions below?

Yes. And call it to verify in the test.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Made this change. Wasn't 100% sure what test you are talking about, added the check to TestConfigChildPolicyUpdate because it is the only test that invokes the policy update and has a fakeEDSBalancer.

}

func (f *fakeEDSBalancer) close() {}

func (f *fakeEDSBalancer) waitForChildPolicy(ctx context.Context, wantPolicy *loadBalancingConfig) error {
Expand Down Expand Up @@ -175,6 +180,7 @@ func newFakeEDSBalancer(cc balancer.ClientConn) edsBalancerImplInterface {
childPolicy: testutils.NewChannelWithSize(10),
subconnStateChange: testutils.NewChannelWithSize(10),
edsUpdate: testutils.NewChannelWithSize(10),
serviceName: testutils.NewChannelWithSize(10),
}
}

Expand Down
2 changes: 2 additions & 0 deletions xds/internal/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,8 @@ type ClusterUpdate struct {
EnableLRS bool
// SecurityCfg contains security configuration sent by the control plane.
SecurityCfg *SecurityConfig
// MaxRequests for circuit breaking, if any (otherwise nil).
MaxRequests *uint32
}

// OverloadDropConfig contains the config to drop overloads.
Expand Down
39 changes: 39 additions & 0 deletions xds/internal/client/client_cds_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@ import (
anypb "github.com/golang/protobuf/ptypes/any"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"google.golang.org/grpc/xds/internal/env"
"google.golang.org/grpc/xds/internal/version"
"google.golang.org/protobuf/types/known/wrapperspb"
)

const (
Expand Down Expand Up @@ -169,8 +171,45 @@ func (s) TestValidateCluster_Success(t *testing.T) {
},
wantUpdate: ClusterUpdate{ServiceName: serviceName, EnableLRS: true},
},
{
name: "happiest-case-with-circuitbreakers",
cluster: &v3clusterpb.Cluster{
Name: clusterName,
ClusterDiscoveryType: &v3clusterpb.Cluster_Type{Type: v3clusterpb.Cluster_EDS},
EdsClusterConfig: &v3clusterpb.Cluster_EdsClusterConfig{
EdsConfig: &v3corepb.ConfigSource{
ConfigSourceSpecifier: &v3corepb.ConfigSource_Ads{
Ads: &v3corepb.AggregatedConfigSource{},
},
},
ServiceName: serviceName,
},
LbPolicy: v3clusterpb.Cluster_ROUND_ROBIN,
CircuitBreakers: &v3clusterpb.CircuitBreakers{
Thresholds: []*v3clusterpb.CircuitBreakers_Thresholds{
{
Priority: v3corepb.RoutingPriority_DEFAULT,
MaxRequests: wrapperspb.UInt32(512),
},
{
Priority: v3corepb.RoutingPriority_HIGH,
MaxRequests: nil,
},
},
},
LrsServer: &v3corepb.ConfigSource{
ConfigSourceSpecifier: &v3corepb.ConfigSource_Self{
Self: &v3corepb.SelfConfigSource{},
},
},
},
wantUpdate: ClusterUpdate{ServiceName: serviceName, EnableLRS: true, MaxRequests: func() *uint32 { i := uint32(512); return &i }()},
},
}

origCircuitBreakingSupport := env.CircuitBreakingSupport
env.CircuitBreakingSupport = true
defer func() { env.CircuitBreakingSupport = origCircuitBreakingSupport }()
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
update, err := validateCluster(test.cluster)
Expand Down