From a1033b1f44942fe30e3308ba85ee647e4187d97e Mon Sep 17 00:00:00 2001 From: Daniel Liu Date: Fri, 15 Mar 2024 11:05:17 -0700 Subject: [PATCH] xds: add LRS named metrics support (#7027) --- .../balancer/clusterimpl/balancer_test.go | 19 ++++++++- xds/internal/balancer/clusterimpl/picker.go | 12 +----- .../xdsclient/transport/loadreport_test.go | 41 ++++++++++++++++++- 3 files changed, 58 insertions(+), 14 deletions(-) diff --git a/xds/internal/balancer/clusterimpl/balancer_test.go b/xds/internal/balancer/clusterimpl/balancer_test.go index d7221c32a81..e236f692c59 100644 --- a/xds/internal/balancer/clusterimpl/balancer_test.go +++ b/xds/internal/balancer/clusterimpl/balancer_test.go @@ -43,6 +43,8 @@ import ( "google.golang.org/grpc/xds/internal/xdsclient" "google.golang.org/grpc/xds/internal/xdsclient/bootstrap" "google.golang.org/grpc/xds/internal/xdsclient/load" + + v3orcapb "github.com/cncf/xds/go/xds/data/orca/v3" ) const ( @@ -51,6 +53,9 @@ const ( testClusterName = "test-cluster" testServiceName = "test-eds-service" + + testNamedMetricsKey1 = "test-named1" + testNamedMetricsKey2 = "test-named2" ) var ( @@ -68,6 +73,7 @@ var ( cmpopts.EquateEmpty(), cmpopts.IgnoreFields(load.Data{}, "ReportInterval"), } + toleranceCmpOpt = cmpopts.EquateApprox(0, 1e-5) ) type s struct { @@ -629,7 +635,10 @@ func (s) TestLoadReporting(t *testing.T) { if gotSCSt.SubConn != sc1 { return fmt.Errorf("picker.Pick, got %v, %v, want SubConn=%v", gotSCSt, err, sc1) } - gotSCSt.Done(balancer.DoneInfo{}) + lr := &v3orcapb.OrcaLoadReport{ + NamedMetrics: map[string]float64{testNamedMetricsKey1: 3.14, testNamedMetricsKey2: 2.718}, + } + gotSCSt.Done(balancer.DoneInfo{ServerLoad: lr}) } for i := 0; i < errorCount; i++ { gotSCSt, err := p.Pick(balancer.PickInfo{}) @@ -671,7 +680,13 @@ func (s) TestLoadReporting(t *testing.T) { if reqStats.InProgress != 0 { t.Errorf("got inProgress %v, want %v", reqStats.InProgress, 0) } - + wantLoadStats := map[string]load.ServerLoadData{ + testNamedMetricsKey1: {Count: 5, Sum: 15.7}, // aggregation of 5 * 3.14 = 15.7 + testNamedMetricsKey2: {Count: 5, Sum: 13.59}, // aggregation of 5 * 2.718 = 13.59 + } + if diff := cmp.Diff(wantLoadStats, localityData.LoadStats, toleranceCmpOpt); diff != "" { + t.Errorf("localityData.LoadStats returned unexpected diff (-want +got):\n%s", diff) + } b.Close() if err := xdsC.WaitForCancelReportLoad(ctx); err != nil { t.Fatalf("unexpected error waiting form load report to be canceled: %v", err) diff --git a/xds/internal/balancer/clusterimpl/picker.go b/xds/internal/balancer/clusterimpl/picker.go index 3f354424f28..0788d22481d 100644 --- a/xds/internal/balancer/clusterimpl/picker.go +++ b/xds/internal/balancer/clusterimpl/picker.go @@ -68,11 +68,6 @@ func (d *dropper) drop() (ret bool) { return d.w.Next().(bool) } -const ( - serverLoadCPUName = "cpu_utilization" - serverLoadMemoryName = "mem_utilization" -) - // loadReporter wraps the methods from the loadStore that are used here. type loadReporter interface { CallStarted(locality string) @@ -163,12 +158,7 @@ func (d *picker) Pick(info balancer.PickInfo) (balancer.PickResult, error) { if !ok || load == nil { return } - d.loadStore.CallServerLoad(lIDStr, serverLoadCPUName, load.CpuUtilization) - d.loadStore.CallServerLoad(lIDStr, serverLoadMemoryName, load.MemUtilization) - for n, c := range load.RequestCost { - d.loadStore.CallServerLoad(lIDStr, n, c) - } - for n, c := range load.Utilization { + for n, c := range load.NamedMetrics { d.loadStore.CallServerLoad(lIDStr, n, c) } } diff --git a/xds/internal/xdsclient/transport/loadreport_test.go b/xds/internal/xdsclient/transport/loadreport_test.go index c3cdfede5cb..e56102ac0cc 100644 --- a/xds/internal/xdsclient/transport/loadreport_test.go +++ b/xds/internal/xdsclient/transport/loadreport_test.go @@ -24,6 +24,7 @@ import ( v3corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" "github.com/google/uuid" "google.golang.org/grpc/internal/testutils/xds/fakeserver" "google.golang.org/grpc/xds/internal/testutils" @@ -35,6 +36,22 @@ import ( v3lrspb "github.com/envoyproxy/go-control-plane/envoy/service/load_stats/v3" ) +const ( + testLocality1 = `{"region":"test-region1"}` + testLocality2 = `{"region":"test-region2"}` + testKey1 = "test-key1" + testKey2 = "test-key2" +) + +var ( + toleranceCmpOpt = cmpopts.EquateApprox(0, 1e-5) + ignoreOrderCmpOpt = protocmp.FilterField(&v3endpointpb.ClusterStats{}, "upstream_locality_stats", + cmpopts.SortSlices(func(a, b protocmp.Message) bool { + return a.String() < b.String() + }), + ) +) + func (s) TestReportLoad(t *testing.T) { // Create a fake xDS management server listening on a local port. mgmtServer, cleanup := startFakeManagementServer(t) @@ -74,6 +91,13 @@ func (s) TestReportLoad(t *testing.T) { // Push some loads on the received store. store1.PerCluster("cluster1", "eds1").CallDropped("test") + store1.PerCluster("cluster1", "eds1").CallStarted(testLocality1) + store1.PerCluster("cluster1", "eds1").CallServerLoad(testLocality1, testKey1, 3.14) + store1.PerCluster("cluster1", "eds1").CallServerLoad(testLocality1, testKey1, 2.718) + store1.PerCluster("cluster1", "eds1").CallFinished(testLocality1, nil) + store1.PerCluster("cluster1", "eds1").CallStarted(testLocality2) + store1.PerCluster("cluster1", "eds1").CallServerLoad(testLocality2, testKey2, 1.618) + store1.PerCluster("cluster1", "eds1").CallFinished(testLocality2, nil) // Ensure the initial request is received. req, err := mgmtServer.LRSRequestChan.Receive(ctx) @@ -115,8 +139,23 @@ func (s) TestReportLoad(t *testing.T) { ClusterServiceName: "eds1", TotalDroppedRequests: 1, DroppedRequests: []*v3endpointpb.ClusterStats_DroppedRequests{{Category: "test", DroppedCount: 1}}, + UpstreamLocalityStats: []*v3endpointpb.UpstreamLocalityStats{ + { + Locality: &v3corepb.Locality{Region: "test-region1"}, + LoadMetricStats: []*v3endpointpb.EndpointLoadMetricStats{ + // TotalMetricValue is the aggregation of 3.14 + 2.718 = 5.858 + {MetricName: testKey1, NumRequestsFinishedWithMetric: 2, TotalMetricValue: 5.858}}, + TotalSuccessfulRequests: 1, + }, + { + Locality: &v3corepb.Locality{Region: "test-region2"}, + LoadMetricStats: []*v3endpointpb.EndpointLoadMetricStats{ + {MetricName: testKey2, NumRequestsFinishedWithMetric: 1, TotalMetricValue: 1.618}}, + TotalSuccessfulRequests: 1, + }, + }, } - if diff := cmp.Diff(wantLoad, gotLoad[0], protocmp.Transform()); diff != "" { + if diff := cmp.Diff(wantLoad, gotLoad[0], protocmp.Transform(), toleranceCmpOpt, ignoreOrderCmpOpt); diff != "" { t.Fatalf("Unexpected diff in LRS request (-got, +want):\n%s", diff) }