From 0aa86999bacd348733efd9e102e3d5218896f7de Mon Sep 17 00:00:00 2001 From: Doug Fawley Date: Thu, 4 May 2023 15:42:29 -0700 Subject: [PATCH] review comments 3 --- balancer/weightedroundrobin/balancer.go | 26 +-- balancer/weightedroundrobin/balancer_test.go | 233 ++++++++++++------- balancer/weightedroundrobin/scheduler.go | 13 +- orca/producer.go | 13 +- 4 files changed, 173 insertions(+), 112 deletions(-) diff --git a/balancer/weightedroundrobin/balancer.go b/balancer/weightedroundrobin/balancer.go index 6fdd6577085e..4505416fd80c 100644 --- a/balancer/weightedroundrobin/balancer.go +++ b/balancer/weightedroundrobin/balancer.go @@ -131,7 +131,6 @@ func (b *wrrBalancer) UpdateClientConnState(ccs balancer.ClientConnState) error return balancer.ErrBadResolverState } - // Regenerate & send picker. b.regeneratePicker() return nil @@ -156,13 +155,16 @@ func (b *wrrBalancer) updateAddresses(addrs []resolver.Address) { // addr is a new address (not existing in b.subConns). sc, err := b.cc.NewSubConn([]resolver.Address{addr}, balancer.NewSubConnOptions{}) if err != nil { - b.logger.Warningf("wrr: failed to create new SubConn: %v", err) + b.logger.Warningf("wrr: failed to create new SubConn for address %v: %v", addr, err) continue } wsc = &weightedSubConn{ SubConn: sc, logger: b.logger, connectivityState: connectivity.Idle, + // Initially, we set load reports to off, because they are not + // running upon initial weightedSubConn creation. + cfg: &lbConfig{EnableOOBLoadReport: false}, } b.subConns.Set(addr, wsc) b.scMap[sc] = wsc @@ -170,7 +172,8 @@ func (b *wrrBalancer) updateAddresses(addrs []resolver.Address) { sc.Connect() } // Update config for existing weightedSubConn or send update for first - // time to new one. Ensures an OOB listener is running if needed. + // time to new one. Ensures an OOB listener is running if needed + // (and stops the existing one if applicable). 
wsc.updateConfig(b.cfg) } @@ -384,8 +387,8 @@ func (p *picker) Pick(info balancer.PickInfo) (balancer.PickResult, error) { // weightedSubConn is the wrapper of a subconn that holds the subconn and its // weight (and other parameters relevant to computing the effective weight). -// It also tracks connectivity state, listens for metrics updates by -// implementing the orca.OOBListener interface and manages that listener. +// When needed, it also tracks connectivity state, listens for metrics updates +// by implementing the orca.OOBListener interface and manages that listener. type weightedSubConn struct { balancer.SubConn logger *grpclog.PrefixLogger @@ -436,11 +439,6 @@ func (w *weightedSubConn) updateConfig(cfg *lbConfig) { w.mu.Lock() defer w.mu.Unlock() oldCfg := w.cfg - if oldCfg == nil { - // By default we set load reports to off, because they are not running - // upon initial weightedSubConn creation. - oldCfg = &lbConfig{EnableOOBLoadReport: false} - } w.cfg = cfg newPeriod := cfg.OOBReportingPeriod if cfg.EnableOOBLoadReport == oldCfg.EnableOOBLoadReport && @@ -460,7 +458,7 @@ func (w *weightedSubConn) updateConfig(cfg *lbConfig) { return } if w.logger.V(2) { - w.logger.Infof("Registering listener for %v with interval %v", w.SubConn, newPeriod) + w.logger.Infof("Registering ORCA listener for %v with interval %v", w.SubConn, newPeriod) } opts := orca.OOBListenerOptions{ReportInterval: newPeriod} w.stopORCAListener = orca.RegisterOOBListener(w.SubConn, w, opts) @@ -476,7 +474,7 @@ func (w *weightedSubConn) updateConnectivityState(cs connectivity.State) connect w.SubConn.Connect() case connectivity.Ready: // If we transition back to READY state, reset nonEmptySince so that we - // apply the backout period after we start receiving load data. Note + // apply the blackout period after we start receiving load data. 
Note // that we cannot guarantee that we will never receive lingering // callbacks for backend metric reports from the previous connection // after the new connection has been established, but they should be @@ -506,8 +504,8 @@ func (w *weightedSubConn) updateConnectivityState(cs connectivity.State) connect // weight returns the current effective weight of the subconn, taking into // account the parameters. Returns 0 for blacked out or expired data, which -// will cause the backend weight to be treated as the mean of the other -// backends. +// will cause the backend weight to be treated as the mean of the weights of +// the other backends. func (w *weightedSubConn) weight(now time.Time, weightExpirationPeriod, blackoutPeriod time.Duration) float64 { w.mu.Lock() defer w.mu.Unlock() diff --git a/balancer/weightedroundrobin/balancer_test.go b/balancer/weightedroundrobin/balancer_test.go index 4b16c151f4bb..f4edb32f6714 100644 --- a/balancer/weightedroundrobin/balancer_test.go +++ b/balancer/weightedroundrobin/balancer_test.go @@ -51,7 +51,35 @@ func Test(t *testing.T) { } const defaultTestTimeout = 10 * time.Second -const rrIterations = 100 +const weightUpdatePeriod = 50 * time.Millisecond +const oobReportingInterval = 10 * time.Millisecond + +func init() { + iwrr.AllowAnyWeightUpdatePeriod = true +} + +func boolp(b bool) *bool { return &b } +func float64p(f float64) *float64 { return &f } +func durationp(d time.Duration) *time.Duration { return &d } + +var ( + perCallConfig = iwrr.LBConfig{ + EnableOOBLoadReport: boolp(false), + OOBReportingPeriod: durationp(5 * time.Millisecond), + BlackoutPeriod: durationp(0), + WeightExpirationPeriod: durationp(time.Minute), + WeightUpdatePeriod: durationp(weightUpdatePeriod), + ErrorUtilizationPenalty: float64p(0), + } + oobConfig = iwrr.LBConfig{ + EnableOOBLoadReport: boolp(true), + OOBReportingPeriod: durationp(5 * time.Millisecond), + BlackoutPeriod: durationp(0), + WeightExpirationPeriod: durationp(time.Minute), + 
WeightUpdatePeriod: durationp(weightUpdatePeriod), + ErrorUtilizationPenalty: float64p(0), + } +) type testServer struct { *stubserver.StubServer @@ -89,7 +117,7 @@ func startServer(t *testing.T, r reportType) *testServer { } var sopts []grpc.ServerOption - if r == reportBoth || r == reportCall { + if r == reportCall || r == reportBoth { sopts = append(sopts, orca.CallMetricsServerOption(nil)) } @@ -129,98 +157,44 @@ func svcConfig(t *testing.T, wrrCfg iwrr.LBConfig) string { return sc } -const weightUpdatePeriod = 50 * time.Millisecond -const oobReportingInterval = 10 * time.Millisecond - -func init() { - iwrr.AllowAnyWeightUpdatePeriod = true -} - -func boolp(b bool) *bool { return &b } -func float64p(f float64) *float64 { return &f } -func durationp(d time.Duration) *time.Duration { return &d } - -var ( - perCallConfig = iwrr.LBConfig{ - EnableOOBLoadReport: boolp(false), - OOBReportingPeriod: durationp(5 * time.Millisecond), - BlackoutPeriod: durationp(0), - WeightExpirationPeriod: durationp(time.Minute), - WeightUpdatePeriod: durationp(weightUpdatePeriod), - ErrorUtilizationPenalty: float64p(0), - } - oobConfig = iwrr.LBConfig{ - EnableOOBLoadReport: boolp(true), - OOBReportingPeriod: durationp(5 * time.Millisecond), - BlackoutPeriod: durationp(0), - WeightExpirationPeriod: durationp(time.Minute), - WeightUpdatePeriod: durationp(weightUpdatePeriod), - ErrorUtilizationPenalty: float64p(0), - } -) - -func (s) TestBalancer_OneAddress_ReportingDisabled(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) - defer cancel() - - srv := startServer(t, reportNone) - - sc := svcConfig(t, perCallConfig) - if err := srv.StartClient(grpc.WithDefaultServiceConfig(sc)); err != nil { - t.Fatalf("Error starting client: %v", err) - } - - // Perform many RPCs to ensure the LB policy works with 1 address. 
- for i := 0; i < 100; i++ { - if _, err := srv.Client.EmptyCall(ctx, &testpb.Empty{}); err != nil { - t.Fatalf("Error from EmptyCall: %v", err) - } - time.Sleep(time.Millisecond) // Delay; test will run 100ms and should perform ~10 weight updates - } -} - -func (s) TestBalancer_OneAddress_ReportingEnabledPerCall(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) - defer cancel() - - srv := startServer(t, reportCall) - - sc := svcConfig(t, perCallConfig) - if err := srv.StartClient(grpc.WithDefaultServiceConfig(sc)); err != nil { - t.Fatalf("Error starting client: %v", err) - } - - // Perform many RPCs to ensure the LB policy works with 1 address. - for i := 0; i < 100; i++ { - srv.callMetrics.SetQPS(float64(i)) - if _, err := srv.Client.EmptyCall(ctx, &testpb.Empty{}); err != nil { - t.Fatalf("Error from EmptyCall: %v", err) - } - time.Sleep(time.Millisecond) // Delay; test will run 100ms and should perform ~10 weight updates +// Tests basic functionality with one address. With only one address, load +// reporting doesn't affect routing at all. 
+func (s) TestBalancer_OneAddress(t *testing.T) { + testCases := []struct { + rt reportType + }{ + {rt: reportNone}, + {rt: reportCall}, + {rt: reportOOB}, } -} -func (s) TestBalancer_OneAddress_ReportingEnabledOOB(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) - defer cancel() + for _, tc := range testCases { + t.Run(fmt.Sprintf("reportType:%v", tc.rt), func(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() - srv := startServer(t, reportOOB) + srv := startServer(t, tc.rt) - sc := svcConfig(t, oobConfig) - if err := srv.StartClient(grpc.WithDefaultServiceConfig(sc)); err != nil { - t.Fatalf("Error starting client: %v", err) - } + sc := svcConfig(t, perCallConfig) + if err := srv.StartClient(grpc.WithDefaultServiceConfig(sc)); err != nil { + t.Fatalf("Error starting client: %v", err) + } - // Perform many RPCs to ensure the LB policy works with 1 address. - for i := 0; i < 100; i++ { - srv.oobMetrics.SetQPS(float64(i)) - if _, err := srv.Client.EmptyCall(ctx, &testpb.Empty{}); err != nil { - t.Fatalf("Error from EmptyCall: %v", err) - } - time.Sleep(time.Millisecond) // Delay; test will run 100ms and should perform ~10 weight updates + // Perform many RPCs to ensure the LB policy works with 1 address. + for i := 0; i < 100; i++ { + srv.callMetrics.SetQPS(float64(i)) + srv.oobMetrics.SetQPS(float64(i)) + if _, err := srv.Client.EmptyCall(ctx, &testpb.Empty{}); err != nil { + t.Fatalf("Error from EmptyCall: %v", err) + } + time.Sleep(time.Millisecond) // Delay; test will run 100ms and should perform ~10 weight updates + } + }) } } +// Tests two addresses with ORCA reporting disabled (should fall back to pure +// RR). 
func (s) TestBalancer_TwoAddresses_ReportingDisabled(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() @@ -241,6 +215,8 @@ func (s) TestBalancer_TwoAddresses_ReportingDisabled(t *testing.T) { } } +// Tests two addresses with per-call ORCA reporting enabled. Checks the +// backends are called in the appropriate ratios. func (s) TestBalancer_TwoAddresses_ReportingEnabledPerCall(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() @@ -271,6 +247,8 @@ func (s) TestBalancer_TwoAddresses_ReportingEnabledPerCall(t *testing.T) { checkWeights(ctx, t, srvWeight{srv1, 1}, srvWeight{srv2, 10}) } +// Tests two addresses with OOB ORCA reporting enabled. Checks the backends +// are called in the appropriate ratios. func (s) TestBalancer_TwoAddresses_ReportingEnabledOOB(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() @@ -301,6 +279,9 @@ func (s) TestBalancer_TwoAddresses_ReportingEnabledOOB(t *testing.T) { checkWeights(ctx, t, srvWeight{srv1, 1}, srvWeight{srv2, 10}) } +// Tests two addresses with OOB ORCA reporting enabled, where the reports +// change over time. Checks the backends are called in the appropriate ratios +// before and after modifying the reports. func (s) TestBalancer_TwoAddresses_UpdateLoads(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() @@ -343,6 +324,9 @@ func (s) TestBalancer_TwoAddresses_UpdateLoads(t *testing.T) { checkWeights(ctx, t, srvWeight{srv1, 10}, srvWeight{srv2, 1}) } +// Tests two addresses with OOB ORCA reporting enabled, then with switching to +// per-call reporting. Checks the backends are called in the appropriate +// ratios before and after the change. 
func (s) TestBalancer_TwoAddresses_OOBThenPerCall(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() @@ -394,6 +378,8 @@ func (s) TestBalancer_TwoAddresses_OOBThenPerCall(t *testing.T) { checkWeights(ctx, t, srvWeight{srv1, 10}, srvWeight{srv2, 1}) } +// Tests two addresses with OOB ORCA reporting enabled and a non-zero error +// penalty applied. func (s) TestBalancer_TwoAddresses_ErrorPenalty(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() @@ -445,6 +431,8 @@ func (s) TestBalancer_TwoAddresses_ErrorPenalty(t *testing.T) { checkWeights(ctx, t, srvWeight{srv1, 1}, srvWeight{srv2, 1}) } +// Tests that the blackout period causes backends to use 0 as their weight +// until the blackout period elapses. func (s) TestBalancer_TwoAddresses_BlackoutPeriod(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() @@ -520,6 +508,8 @@ func (s) TestBalancer_TwoAddresses_BlackoutPeriod(t *testing.T) { } } +// Tests that the weight expiration period causes backends to use 0 as their +// weight once the expiration period elapses. func (s) TestBalancer_TwoAddresses_WeightExpiration(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() @@ -585,6 +575,72 @@ func (s) TestBalancer_TwoAddresses_WeightExpiration(t *testing.T) { checkWeights(ctx, t, srvWeight{srv1, 1}, srvWeight{srv2, 1}) } +// Tests logic surrounding subchannel management. 
+func (s) TestBalancer_AddressesChanging(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + + srv1 := startServer(t, reportBoth) + srv2 := startServer(t, reportBoth) + srv3 := startServer(t, reportBoth) + srv4 := startServer(t, reportBoth) + + // srv1: weight 10 + srv1.oobMetrics.SetQPS(10.0) + srv1.oobMetrics.SetCPUUtilization(1.0) + // srv2: weight 100 + srv2.oobMetrics.SetQPS(10.0) + srv2.oobMetrics.SetCPUUtilization(.1) + // srv3: weight 20 + srv3.oobMetrics.SetQPS(20.0) + srv3.oobMetrics.SetCPUUtilization(1.0) + // srv4: weight 200 + srv4.oobMetrics.SetQPS(20.0) + srv4.oobMetrics.SetCPUUtilization(.1) + + sc := svcConfig(t, oobConfig) + if err := srv1.StartClient(grpc.WithDefaultServiceConfig(sc)); err != nil { + t.Fatalf("Error starting client: %v", err) + } + srv2.Client = srv1.Client + addrs := []resolver.Address{{Addr: srv1.Address}, {Addr: srv2.Address}, {Addr: srv3.Address}} + srv1.R.UpdateState(resolver.State{Addresses: addrs}) + + // Call each backend once to ensure the weights have been received. + ensureReached(ctx, t, srv1.Client, 3) + time.Sleep(weightUpdatePeriod) + checkWeights(ctx, t, srvWeight{srv1, 1}, srvWeight{srv2, 10}, srvWeight{srv3, 2}) + + // Add backend 4 + addrs = append(addrs, resolver.Address{Addr: srv4.Address}) + srv1.R.UpdateState(resolver.State{Addresses: addrs}) + time.Sleep(weightUpdatePeriod) + checkWeights(ctx, t, srvWeight{srv1, 1}, srvWeight{srv2, 10}, srvWeight{srv3, 2}, srvWeight{srv4, 20}) + + // Shutdown backend 3. RPCs will no longer be routed to it. + srv3.Stop() + time.Sleep(weightUpdatePeriod) + checkWeights(ctx, t, srvWeight{srv1, 1}, srvWeight{srv2, 10}, srvWeight{srv4, 20}) + + // Remove addresses 2 and 3. RPCs will no longer be routed to 2 either. 
+ addrs = []resolver.Address{{Addr: srv1.Address}, {Addr: srv4.Address}} + srv1.R.UpdateState(resolver.State{Addresses: addrs}) + time.Sleep(weightUpdatePeriod) + checkWeights(ctx, t, srvWeight{srv1, 1}, srvWeight{srv4, 20}) + + // Re-add 2 and remove the rest. + addrs = []resolver.Address{{Addr: srv2.Address}} + srv1.R.UpdateState(resolver.State{Addresses: addrs}) + time.Sleep(weightUpdatePeriod) + checkWeights(ctx, t, srvWeight{srv2, 10}) + + // Re-add 4. + addrs = append(addrs, resolver.Address{Addr: srv4.Address}) + srv1.R.UpdateState(resolver.State{Addresses: addrs}) + time.Sleep(weightUpdatePeriod) + checkWeights(ctx, t, srvWeight{srv2, 10}, srvWeight{srv4, 20}) +} + func ensureReached(ctx context.Context, t *testing.T, c testgrpc.TestServiceClient, n int) { t.Helper() reached := make(map[string]struct{}) @@ -602,6 +658,11 @@ type srvWeight struct { w int } +const rrIterations = 100 + +// checkWeights does rrIterations RPCs and expects the different backends to be +// routed in a ratio as determined by the srvWeights passed in. Allows for +// some variance (+/- 2 RPCs per backend). func checkWeights(ctx context.Context, t *testing.T, sws ...srvWeight) { t.Helper() diff --git a/balancer/weightedroundrobin/scheduler.go b/balancer/weightedroundrobin/scheduler.go index 257e6b6b567d..eaf89393e1b7 100644 --- a/balancer/weightedroundrobin/scheduler.go +++ b/balancer/weightedroundrobin/scheduler.go @@ -27,10 +27,10 @@ type scheduler interface { } // newScheduler uses scWeights to create a new scheduler for selecting subconns -// in a picker. It will return a round robin implementation if all weights are -// zero or there is only a single subconn, otherwise it will return an Earliest -// Deadline First (EDF) scheduler implementation that selects the subchannels -// according to their weights. +// in a picker. 
It will return a round robin implementation if at least +// len(scWeights)-1 are zero or there is only a single subconn, otherwise it +// will return an Earliest Deadline First (EDF) scheduler implementation that +// selects the subchannels according to their weights. func newScheduler(scWeights []float64, inc func() uint32) scheduler { n := len(scWeights) if n == 0 { @@ -124,8 +124,9 @@ func (s *edfScheduler) nextIndex() int { } } -// A simple RR scheduler to use for fallback when all weights are zero or the -// same or when only one subconn exists. +// A simple RR scheduler to use for fallback when fewer than two backends have +// non-zero weights, or all backends have the same weight, or when only one +// subconn exists. type rrScheduler struct { inc func() uint32 numSCs int diff --git a/orca/producer.go b/orca/producer.go index 956d5ddfb52d..8428a3951939 100644 --- a/orca/producer.go +++ b/orca/producer.go @@ -174,12 +174,13 @@ func (p *producer) run(ctx context.Context) { // Unimplemented; do not retry. logger.Error("Server doesn't support ORCA OOB load reporting protocol; not listening for load reports.") return - case status.Code(err) == codes.Unavailable: - // TODO: this code should ideally log an error, too, but for now we - // receive this code when shutting down the ClientConn. Once we - // can determine the state or ensure the producer is stopped before - // the stream ends, we can log an error when it's not a natural - // shutdown. + case status.Code(err) == codes.Unavailable, status.Code(err) == codes.Canceled: + // TODO: these codes should ideally log an error, too, but for now + // we receive them when shutting down the ClientConn (Unavailable + // if the stream hasn't started yet, and Canceled if it happens + // mid-stream). Once we can determine the state or ensure the + // producer is stopped before the stream ends, we can log an error + // when it's not a natural shutdown. default: // Log all other errors. 
logger.Error("Received unexpected stream error:", err)