From 29caeb0802fe02659b50d2e7476ad3932c1dedac Mon Sep 17 00:00:00 2001 From: Daniel Liu Date: Thu, 11 May 2023 17:52:39 +0000 Subject: [PATCH 1/9] add eps in orca api --- .../io/grpc/services/CallMetricRecorder.java | 24 ++++++++-- .../services/InternalCallMetricRecorder.java | 5 +- .../java/io/grpc/services/MetricRecorder.java | 22 ++++++++- .../grpc/services/MetricRecorderHelper.java | 6 +-- .../java/io/grpc/services/MetricReport.java | 9 +++- .../grpc/services/CallMetricRecorderTest.java | 9 ++++ .../OrcaMetricReportingServerInterceptor.java | 5 ++ .../io/grpc/xds/orca/OrcaPerRequestUtil.java | 2 +- .../io/grpc/xds/orca/OrcaServiceImpl.java | 1 + .../WeightedRoundRobinLoadBalancerTest.java | 48 +++++++++---------- ...aMetricReportingServerInterceptorTest.java | 9 ++++ .../grpc/xds/orca/OrcaPerRequestUtilTest.java | 2 + .../io/grpc/xds/orca/OrcaServiceImplTest.java | 8 +++- 13 files changed, 112 insertions(+), 38 deletions(-) diff --git a/services/src/main/java/io/grpc/services/CallMetricRecorder.java b/services/src/main/java/io/grpc/services/CallMetricRecorder.java index 8570a989f26..de8aa48d300 100644 --- a/services/src/main/java/io/grpc/services/CallMetricRecorder.java +++ b/services/src/main/java/io/grpc/services/CallMetricRecorder.java @@ -44,6 +44,7 @@ public final class CallMetricRecorder { private double cpuUtilizationMetric = 0; private double memoryUtilizationMetric = 0; private double qps = 0; + private double eps = 0; private volatile boolean disabled; /** @@ -160,8 +161,8 @@ public CallMetricRecorder recordMemoryUtilizationMetric(double value) { } /** - * Records a call metric measurement for qps in the range [0, inf). Values outside the valid range - * are ignored. If RPC has already finished, this method is no-op. + * Records a call metric measurement for queries per second (qps) in the range [0, inf). Values + * outside the valid range are ignored. If RPC has already finished, this method is no-op. * *

A latter record will overwrite its former name-sakes. * @@ -169,13 +170,28 @@ public CallMetricRecorder recordMemoryUtilizationMetric(double value) { * @since 1.54.0 */ public CallMetricRecorder recordQpsMetric(double value) { - if (disabled || !MetricRecorderHelper.isQpsValid(value)) { + if (disabled || !MetricRecorderHelper.isRateValid(value)) { return this; } qps = value; return this; } + /** + * Records a call metric measurement for errors per second (eps) in the range [0, inf). Values + * outside the valid range are ignored. If RPC has already finished, this method is no-op. + * + *

A latter record will overwrite its former name-sakes. + * + * @return this recorder object + */ + public CallMetricRecorder recordEpsMetric(double value) { + if (disabled || !MetricRecorderHelper.isRateValid(value)) { + return this; + } + eps = value; + return this; + } /** * Returns all request cost metric values. No more metric values will be recorded after this @@ -205,7 +221,7 @@ MetricReport finalizeAndDump2() { if (savedUtilizationMetrics == null) { savedUtilizationMetrics = Collections.emptyMap(); } - return new MetricReport(cpuUtilizationMetric, memoryUtilizationMetric, qps, + return new MetricReport(cpuUtilizationMetric, memoryUtilizationMetric, qps, eps, Collections.unmodifiableMap(savedRequestCostMetrics), Collections.unmodifiableMap(savedUtilizationMetrics) ); diff --git a/services/src/main/java/io/grpc/services/InternalCallMetricRecorder.java b/services/src/main/java/io/grpc/services/InternalCallMetricRecorder.java index 6cee9048c4c..5865357aabf 100644 --- a/services/src/main/java/io/grpc/services/InternalCallMetricRecorder.java +++ b/services/src/main/java/io/grpc/services/InternalCallMetricRecorder.java @@ -46,8 +46,9 @@ public static MetricReport finalizeAndDump2(CallMetricRecorder recorder) { } public static MetricReport createMetricReport(double cpuUtilization, double memoryUtilization, - double qps, Map requestCostMetrics, Map utilizationMetrics) { - return new MetricReport(cpuUtilization, memoryUtilization, qps, requestCostMetrics, + double qps, double eps, Map requestCostMetrics, + Map utilizationMetrics) { + return new MetricReport(cpuUtilization, memoryUtilization, qps, eps, requestCostMetrics, utilizationMetrics); } } diff --git a/services/src/main/java/io/grpc/services/MetricRecorder.java b/services/src/main/java/io/grpc/services/MetricRecorder.java index 3027d495460..248cbb13e56 100644 --- a/services/src/main/java/io/grpc/services/MetricRecorder.java +++ b/services/src/main/java/io/grpc/services/MetricRecorder.java @@ -31,6 +31,7 @@ public final class MetricRecorder { private volatile double cpuUtilization; private volatile double memoryUtilization; private volatile double qps; + private volatile double eps; public static MetricRecorder newInstance() { return new MetricRecorder(); @@ -103,7 +104,7 @@ public void clearMemoryUtilizationMetric() { * Update the QPS metrics data in the range [0, inf). Values outside the valid range are ignored. */ public void setQpsMetric(double value) { - if (!MetricRecorderHelper.isQpsValid(value)) { + if (!MetricRecorderHelper.isRateValid(value)) { return; } qps = value; @@ -116,8 +117,25 @@ public void clearQpsMetric() { qps = 0; } + /** + * Update the EPS metrics data in the range [0, inf). Values outside the valid range are ignored. + */ + public void setEpsMetric(double value) { + if (!MetricRecorderHelper.isRateValid(value)) { + return; + } + this.eps = value; + } + + /** + * Clear the EPS metrics data. + */ + public void clearEpsMetric() { + eps = 0; + } + MetricReport getMetricReport() { - return new MetricReport(cpuUtilization, memoryUtilization, qps, + return new MetricReport(cpuUtilization, memoryUtilization, qps, eps, Collections.emptyMap(), Collections.unmodifiableMap(metricsData)); } } diff --git a/services/src/main/java/io/grpc/services/MetricRecorderHelper.java b/services/src/main/java/io/grpc/services/MetricRecorderHelper.java index 94a811f4f10..66439ac6044 100644 --- a/services/src/main/java/io/grpc/services/MetricRecorderHelper.java +++ b/services/src/main/java/io/grpc/services/MetricRecorderHelper.java @@ -39,10 +39,10 @@ static boolean isCpuUtilizationValid(double utilization) { } /** - * Return true if the qps value is in the range [0, inf) and false otherwise. + * Return true if a rate value (such as qps or eps) is in the range [0, inf) and false otherwise. */ - static boolean isQpsValid(double qps) { - return qps >= 0.0; + static boolean isRateValid(double rate) { + return rate >= 0.0; } // Prevent instantiation. diff --git a/services/src/main/java/io/grpc/services/MetricReport.java b/services/src/main/java/io/grpc/services/MetricReport.java index 73aba7a2af9..0ab8a386c10 100644 --- a/services/src/main/java/io/grpc/services/MetricReport.java +++ b/services/src/main/java/io/grpc/services/MetricReport.java @@ -31,15 +31,17 @@ public final class MetricReport { private double cpuUtilization; private double memoryUtilization; private double qps; + private double eps; private Map requestCostMetrics; private Map utilizationMetrics; - MetricReport(double cpuUtilization, double memoryUtilization, double qps, + MetricReport(double cpuUtilization, double memoryUtilization, double qps, double eps, Map requestCostMetrics, Map utilizationMetrics) { this.cpuUtilization = cpuUtilization; this.memoryUtilization = memoryUtilization; this.qps = qps; + this.eps = eps; this.requestCostMetrics = checkNotNull(requestCostMetrics, "requestCostMetrics"); this.utilizationMetrics = checkNotNull(utilizationMetrics, "utilizationMetrics"); } @@ -64,6 +66,10 @@ public double getQps() { return qps; } + public double getEps() { + return eps; + } + @Override public String toString() { return MoreObjects.toStringHelper(this) @@ -72,6 +78,7 @@ public String toString() { .add("requestCost", requestCostMetrics) .add("utilization", utilizationMetrics) .add("qps", qps) + .add("eps", eps) .toString(); } } diff --git a/services/src/test/java/io/grpc/services/CallMetricRecorderTest.java b/services/src/test/java/io/grpc/services/CallMetricRecorderTest.java index 03f29a05ef5..6893af7cd9b 100644 --- a/services/src/test/java/io/grpc/services/CallMetricRecorderTest.java +++ b/services/src/test/java/io/grpc/services/CallMetricRecorderTest.java @@ -47,6 +47,7 @@ public void dumpDumpsAllSavedMetricValues() { recorder.recordCpuUtilizationMetric(0.1928); recorder.recordMemoryUtilizationMetric(0.474); recorder.recordQpsMetric(2522.54); + recorder.recordEpsMetric(1.618); MetricReport dump = recorder.finalizeAndDump2(); Truth.assertThat(dump.getUtilizationMetrics()) @@ -56,12 +57,15 @@ public void dumpDumpsAllSavedMetricValues() { Truth.assertThat(dump.getCpuUtilization()).isEqualTo(0.1928); Truth.assertThat(dump.getMemoryUtilization()).isEqualTo(0.474); Truth.assertThat(dump.getQps()).isEqualTo(2522.54); + Truth.assertThat(dump.getEps()).isEqualTo(1.618); } @Test public void noMetricsRecordedAfterSnapshot() { Map initDump = recorder.finalizeAndDump(); recorder.recordUtilizationMetric("cost", 0.154353423); + recorder.recordQpsMetric(3.14159); + recorder.recordEpsMetric(1.618); assertThat(recorder.finalizeAndDump()).isEqualTo(initDump); } @@ -84,12 +88,14 @@ public void noMetricsRecordedIfUtilizationAndQpsAreLessThanLowerBound() { recorder.recordCpuUtilizationMetric(-0.001); recorder.recordMemoryUtilizationMetric(-0.001); recorder.recordQpsMetric(-0.001); + recorder.recordEpsMetric(-0.001); recorder.recordUtilizationMetric("util1", -0.001); MetricReport dump = recorder.finalizeAndDump2(); Truth.assertThat(dump.getCpuUtilization()).isEqualTo(0); Truth.assertThat(dump.getMemoryUtilization()).isEqualTo(0); Truth.assertThat(dump.getQps()).isEqualTo(0); + Truth.assertThat(dump.getEps()).isEqualTo(0); Truth.assertThat(dump.getUtilizationMetrics()).isEmpty(); Truth.assertThat(dump.getRequestCostMetrics()).isEmpty(); } @@ -108,6 +114,8 @@ public void lastValueWinForMetricsWithSameName() { recorder.recordUtilizationMetric("util1", 0.843233); recorder.recordQpsMetric(1928.3); recorder.recordQpsMetric(100.8); + recorder.recordEpsMetric(3.14159); + recorder.recordEpsMetric(1.618); MetricReport dump = recorder.finalizeAndDump2(); Truth.assertThat(dump.getRequestCostMetrics()) @@ -117,6 +125,7 @@ public void lastValueWinForMetricsWithSameName() { .containsExactly("util1", 0.843233); Truth.assertThat(dump.getCpuUtilization()).isEqualTo(0); Truth.assertThat(dump.getQps()).isEqualTo(100.8); + Truth.assertThat(dump.getEps()).isEqualTo(1.618); } @Test diff --git a/xds/src/main/java/io/grpc/xds/orca/OrcaMetricReportingServerInterceptor.java b/xds/src/main/java/io/grpc/xds/orca/OrcaMetricReportingServerInterceptor.java index f6d8dcbfb7e..2470202b2af 100644 --- a/xds/src/main/java/io/grpc/xds/orca/OrcaMetricReportingServerInterceptor.java +++ b/xds/src/main/java/io/grpc/xds/orca/OrcaMetricReportingServerInterceptor.java @@ -117,6 +117,7 @@ private static OrcaLoadReport.Builder fromInternalReport(MetricReport internalRe .setCpuUtilization(internalReport.getCpuUtilization()) .setMemUtilization(internalReport.getMemoryUtilization()) .setRpsFractional(internalReport.getQps()) + .setEps(internalReport.getEps()) .putAllUtilization(internalReport.getUtilizationMetrics()) .putAllRequestCost(internalReport.getRequestCostMetrics()); } @@ -145,6 +146,10 @@ private static void mergeMetrics( if (isReportValueSet(rps)) { metricRecorderReportBuilder.setRpsFractional(rps); } + double eps = callMetricRecorderReport.getEps(); + if (isReportValueSet(eps)) { + metricRecorderReportBuilder.setEps(eps); + } } private static boolean isReportValueSet(double value) { diff --git a/xds/src/main/java/io/grpc/xds/orca/OrcaPerRequestUtil.java b/xds/src/main/java/io/grpc/xds/orca/OrcaPerRequestUtil.java index 97414529678..2778cfdf9ea 100644 --- a/xds/src/main/java/io/grpc/xds/orca/OrcaPerRequestUtil.java +++ b/xds/src/main/java/io/grpc/xds/orca/OrcaPerRequestUtil.java @@ -254,7 +254,7 @@ public void inboundTrailers(Metadata trailers) { static MetricReport fromOrcaLoadReport(OrcaLoadReport loadReport) { return InternalCallMetricRecorder.createMetricReport(loadReport.getCpuUtilization(), - loadReport.getMemUtilization(), loadReport.getRpsFractional(), + loadReport.getMemUtilization(), loadReport.getRpsFractional(), loadReport.getEps(), loadReport.getRequestCostMap(), loadReport.getUtilizationMap()); } diff --git a/xds/src/main/java/io/grpc/xds/orca/OrcaServiceImpl.java b/xds/src/main/java/io/grpc/xds/orca/OrcaServiceImpl.java index 30522a5e0f6..ce92225cb1e 100644 --- a/xds/src/main/java/io/grpc/xds/orca/OrcaServiceImpl.java +++ b/xds/src/main/java/io/grpc/xds/orca/OrcaServiceImpl.java @@ -151,6 +151,7 @@ private OrcaLoadReport generateMetricsReport() { return OrcaLoadReport.newBuilder().setCpuUtilization(internalReport.getCpuUtilization()) .setMemUtilization(internalReport.getMemoryUtilization()) .setRpsFractional(internalReport.getQps()) + .setEps(internalReport.getEps()) .putAllUtilization(internalReport.getUtilizationMetrics()) .build(); } diff --git a/xds/src/test/java/io/grpc/xds/WeightedRoundRobinLoadBalancerTest.java b/xds/src/test/java/io/grpc/xds/WeightedRoundRobinLoadBalancerTest.java index da298d3abbb..66d3211fe7c 100644 --- a/xds/src/test/java/io/grpc/xds/WeightedRoundRobinLoadBalancerTest.java +++ b/xds/src/test/java/io/grpc/xds/WeightedRoundRobinLoadBalancerTest.java @@ -208,9 +208,9 @@ public void wrrLifeCycle() { WrrSubchannel weightedSubchannel1 = (WrrSubchannel) weightedPicker.getList().get(0); WrrSubchannel weightedSubchannel2 = (WrrSubchannel) weightedPicker.getList().get(1); weightedSubchannel1.onLoadReport(InternalCallMetricRecorder.createMetricReport( - 0.1, 0.1, 1, new HashMap<>(), new HashMap<>())); + 0.1, 0.1, 1, 0, new HashMap<>(), new HashMap<>())); weightedSubchannel2.onLoadReport(InternalCallMetricRecorder.createMetricReport( - 0.2, 0.1, 1, new HashMap<>(), new HashMap<>())); + 0.2, 0.1, 1, 0, new HashMap<>(), new HashMap<>())); assertThat(fakeClock.forwardTime(11, TimeUnit.SECONDS)).isEqualTo(1); assertThat(weightedPicker.pickSubchannel(mockArgs) .getSubchannel()).isEqualTo(weightedSubchannel1); @@ -252,9 +252,9 @@ public void enableOobLoadReportConfig() { WrrSubchannel weightedSubchannel1 = (WrrSubchannel) weightedPicker.getList().get(0); WrrSubchannel weightedSubchannel2 = (WrrSubchannel) weightedPicker.getList().get(1); weightedSubchannel1.onLoadReport(InternalCallMetricRecorder.createMetricReport( - 0.1, 0.1, 1, new HashMap<>(), new HashMap<>())); + 0.1, 0.1, 1, 0, new HashMap<>(), new HashMap<>())); weightedSubchannel2.onLoadReport(InternalCallMetricRecorder.createMetricReport( - 0.9, 0.1, 1, new HashMap<>(), new HashMap<>())); + 0.9, 0.1, 1, 0, new HashMap<>(), new HashMap<>())); assertThat(fakeClock.forwardTime(11, TimeUnit.SECONDS)).isEqualTo(1); PickResult pickResult = weightedPicker.pickSubchannel(mockArgs); assertThat(pickResult.getSubchannel()).isEqualTo(weightedSubchannel1); @@ -328,11 +328,11 @@ private void pickByWeight(MetricReport r1, MetricReport r2, MetricReport r3, @Test public void pickByWeight_LargeWeight() { MetricReport report1 = InternalCallMetricRecorder.createMetricReport( - 0.1, 0.1, 999, new HashMap<>(), new HashMap<>()); + 0.1, 0.1, 999, 0, new HashMap<>(), new HashMap<>()); MetricReport report2 = InternalCallMetricRecorder.createMetricReport( - 0.9, 0.1, 2, new HashMap<>(), new HashMap<>()); + 0.9, 0.1, 2, 0, new HashMap<>(), new HashMap<>()); MetricReport report3 = InternalCallMetricRecorder.createMetricReport( - 0.86, 0.1, 100, new HashMap<>(), new HashMap<>()); + 0.86, 0.1, 100, 0, new HashMap<>(), new HashMap<>()); double totalWeight = 999 / 0.1 + 2 / 0.9 + 100 / 0.86; pickByWeight(report1, report2, report3, 999 / 0.1 / totalWeight, 2 / 0.9 / totalWeight, @@ -342,11 +342,11 @@ public void pickByWeight_LargeWeight() { @Test public void pickByWeight_normalWeight() { MetricReport report1 = InternalCallMetricRecorder.createMetricReport( - 0.12, 0.1, 22, new HashMap<>(), new HashMap<>()); + 0.12, 0.1, 22, 0, new HashMap<>(), new HashMap<>()); MetricReport report2 = InternalCallMetricRecorder.createMetricReport( - 0.28, 0.1, 40, new HashMap<>(), new HashMap<>()); + 0.28, 0.1, 40, 0, new HashMap<>(), new HashMap<>()); MetricReport report3 = InternalCallMetricRecorder.createMetricReport( - 0.86, 0.1, 100, new HashMap<>(), new HashMap<>()); + 0.86, 0.1, 100, 0, new HashMap<>(), new HashMap<>()); double totalWeight = 22 / 0.12 + 40 / 0.28 + 100 / 0.86; pickByWeight(report1, report2, report3, 22 / 0.12 / totalWeight, 40 / 0.28 / totalWeight, 100 / 0.86 / totalWeight @@ -396,9 +396,9 @@ public void blackoutPeriod() { WrrSubchannel weightedSubchannel1 = (WrrSubchannel) weightedPicker.getList().get(0); WrrSubchannel weightedSubchannel2 = (WrrSubchannel) weightedPicker.getList().get(1); weightedSubchannel1.onLoadReport(InternalCallMetricRecorder.createMetricReport( - 0.1, 0.1, 1, new HashMap<>(), new HashMap<>())); + 0.1, 0.1, 1, 0, new HashMap<>(), new HashMap<>())); weightedSubchannel2.onLoadReport(InternalCallMetricRecorder.createMetricReport( - 0.2, 0.1, 1, new HashMap<>(), new HashMap<>())); + 0.2, 0.1, 1, 0, new HashMap<>(), new HashMap<>())); assertThat(fakeClock.forwardTime(5, TimeUnit.SECONDS)).isEqualTo(1); Map pickCount = new HashMap<>(); for (int i = 0; i < 1000; i++) { @@ -454,9 +454,9 @@ public void updateWeightTimer() { WrrSubchannel weightedSubchannel1 = (WrrSubchannel) weightedPicker.getList().get(0); WrrSubchannel weightedSubchannel2 = (WrrSubchannel) weightedPicker.getList().get(1); weightedSubchannel1.onLoadReport(InternalCallMetricRecorder.createMetricReport( - 0.1, 0.1, 1, new HashMap<>(), new HashMap<>())); + 0.1, 0.1, 1, 0, new HashMap<>(), new HashMap<>())); weightedSubchannel2.onLoadReport(InternalCallMetricRecorder.createMetricReport( - 0.2, 0.1, 1, new HashMap<>(), new HashMap<>())); + 0.2, 0.1, 1, 0, new HashMap<>(), new HashMap<>())); assertThat(fakeClock.forwardTime(11, TimeUnit.SECONDS)).isEqualTo(1); assertThat(weightedPicker.pickSubchannel(mockArgs) .getSubchannel()).isEqualTo(weightedSubchannel1); @@ -469,9 +469,9 @@ public void updateWeightTimer() { .setAttributes(affinity).build())); assertThat(fakeClock.getPendingTasks().size()).isEqualTo(1); weightedSubchannel1.onLoadReport(InternalCallMetricRecorder.createMetricReport( - 0.2, 0.1, 1, new HashMap<>(), new HashMap<>())); + 0.2, 0.1, 1, 0, new HashMap<>(), new HashMap<>())); weightedSubchannel2.onLoadReport(InternalCallMetricRecorder.createMetricReport( - 0.1, 0.1, 1, new HashMap<>(), new HashMap<>())); + 0.1, 0.1, 1, 0, new HashMap<>(), new HashMap<>())); //timer fires, new weight updated assertThat(fakeClock.forwardTime(500, TimeUnit.MILLISECONDS)).isEqualTo(1); assertThat(weightedPicker.pickSubchannel(mockArgs) @@ -501,9 +501,9 @@ public void weightExpired() { WrrSubchannel weightedSubchannel1 = (WrrSubchannel) weightedPicker.getList().get(0); WrrSubchannel weightedSubchannel2 = (WrrSubchannel) weightedPicker.getList().get(1); weightedSubchannel1.onLoadReport(InternalCallMetricRecorder.createMetricReport( - 0.1, 0.1, 1, new HashMap<>(), new HashMap<>())); + 0.1, 0.1, 1, 0, new HashMap<>(), new HashMap<>())); weightedSubchannel2.onLoadReport(InternalCallMetricRecorder.createMetricReport( - 0.2, 0.1, 1, new HashMap<>(), new HashMap<>())); + 0.2, 0.1, 1, 0, new HashMap<>(), new HashMap<>())); assertThat(fakeClock.forwardTime(10, TimeUnit.SECONDS)).isEqualTo(1); Map pickCount = new HashMap<>(); for (int i = 0; i < 1000; i++) { @@ -563,7 +563,7 @@ public void rrFallback() { assertThat(pickResult.getStreamTracerFactory()).isNotNull(); WrrSubchannel subchannel = (WrrSubchannel)pickResult.getSubchannel(); subchannel.onLoadReport(InternalCallMetricRecorder.createMetricReport( - 0.1, 0.1, qpsByChannel.get(subchannel), new HashMap<>(), new HashMap<>())); + 0.1, 0.1, qpsByChannel.get(subchannel), 0, new HashMap<>(), new HashMap<>())); } assertThat(Math.abs(pickCount.get(weightedSubchannel1) / 1000.0 - 1.0 / 2)) .isAtMost(0.1); @@ -577,7 +577,7 @@ public void rrFallback() { assertThat(pickResult.getStreamTracerFactory()).isNotNull(); WrrSubchannel subchannel = (WrrSubchannel)pickResult.getSubchannel(); subchannel.onLoadReport(InternalCallMetricRecorder.createMetricReport( - 0.1, 0.1, qpsByChannel.get(subchannel), new HashMap<>(), new HashMap<>())); + 0.1, 0.1, qpsByChannel.get(subchannel), 0, new HashMap<>(), new HashMap<>())); fakeClock.forwardTime(50, TimeUnit.MILLISECONDS); } assertThat(pickCount.size()).isEqualTo(2); @@ -614,9 +614,9 @@ public void unknownWeightIsAvgWeight() { WrrSubchannel weightedSubchannel2 = (WrrSubchannel) weightedPicker.getList().get(1); WrrSubchannel weightedSubchannel3 = (WrrSubchannel) weightedPicker.getList().get(2); weightedSubchannel1.onLoadReport(InternalCallMetricRecorder.createMetricReport( - 0.1, 0.1, 1, new HashMap<>(), new HashMap<>())); + 0.1, 0.1, 1, 0, new HashMap<>(), new HashMap<>())); weightedSubchannel2.onLoadReport(InternalCallMetricRecorder.createMetricReport( - 0.2, 0.1, 1, new HashMap<>(), new HashMap<>())); + 0.2, 0.1, 1, 0, new HashMap<>(), new HashMap<>())); assertThat(fakeClock.forwardTime(10, TimeUnit.SECONDS)).isEqualTo(1); Map pickCount = new HashMap<>(); for (int i = 0; i < 1000; i++) { @@ -656,9 +656,9 @@ public void pickFromOtherThread() throws Exception { WrrSubchannel weightedSubchannel1 = (WrrSubchannel) weightedPicker.getList().get(0); WrrSubchannel weightedSubchannel2 = (WrrSubchannel) weightedPicker.getList().get(1); weightedSubchannel1.onLoadReport(InternalCallMetricRecorder.createMetricReport( - 0.1, 0.1, 1, new HashMap<>(), new HashMap<>())); + 0.1, 0.1, 1, 0, new HashMap<>(), new HashMap<>())); weightedSubchannel2.onLoadReport(InternalCallMetricRecorder.createMetricReport( - 0.2, 0.1, 1, new HashMap<>(), new HashMap<>())); + 0.2, 0.1, 1, 0, new HashMap<>(), new HashMap<>())); CyclicBarrier barrier = new CyclicBarrier(2); Map pickCount = new ConcurrentHashMap<>(); pickCount.put(weightedSubchannel1, new AtomicInteger(0)); diff --git a/xds/src/test/java/io/grpc/xds/orca/OrcaMetricReportingServerInterceptorTest.java b/xds/src/test/java/io/grpc/xds/orca/OrcaMetricReportingServerInterceptorTest.java index 7681f0b42f4..d8a1492f7cf 100644 --- a/xds/src/test/java/io/grpc/xds/orca/OrcaMetricReportingServerInterceptorTest.java +++ b/xds/src/test/java/io/grpc/xds/orca/OrcaMetricReportingServerInterceptorTest.java @@ -75,6 +75,7 @@ public class OrcaMetricReportingServerInterceptorTest { private double cpuUtilizationMetrics = 0; private double memoryUtilizationMetrics = 0; private double qpsMetrics = 0; + private double epsMetrics = 0; private MetricRecorder metricRecorder; private final AtomicReference trailersCapture = new AtomicReference<>(); @@ -99,6 +100,7 @@ public void unaryRpc( CallMetricRecorder.getCurrent().recordCpuUtilizationMetric(cpuUtilizationMetrics); CallMetricRecorder.getCurrent().recordMemoryUtilizationMetric(memoryUtilizationMetrics); CallMetricRecorder.getCurrent().recordQpsMetric(qpsMetrics); + CallMetricRecorder.getCurrent().recordEpsMetric(epsMetrics); SimpleResponse response = SimpleResponse.newBuilder().setResponseMessage("Simple response").build(); responseObserver.onNext(response); @@ -194,6 +196,7 @@ public void responseTrailersContainAllReportedMetricsFromCallMetricRecorder() { cpuUtilizationMetrics = 0.3465; memoryUtilizationMetrics = 0.764; qpsMetrics = 3.1415926535; + epsMetrics = 1.618; ClientCalls.blockingUnaryCall(channelToUse, SIMPLE_METHOD, CallOptions.DEFAULT, REQUEST); Metadata receivedTrailers = trailersCapture.get(); OrcaLoadReport report = @@ -205,6 +208,7 @@ public void responseTrailersContainAllReportedMetricsFromCallMetricRecorder() { assertThat(report.getCpuUtilization()).isEqualTo(0.3465); assertThat(report.getMemUtilization()).isEqualTo(0.764); assertThat(report.getRpsFractional()).isEqualTo(3.1415926535); + assertThat(report.getEps()).isEqualTo(1.618); } @Test @@ -216,6 +220,7 @@ public void responseTrailersContainMergedMetricsFromCallMetricRecorderAndMetricR memoryUtilizationMetrics = 0.967; metricRecorder.setMemoryUtilizationMetric(0.764); metricRecorder.setQpsMetric(1.618); + metricRecorder.setEpsMetric(3.14159); metricRecorder.putUtilizationMetric("serverUtil1", 0.7467); metricRecorder.putUtilizationMetric("serverUtil2", 0.2233); metricRecorder.putUtilizationMetric("util1", 0.01); @@ -233,14 +238,17 @@ public void responseTrailersContainMergedMetricsFromCallMetricRecorderAndMetricR assertThat(report.getCpuUtilization()).isEqualTo(0.3465); assertThat(report.getMemUtilization()).isEqualTo(0.967); assertThat(report.getRpsFractional()).isEqualTo(1.618); + assertThat(report.getEps()).isEqualTo(3.14159); } @Test public void responseTrailersContainMergedMetricsFromCallMetricRecorderAndMetricRecorderNoMap() { qpsMetrics = 5142.77; + epsMetrics = 2233.88; metricRecorder.setCpuUtilizationMetric(0.314159); metricRecorder.setMemoryUtilizationMetric(0.764); metricRecorder.setQpsMetric(1.618); + metricRecorder.setEpsMetric(3.14159); ClientCalls.blockingUnaryCall(channelToUse, SIMPLE_METHOD, CallOptions.DEFAULT, REQUEST); Metadata receivedTrailers = trailersCapture.get(); @@ -252,6 +260,7 @@ public void responseTrailersContainMergedMetricsFromCallMetricRecorderAndMetricR assertThat(report.getCpuUtilization()).isEqualTo(0.314159); assertThat(report.getMemUtilization()).isEqualTo(0.764); assertThat(report.getRpsFractional()).isEqualTo(5142.77); + assertThat(report.getEps()).isEqualTo(2233.88); } private static final class TrailersCapturingClientInterceptor implements ClientInterceptor { diff --git a/xds/src/test/java/io/grpc/xds/orca/OrcaPerRequestUtilTest.java b/xds/src/test/java/io/grpc/xds/orca/OrcaPerRequestUtilTest.java index 4d0e2070b3b..91f0e1f493a 100644 --- a/xds/src/test/java/io/grpc/xds/orca/OrcaPerRequestUtilTest.java +++ b/xds/src/test/java/io/grpc/xds/orca/OrcaPerRequestUtilTest.java @@ -120,6 +120,8 @@ static boolean reportEqual(MetricReport a, MetricReport b) { return a.getCpuUtilization() == b.getCpuUtilization() && a.getMemoryUtilization() == b.getMemoryUtilization() + && a.getQps() == b.getQps() + && a.getEps() == b.getEps() && Objects.equal(a.getRequestCostMetrics(), b.getRequestCostMetrics()) && Objects.equal(a.getUtilizationMetrics(), b.getUtilizationMetrics()); } diff --git a/xds/src/test/java/io/grpc/xds/orca/OrcaServiceImplTest.java b/xds/src/test/java/io/grpc/xds/orca/OrcaServiceImplTest.java index 124a21ddb76..72c518eb5b2 100644 --- a/xds/src/test/java/io/grpc/xds/orca/OrcaServiceImplTest.java +++ b/xds/src/test/java/io/grpc/xds/orca/OrcaServiceImplTest.java @@ -144,17 +144,19 @@ public void testRequestIntervalLess() { OpenRcaServiceGrpc.getStreamCoreMetricsMethod(), CallOptions.DEFAULT); defaultTestService.putUtilizationMetric("buffer", 0.2); defaultTestService.setQpsMetric(1.9); + defaultTestService.setEpsMetric(0.2233); call.start(listener, new Metadata()); call.sendMessage(OrcaLoadReportRequest.newBuilder() .setReportInterval(Duration.newBuilder().setSeconds(0).setNanos(500).build()).build()); call.halfClose(); call.request(1); OrcaLoadReport expect = OrcaLoadReport.newBuilder().putUtilization("buffer", 0.2) - .setRpsFractional(1.9).build(); + .setRpsFractional(1.9).setEps(0.2233).build(); verify(listener).onMessage(eq(expect)); reset(listener); defaultTestService.removeUtilizationMetric("buffer0"); defaultTestService.clearQpsMetric(); + defaultTestService.clearEpsMetric(); assertThat(fakeClock.forwardTime(500, TimeUnit.NANOSECONDS)).isEqualTo(0); verifyNoInteractions(listener); assertThat(fakeClock.forwardTime(1, TimeUnit.SECONDS)).isEqualTo(1); @@ -250,12 +252,14 @@ public void testApis() throws Exception { .putAllUtilization(firstUtilization) .putUtilization("queue", 1.0) .setRpsFractional(1239.01) + .setEps(1.618) .build(); defaultTestService.setCpuUtilizationMetric(goldenReport.getCpuUtilization()); defaultTestService.setMemoryUtilizationMetric(goldenReport.getMemUtilization()); defaultTestService.setAllUtilizationMetrics(firstUtilization); defaultTestService.putUtilizationMetric("queue", 1.0); defaultTestService.setQpsMetric(1239.01); + defaultTestService.setEpsMetric(1.618); Iterator reports = OpenRcaServiceGrpc.newBlockingStub(channel) .streamCoreMetrics(OrcaLoadReportRequest.newBuilder().build()); assertThat(reports.next()).isEqualTo(goldenReport); @@ -263,6 +267,7 @@ public void testApis() throws Exception { defaultTestService.clearCpuUtilizationMetric(); defaultTestService.clearMemoryUtilizationMetric(); defaultTestService.clearQpsMetric(); + defaultTestService.clearEpsMetric(); fakeClock.forwardTime(1, TimeUnit.SECONDS); goldenReport = OrcaLoadReport.newBuilder() .putAllUtilization(firstUtilization) @@ -279,6 +284,7 @@ public void testApis() throws Exception { defaultTestService.setMemoryUtilizationMetric(-0.001); defaultTestService.setMemoryUtilizationMetric(1.001); defaultTestService.setQpsMetric(-0.001); + defaultTestService.setEpsMetric(-0.001); defaultTestService.putUtilizationMetric("util-out-of-range", -0.001); defaultTestService.putUtilizationMetric("util-out-of-range", 1.001); fakeClock.forwardTime(1, TimeUnit.SECONDS); From 1ec312cf566c520a7421eff0640eb45d8fd6f7c3 Mon Sep 17 00:00:00 2001 From: Daniel Liu Date: Thu, 11 May 2023 20:03:58 +0000 Subject: [PATCH 2/9] update cncf/xds version in repositories.bzl --- repositories.bzl | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/repositories.bzl b/repositories.bzl index b11bb73c5c5..5621323168b 100644 --- a/repositories.bzl +++ b/repositories.bzl @@ -94,10 +94,10 @@ def grpc_java_repositories(): if not native.existing_rule("com_github_cncf_xds"): http_archive( name = "com_github_cncf_xds", - strip_prefix = "xds-06c439db220b89134a8a49bad41994560d6537c6", - sha256 = "41ea212940ab44bf7f8a8b4169cfbc612ed2166dafabc0a56a8820ef665fc6a4", + strip_prefix = "xds-32f1caf87195bf3390061c29f18987e51ca56a88", + sha256 = "fcd0b50c013452fda9c5e28c131c287b655ebb361271a76ad3bffc08b3ecd82e", urls = [ - "https://github.com/cncf/xds/archive/06c439db220b89134a8a49bad41994560d6537c6.tar.gz", + "https://github.com/cncf/xds/archive/32f1caf87195bf3390061c29f18987e51ca56a88.tar.gz", ], ) if not native.existing_rule("com_github_grpc_grpc"): From fd1c9a75b93863a1277fda6e888c44ec1b27bbce Mon Sep 17 00:00:00 2001 From: Daniel Liu Date: Mon, 15 May 2023 18:39:22 +0000 Subject: [PATCH 3/9] xds: update weight formula in WRR --- .../main/java/io/grpc/internal/JsonUtil.java | 26 +++++++++ .../java/io/grpc/internal/JsonUtilTest.java | 19 +++++++ .../grpc/xds/LoadBalancerConfigFactory.java | 11 +++- .../xds/WeightedRoundRobinLoadBalancer.java | 23 ++++++-- ...eightedRoundRobinLoadBalancerProvider.java | 4 ++ .../xds/LoadBalancerConfigFactoryTest.java | 6 ++- ...tedRoundRobinLoadBalancerProviderTest.java | 5 +- .../WeightedRoundRobinLoadBalancerTest.java | 54 +++++++++++++++++++ .../io/grpc/xds/XdsClientImplDataTest.java | 4 ++ 9 files changed, 144 insertions(+), 8 deletions(-) diff --git a/core/src/main/java/io/grpc/internal/JsonUtil.java b/core/src/main/java/io/grpc/internal/JsonUtil.java index 65f7cf5649e..bc16cbaed41 100644 --- a/core/src/main/java/io/grpc/internal/JsonUtil.java +++ b/core/src/main/java/io/grpc/internal/JsonUtil.java @@ -121,6 +121,32 @@ public static Double getNumberAsDouble(Map obj, String key) { String.format("value '%s' for key '%s' in '%s' is not a number", value, key, obj)); } + /** + * Gets a number from an object for the given key. If the key is not present, this returns null. + * If the value does not represent a float, throws an exception. + */ + @Nullable + public static Float getNumberAsFloat(Map obj, String key) { + assert key != null; + if (!obj.containsKey(key)) { + return null; + } + Object value = obj.get(key); + if (value instanceof Float) { + return (Float) value; + } + if (value instanceof String) { + try { + return Float.parseFloat((String) value); + } catch (NumberFormatException e) { + throw new IllegalArgumentException( + String.format("value '%s' for key '%s' is not a float", value, key)); + } + } + throw new IllegalArgumentException( + String.format("value '%s' for key '%s' in '%s' is not a number", value, key, obj)); + } + /** * Gets a number from an object for the given key, casted to an integer. If the key is not * present, this returns null. If the value does not represent an integer, throws an exception. diff --git a/core/src/test/java/io/grpc/internal/JsonUtilTest.java b/core/src/test/java/io/grpc/internal/JsonUtilTest.java index a01f8868220..6b7647a53db 100644 --- a/core/src/test/java/io/grpc/internal/JsonUtilTest.java +++ b/core/src/test/java/io/grpc/internal/JsonUtilTest.java @@ -38,6 +38,7 @@ public void getNumber() { map.put("key_string_nan", "NaN"); map.put("key_number_5.5", 5.5D); map.put("key_string_six", "six"); + map.put("key_number_7", 7F); map.put("key_string_infinity", "Infinity"); map.put("key_string_minus_infinity", "-Infinity"); map.put("key_string_exponent", "2.998e8"); @@ -48,6 +49,7 @@ public void getNumber() { assertThat(JsonUtil.getNumberAsLong(map, "key_number_1")).isEqualTo(1L); assertThat(JsonUtil.getNumberAsDouble(map, "key_string_2.0")).isEqualTo(2D); + assertThat(JsonUtil.getNumberAsFloat(map, "key_string_2.0")).isEqualTo(2F); try { JsonUtil.getNumberAsInteger(map, "key_string_2.0"); fail("expecting to throw but did not"); @@ -66,8 +68,10 @@ public void getNumber() { assertThat(JsonUtil.getNumberAsDouble(map, "key_string_3")).isEqualTo(3D); assertThat(JsonUtil.getNumberAsInteger(map, "key_string_3")).isEqualTo(3); assertThat(JsonUtil.getNumberAsLong(map, "key_string_3")).isEqualTo(3L); + assertThat(JsonUtil.getNumberAsFloat(map, "key_string_3")).isEqualTo(3F); assertThat(JsonUtil.getNumberAsDouble(map, "key_string_nan")).isNaN(); + assertThat(JsonUtil.getNumberAsFloat(map, "key_string_nan")).isNaN(); try { JsonUtil.getNumberAsInteger(map, "key_string_nan"); fail("expecting to throw but did not"); @@ -118,18 +122,33 @@ public void getNumber() { assertThat(e).hasMessageThat().isEqualTo( "value 'six' for key 'key_string_six' is not a long integer"); } + try { + JsonUtil.getNumberAsFloat(map, "key_string_six"); + fail("expecting to throw but did not"); + } catch (RuntimeException e) { + assertThat(e).hasMessageThat().isEqualTo( + "value 'six' for key 'key_string_six' is not a float"); + } + + assertThat(JsonUtil.getNumberAsFloat(map, "key_number_7")).isEqualTo(7F); assertThat(JsonUtil.getNumberAsDouble(map, "key_string_infinity")).isPositiveInfinity(); assertThat(JsonUtil.getNumberAsDouble(map, "key_string_minus_infinity")).isNegativeInfinity(); assertThat(JsonUtil.getNumberAsDouble(map, "key_string_exponent")).isEqualTo(2.998e8D); + assertThat(JsonUtil.getNumberAsFloat(map, "key_string_infinity")).isPositiveInfinity(); + assertThat(JsonUtil.getNumberAsFloat(map, "key_string_minus_infinity")).isNegativeInfinity(); + assertThat(JsonUtil.getNumberAsFloat(map, "key_string_exponent")).isEqualTo(2.998e8F); + assertThat(JsonUtil.getNumberAsDouble(map, "key_string_minus_zero")).isZero(); assertThat(JsonUtil.getNumberAsInteger(map, "key_string_minus_zero")).isEqualTo(0); assertThat(JsonUtil.getNumberAsLong(map, "key_string_minus_zero")).isEqualTo(0L); + assertThat(JsonUtil.getNumberAsFloat(map, "key_string_minus_zero")).isZero(); assertThat(JsonUtil.getNumberAsDouble(map, "key_nonexistent")).isNull(); assertThat(JsonUtil.getNumberAsInteger(map, "key_nonexistent")).isNull(); assertThat(JsonUtil.getNumberAsLong(map, "key_nonexistent")).isNull(); + assertThat(JsonUtil.getNumberAsFloat(map, "key_nonexistent")).isNull(); } @Test diff --git a/xds/src/main/java/io/grpc/xds/LoadBalancerConfigFactory.java b/xds/src/main/java/io/grpc/xds/LoadBalancerConfigFactory.java index 228b2442eb8..9b1dc722400 100644 --- a/xds/src/main/java/io/grpc/xds/LoadBalancerConfigFactory.java +++ b/xds/src/main/java/io/grpc/xds/LoadBalancerConfigFactory.java @@ -89,6 +89,8 @@ class LoadBalancerConfigFactory { static final String PICK_FIRST_FIELD_NAME = "pick_first"; static final String SHUFFLE_ADDRESS_LIST_FIELD_NAME = "shuffleAddressList"; + static final String ERROR_UTILIZATION_PENALTY = "errorUtilizationPenalty"; + /** * Factory method for creating a new {link LoadBalancerConfigConverter} for a given xDS {@link * Cluster}. @@ -135,7 +137,8 @@ class LoadBalancerConfigFactory { String weightExpirationPeriod, String oobReportingPeriod, Boolean enableOobLoadReport, - String weightUpdatePeriod) { + String weightUpdatePeriod, + Float errorUtilizationPenalty) { ImmutableMap.Builder configBuilder = ImmutableMap.builder(); if (blackoutPeriod != null) { configBuilder.put(BLACK_OUT_PERIOD, blackoutPeriod); @@ -152,6 +155,9 @@ class LoadBalancerConfigFactory { if (weightUpdatePeriod != null) { configBuilder.put(WEIGHT_UPDATE_PERIOD, weightUpdatePeriod); } + if (errorUtilizationPenalty != null) { + configBuilder.put(ERROR_UTILIZATION_PENALTY, errorUtilizationPenalty); + } return ImmutableMap.of(WeightedRoundRobinLoadBalancerProvider.SCHEME, configBuilder.buildOrThrow()); } @@ -291,7 +297,8 @@ static class LoadBalancingPolicyConverter { ? Durations.toString(wrr.getWeightExpirationPeriod()) : null, wrr.hasOobReportingPeriod() ? Durations.toString(wrr.getOobReportingPeriod()) : null, wrr.hasEnableOobLoadReport() ? wrr.getEnableOobLoadReport().getValue() : null, - wrr.hasWeightUpdatePeriod() ? Durations.toString(wrr.getWeightUpdatePeriod()) : null); + wrr.hasWeightUpdatePeriod() ? Durations.toString(wrr.getWeightUpdatePeriod()) : null, + wrr.hasErrorUtilizationPenalty() ? wrr.getErrorUtilizationPenalty().getValue() : null); } catch (IllegalArgumentException ex) { throw new ResourceInvalidException("Invalid duration in weighted round robin config: " + ex.getMessage()); diff --git a/xds/src/main/java/io/grpc/xds/WeightedRoundRobinLoadBalancer.java b/xds/src/main/java/io/grpc/xds/WeightedRoundRobinLoadBalancer.java index 87593d53241..984db2a0771 100644 --- a/xds/src/main/java/io/grpc/xds/WeightedRoundRobinLoadBalancer.java +++ b/xds/src/main/java/io/grpc/xds/WeightedRoundRobinLoadBalancer.java @@ -184,8 +184,14 @@ final class WrrSubchannel extends ForwardingSubchannel { @VisibleForTesting void onLoadReport(MetricReport report) { - double newWeight = report.getCpuUtilization() == 0 ? 0 : - report.getQps() / report.getCpuUtilization(); + double newWeight = 0; + if (report.getCpuUtilization() > 0 && report.getQps() > 0) { + double penalty = 0; + if (report.getEps() > 0 && config.errorUtilizationPenalty > 0) { + penalty = report.getEps() / report.getQps() * config.errorUtilizationPenalty; + } + newWeight = report.getQps() / (report.getCpuUtilization() + penalty); + } if (newWeight == 0) { return; } @@ -419,6 +425,7 @@ static final class WeightedRoundRobinLoadBalancerConfig { final boolean enableOobLoadReport; final long oobReportingPeriodNanos; final long weightUpdatePeriodNanos; + final float errorUtilizationPenalty; public static Builder newBuilder() { return new Builder(); @@ -428,12 +435,14 @@ private WeightedRoundRobinLoadBalancerConfig(long blackoutPeriodNanos, long weightExpirationPeriodNanos, boolean enableOobLoadReport, long oobReportingPeriodNanos, - long weightUpdatePeriodNanos) { + long weightUpdatePeriodNanos, + float errorUtilizationPenalty) { this.blackoutPeriodNanos = blackoutPeriodNanos; this.weightExpirationPeriodNanos = weightExpirationPeriodNanos; this.enableOobLoadReport = enableOobLoadReport; this.oobReportingPeriodNanos = oobReportingPeriodNanos; this.weightUpdatePeriodNanos = weightUpdatePeriodNanos; + this.errorUtilizationPenalty = errorUtilizationPenalty; } static final class Builder { @@ -442,6 +451,7 @@ static final class Builder { boolean enableOobLoadReport = false; long oobReportingPeriodNanos = 10_000_000_000L; // 10s long weightUpdatePeriodNanos = 1_000_000_000L; // 1s + float errorUtilizationPenalty = 1.0F; private Builder() { @@ -472,10 +482,15 @@ Builder setWeightUpdatePeriodNanos(long weightUpdatePeriodNanos) { return this; } + Builder setErrorUtilizationPenalty(float errorUtilizationPenalty) { + this.errorUtilizationPenalty = errorUtilizationPenalty; + return this; + } + WeightedRoundRobinLoadBalancerConfig build() { return new WeightedRoundRobinLoadBalancerConfig(blackoutPeriodNanos, weightExpirationPeriodNanos, enableOobLoadReport, oobReportingPeriodNanos, - weightUpdatePeriodNanos); + weightUpdatePeriodNanos, errorUtilizationPenalty); } } } diff --git a/xds/src/main/java/io/grpc/xds/WeightedRoundRobinLoadBalancerProvider.java b/xds/src/main/java/io/grpc/xds/WeightedRoundRobinLoadBalancerProvider.java index ceaa4d7e97d..161e7c4ed0c 100644 --- a/xds/src/main/java/io/grpc/xds/WeightedRoundRobinLoadBalancerProvider.java +++ b/xds/src/main/java/io/grpc/xds/WeightedRoundRobinLoadBalancerProvider.java @@ -79,6 +79,7 @@ private ConfigOrError parseLoadBalancingPolicyConfigInternal(Map rawC Long oobReportingPeriodNanos = JsonUtil.getStringAsDuration(rawConfig, "oobReportingPeriod"); Boolean enableOobLoadReport = JsonUtil.getBoolean(rawConfig, "enableOobLoadReport"); Long weightUpdatePeriodNanos = JsonUtil.getStringAsDuration(rawConfig, "weightUpdatePeriod"); + Float errorUtilizationPenalty = JsonUtil.getNumberAsFloat(rawConfig, "errorUtilizationPenalty"); WeightedRoundRobinLoadBalancerConfig.Builder configBuilder = WeightedRoundRobinLoadBalancerConfig.newBuilder(); @@ -100,6 +101,9 @@ private ConfigOrError parseLoadBalancingPolicyConfigInternal(Map rawC configBuilder.setWeightUpdatePeriodNanos(MIN_WEIGHT_UPDATE_PERIOD_NANOS); } } + if (errorUtilizationPenalty != null) { + configBuilder.setErrorUtilizationPenalty(errorUtilizationPenalty); + } return ConfigOrError.fromConfig(configBuilder.build()); } } diff --git a/xds/src/test/java/io/grpc/xds/LoadBalancerConfigFactoryTest.java b/xds/src/test/java/io/grpc/xds/LoadBalancerConfigFactoryTest.java index db69584901d..fe500105bc6 100644 --- a/xds/src/test/java/io/grpc/xds/LoadBalancerConfigFactoryTest.java +++ b/xds/src/test/java/io/grpc/xds/LoadBalancerConfigFactoryTest.java @@ -26,6 +26,7 @@ import com.google.protobuf.Any; import com.google.protobuf.BoolValue; import com.google.protobuf.Duration; +import com.google.protobuf.FloatValue; import com.google.protobuf.Struct; import com.google.protobuf.UInt32Value; import com.google.protobuf.UInt64Value; @@ -95,6 +96,8 @@ public class LoadBalancerConfigFactoryTest { .setBlackoutPeriod(Duration.newBuilder().setSeconds(287).build()) .setEnableOobLoadReport( BoolValue.newBuilder().setValue(true).build()) + .setErrorUtilizationPenalty( + FloatValue.newBuilder().setValue(1.75F).build()) .build())) .build()) .build(); @@ -125,7 +128,8 @@ public class LoadBalancerConfigFactoryTest { private static final LbConfig VALID_WRR_CONFIG = new LbConfig("wrr_locality_experimental", ImmutableMap.of("childPolicy", ImmutableList.of( ImmutableMap.of("weighted_round_robin", - ImmutableMap.of("blackoutPeriod","287s", "enableOobLoadReport", true ))))); + ImmutableMap.of("blackoutPeriod","287s", "enableOobLoadReport", true, + "errorUtilizationPenalty", 1.75F ))))); private static final LbConfig VALID_RING_HASH_CONFIG = new LbConfig("ring_hash_experimental", ImmutableMap.of("minRingSize", (double) RING_HASH_MIN_RING_SIZE, "maxRingSize", (double) RING_HASH_MAX_RING_SIZE)); diff --git a/xds/src/test/java/io/grpc/xds/WeightedRoundRobinLoadBalancerProviderTest.java b/xds/src/test/java/io/grpc/xds/WeightedRoundRobinLoadBalancerProviderTest.java index db72d855258..ddde84ca842 100644 --- a/xds/src/test/java/io/grpc/xds/WeightedRoundRobinLoadBalancerProviderTest.java +++ b/xds/src/test/java/io/grpc/xds/WeightedRoundRobinLoadBalancerProviderTest.java @@ -78,7 +78,8 @@ public void parseLoadBalancingConfig() throws IOException { + " \"weightExpirationPeriod\" : \"300s\"," + " \"oobReportingPeriod\" : \"100s\"," + " \"enableOobLoadReport\" : true," - + " \"weightUpdatePeriod\" : \"2s\"" + + " \"weightUpdatePeriod\" : \"2s\"," + + " \"errorUtilizationPenalty\" : \"1.75\"" + " }"; ConfigOrError configOrError = provider.parseLoadBalancingPolicyConfig( @@ -91,6 +92,7 @@ public void parseLoadBalancingConfig() throws IOException { assertThat(config.oobReportingPeriodNanos).isEqualTo(100_000_000_000L); assertThat(config.enableOobLoadReport).isEqualTo(true); assertThat(config.weightUpdatePeriodNanos).isEqualTo(2_000_000_000L); + assertThat(config.errorUtilizationPenalty).isEqualTo(1.75F); } @Test @@ -106,6 +108,7 @@ public void parseLoadBalancingConfigDefaultValues() throws IOException { assertThat(config.weightExpirationPeriodNanos).isEqualTo(180_000_000_000L); assertThat(config.enableOobLoadReport).isEqualTo(false); assertThat(config.weightUpdatePeriodNanos).isEqualTo(100_000_000L); + assertThat(config.errorUtilizationPenalty).isEqualTo(1.0F); } diff --git a/xds/src/test/java/io/grpc/xds/WeightedRoundRobinLoadBalancerTest.java b/xds/src/test/java/io/grpc/xds/WeightedRoundRobinLoadBalancerTest.java index 66d3211fe7c..304b7a033f5 100644 --- a/xds/src/test/java/io/grpc/xds/WeightedRoundRobinLoadBalancerTest.java +++ b/xds/src/test/java/io/grpc/xds/WeightedRoundRobinLoadBalancerTest.java @@ -339,6 +339,23 @@ public void pickByWeight_LargeWeight() { 100 / 0.86 / totalWeight); } + @Test + public void pickByWeight_largeWeight_withEps_defaultErrorUtilizationPenalty() { + MetricReport report1 = InternalCallMetricRecorder.createMetricReport( + 0.1, 0.1, 999, 13, new HashMap<>(), new HashMap<>()); + MetricReport report2 = InternalCallMetricRecorder.createMetricReport( + 0.9, 0.1, 2, 1.8, new HashMap<>(), new HashMap<>()); + MetricReport report3 = InternalCallMetricRecorder.createMetricReport( + 0.86, 0.1, 100, 3, new HashMap<>(), new HashMap<>()); + double weight1 = 999 / (0.1 + 13 / 999F * weightedConfig.errorUtilizationPenalty); + double weight2 = 2 / (0.9 + 1.8 / 2F * weightedConfig.errorUtilizationPenalty); + double weight3 = 100 / (0.86 + 3 / 100F * weightedConfig.errorUtilizationPenalty); + double totalWeight = weight1 + weight2 + weight3; + + pickByWeight(report1, report2, report3, weight1 / totalWeight, weight2 / totalWeight, + weight3 / totalWeight); + } + @Test public void pickByWeight_normalWeight() { MetricReport report1 = InternalCallMetricRecorder.createMetricReport( @@ -353,6 +370,43 @@ public void pickByWeight_normalWeight() { ); } + @Test + public void pickByWeight_normalWeight_withEps_defaultErrorUtilizationPenalty() { + MetricReport report1 = InternalCallMetricRecorder.createMetricReport( + 0.12, 0.1, 22, 19.7, new HashMap<>(), new HashMap<>()); + MetricReport report2 = InternalCallMetricRecorder.createMetricReport( + 0.28, 0.1, 40, 0.998, new HashMap<>(), new HashMap<>()); + MetricReport report3 = InternalCallMetricRecorder.createMetricReport( + 0.86, 0.1, 100, 3.14159, new HashMap<>(), new HashMap<>()); + double weight1 = 22 / (0.12 + 19.7 / 22F * weightedConfig.errorUtilizationPenalty); + double weight2 = 40 / (0.28 + 0.998 / 40F * weightedConfig.errorUtilizationPenalty); + double weight3 = 100 / (0.86 + 3.14159 / 100F * weightedConfig.errorUtilizationPenalty); + double totalWeight = weight1 + weight2 + weight3; + + pickByWeight(report1, report2, report3, weight1 / totalWeight, weight2 / totalWeight, + weight3 / totalWeight); + } + + @Test + public void pickByWeight_normalWeight_withEps_customErrorUtilizationPenalty() { + weightedConfig = WeightedRoundRobinLoadBalancerConfig.newBuilder() + .setErrorUtilizationPenalty(1.75F).build(); + + MetricReport report1 = InternalCallMetricRecorder.createMetricReport( + 0.12, 0.1, 22, 19.7, new HashMap<>(), new HashMap<>()); + MetricReport report2 = InternalCallMetricRecorder.createMetricReport( + 0.28, 0.1, 40, 0.998, new HashMap<>(), new HashMap<>()); + MetricReport report3 = InternalCallMetricRecorder.createMetricReport( + 0.86, 0.1, 100, 3.14159, new HashMap<>(), new HashMap<>()); + double weight1 = 22 / (0.12 + 19.7 / 22F * weightedConfig.errorUtilizationPenalty); + double weight2 = 40 / (0.28 + 0.998 / 40F * weightedConfig.errorUtilizationPenalty); + double weight3 = 100 / (0.86 + 3.14159 / 100F * weightedConfig.errorUtilizationPenalty); + double totalWeight = weight1 + weight2 + weight3; + + pickByWeight(report1, report2, report3, weight1 / totalWeight, weight2 / totalWeight, + weight3 / totalWeight); + } + @Test public void emptyConfig() { assertThat(wrr.acceptResolvedAddresses(ResolvedAddresses.newBuilder() diff --git a/xds/src/test/java/io/grpc/xds/XdsClientImplDataTest.java b/xds/src/test/java/io/grpc/xds/XdsClientImplDataTest.java index 90a7e4a58d7..f26f1eba879 100644 --- a/xds/src/test/java/io/grpc/xds/XdsClientImplDataTest.java +++ b/xds/src/test/java/io/grpc/xds/XdsClientImplDataTest.java @@ -28,6 +28,7 @@ import com.google.protobuf.BoolValue; import com.google.protobuf.ByteString; import com.google.protobuf.Duration; +import com.google.protobuf.FloatValue; import com.google.protobuf.InvalidProtocolBufferException; import com.google.protobuf.Message; import com.google.protobuf.StringValue; @@ -2003,6 +2004,8 @@ public void parseCluster_WrrLbPolicy_defaultLbConfig() throws ResourceInvalidExc .setBlackoutPeriod(Duration.newBuilder().setSeconds(17).build()) .setEnableOobLoadReport( BoolValue.newBuilder().setValue(true).build()) + .setErrorUtilizationPenalty( + FloatValue.newBuilder().setValue(1.75F).build()) .build())) .build()) .build()) @@ -2046,6 +2049,7 @@ public void parseCluster_WrrLbPolicy_defaultLbConfig() throws ResourceInvalidExc assertThat(result.oobReportingPeriodNanos).isEqualTo(10_000_000_000L); assertThat(result.weightUpdatePeriodNanos).isEqualTo(1_000_000_000L); assertThat(result.weightExpirationPeriodNanos).isEqualTo(180_000_000_000L); + assertThat(result.errorUtilizationPenalty).isEqualTo(1.75F); } @Test From 90810e877f9f9822dbbdbeb6bf7f57c36ef1210e Mon Sep 17 00:00:00 2001 From: Daniel Liu Date: Mon, 15 May 2023 22:04:38 +0000 Subject: [PATCH 4/9] update tests --- core/src/main/java/io/grpc/internal/JsonUtil.java | 4 ++-- core/src/test/java/io/grpc/internal/JsonUtilTest.java | 11 ++++++++++- .../java/io/grpc/services/CallMetricRecorderTest.java | 1 + 3 files changed, 13 insertions(+), 3 deletions(-) diff --git a/core/src/main/java/io/grpc/internal/JsonUtil.java b/core/src/main/java/io/grpc/internal/JsonUtil.java index bc16cbaed41..f71ba218916 100644 --- a/core/src/main/java/io/grpc/internal/JsonUtil.java +++ b/core/src/main/java/io/grpc/internal/JsonUtil.java @@ -140,11 +140,11 @@ public static Float getNumberAsFloat(Map obj, String key) { return Float.parseFloat((String) value); } catch (NumberFormatException e) { throw new IllegalArgumentException( - String.format("value '%s' for key '%s' is not a float", value, key)); + String.format("string value '%s' for key '%s' cannot be parsed as a float", value, key)); } } throw new IllegalArgumentException( - String.format("value '%s' for key '%s' in '%s' is not a number", value, key, obj)); + String.format("value %s for key '%s' is not a float", value, key)); } /** diff --git a/core/src/test/java/io/grpc/internal/JsonUtilTest.java b/core/src/test/java/io/grpc/internal/JsonUtilTest.java index 6b7647a53db..058171814ea 100644 --- a/core/src/test/java/io/grpc/internal/JsonUtilTest.java +++ b/core/src/test/java/io/grpc/internal/JsonUtilTest.java @@ -43,6 +43,7 @@ public void getNumber() { map.put("key_string_minus_infinity", "-Infinity"); map.put("key_string_exponent", "2.998e8"); map.put("key_string_minus_zero", "-0"); + map.put("key_string_boolean", true); assertThat(JsonUtil.getNumberAsDouble(map, "key_number_1")).isEqualTo(1D); assertThat(JsonUtil.getNumberAsInteger(map, "key_number_1")).isEqualTo(1); @@ -127,7 +128,7 @@ public void getNumber() { fail("expecting to throw but did not"); } catch (RuntimeException e) { assertThat(e).hasMessageThat().isEqualTo( - "value 'six' for key 'key_string_six' is not a float"); + "string value 'six' for key 'key_string_six' cannot be parsed as a float"); } assertThat(JsonUtil.getNumberAsFloat(map, "key_number_7")).isEqualTo(7F); @@ -149,6 +150,14 @@ public void getNumber() { assertThat(JsonUtil.getNumberAsInteger(map, "key_nonexistent")).isNull(); assertThat(JsonUtil.getNumberAsLong(map, "key_nonexistent")).isNull(); assertThat(JsonUtil.getNumberAsFloat(map, "key_nonexistent")).isNull(); + + try { + JsonUtil.getNumberAsFloat(map, "key_string_boolean"); + fail("expecting to throw but did not"); + } catch (RuntimeException e) { + assertThat(e).hasMessageThat().isEqualTo( + "value true for key 'key_string_boolean' is not a float"); + } } @Test diff --git a/services/src/test/java/io/grpc/services/CallMetricRecorderTest.java b/services/src/test/java/io/grpc/services/CallMetricRecorderTest.java index 6893af7cd9b..439cd7628a3 100644 --- a/services/src/test/java/io/grpc/services/CallMetricRecorderTest.java +++ b/services/src/test/java/io/grpc/services/CallMetricRecorderTest.java @@ -58,6 +58,7 @@ public void dumpDumpsAllSavedMetricValues() { Truth.assertThat(dump.getMemoryUtilization()).isEqualTo(0.474); Truth.assertThat(dump.getQps()).isEqualTo(2522.54); Truth.assertThat(dump.getEps()).isEqualTo(1.618); + Truth.assertThat(dump.toString()).contains("eps=1.618"); } @Test From 0e2f7e6ffb0a6d4f983530f48f70258ccca89442 Mon Sep 17 00:00:00 2001 From: Daniel Liu Date: Mon, 15 May 2023 22:14:30 +0000 Subject: [PATCH 5/9] fix checkStyleMain error --- core/src/main/java/io/grpc/internal/JsonUtil.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/core/src/main/java/io/grpc/internal/JsonUtil.java b/core/src/main/java/io/grpc/internal/JsonUtil.java index f71ba218916..44cb22abda5 100644 --- a/core/src/main/java/io/grpc/internal/JsonUtil.java +++ b/core/src/main/java/io/grpc/internal/JsonUtil.java @@ -140,7 +140,8 @@ public static Float getNumberAsFloat(Map obj, String key) { return Float.parseFloat((String) value); } catch (NumberFormatException e) { throw new IllegalArgumentException( - String.format("string value '%s' for key '%s' cannot be parsed as a float", value, key)); + String.format("string value '%s' for key '%s' cannot be parsed as a float", value, + key)); } } throw new IllegalArgumentException( From 5852e70ab95d148e865b3285d48d6f229fc014f7 Mon Sep 17 00:00:00 2001 From: Daniel Liu Date: Wed, 24 May 2023 17:05:38 +0000 Subject: [PATCH 6/9] pass EUP to picker, allocate new listener each RPC --- .../xds/WeightedRoundRobinLoadBalancer.java | 64 +++++++++------ .../WeightedRoundRobinLoadBalancerTest.java | 79 ++++++++++++------- 2 files changed, 90 insertions(+), 53 deletions(-) diff --git a/xds/src/main/java/io/grpc/xds/WeightedRoundRobinLoadBalancer.java b/xds/src/main/java/io/grpc/xds/WeightedRoundRobinLoadBalancer.java index 984db2a0771..10b5c4288f8 100644 --- a/xds/src/main/java/io/grpc/xds/WeightedRoundRobinLoadBalancer.java +++ b/xds/src/main/java/io/grpc/xds/WeightedRoundRobinLoadBalancer.java @@ -110,7 +110,8 @@ public boolean acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) { @Override public RoundRobinPicker createReadyPicker(List activeList) { - return new WeightedRoundRobinPicker(activeList, config.enableOobLoadReport); + return new WeightedRoundRobinPicker(activeList, config.enableOobLoadReport, + config.errorUtilizationPenalty); } private final class UpdateWeightTask implements Runnable { @@ -172,8 +173,8 @@ public Subchannel createSubchannel(CreateSubchannelArgs args) { @VisibleForTesting final class WrrSubchannel extends ForwardingSubchannel { private final Subchannel delegate; - private final OrcaOobReportListener oobListener = this::onLoadReport; - private final OrcaPerRequestReportListener perRpcListener = this::onLoadReport; + private final OrcaOobReportListener oobListener = new OrcaReportListener( + config.errorUtilizationPenalty); private volatile long lastUpdated; private volatile long nonEmptySince; private volatile double weight; @@ -182,26 +183,6 @@ final class WrrSubchannel extends ForwardingSubchannel { this.delegate = checkNotNull(delegate, "delegate"); } - @VisibleForTesting - void onLoadReport(MetricReport report) { - double newWeight = 0; - if (report.getCpuUtilization() > 0 && report.getQps() > 0) { - double penalty = 0; - if (report.getEps() > 0 && config.errorUtilizationPenalty > 0) { - penalty = report.getEps() / report.getQps() * config.errorUtilizationPenalty; - } - newWeight = report.getQps() / (report.getCpuUtilization() + penalty); - } - if (newWeight == 0) { - return; - } - if (nonEmptySince == infTime) { - nonEmptySince = ticker.nanoTime(); - } - lastUpdated = ticker.nanoTime(); - weight = newWeight; - } - @Override public void start(SubchannelStateListener listener) { delegate().start(new SubchannelStateListener() { @@ -235,19 +216,50 @@ private double getWeight() { protected Subchannel delegate() { return delegate; } + + final class OrcaReportListener implements OrcaPerRequestReportListener, OrcaOobReportListener { + private final float errorUtilizationPenalty; + + OrcaReportListener(float errorUtilizationPenalty) { + this.errorUtilizationPenalty = errorUtilizationPenalty; + } + + @Override + public void onLoadReport(MetricReport report) { + double newWeight = 0; + if (report.getCpuUtilization() > 0 && report.getQps() > 0) { + double penalty = 0; + if (report.getEps() > 0 && errorUtilizationPenalty > 0) { + penalty = report.getEps() / report.getQps() * errorUtilizationPenalty; + } + newWeight = report.getQps() / (report.getCpuUtilization() + penalty); + } + if (newWeight == 0) { + return; + } + if (nonEmptySince == infTime) { + nonEmptySince = ticker.nanoTime(); + } + lastUpdated = ticker.nanoTime(); + weight = newWeight; + } + } } @VisibleForTesting final class WeightedRoundRobinPicker extends RoundRobinPicker { private final List list; private final boolean enableOobLoadReport; + private final float errorUtilizationPenalty; private volatile EdfScheduler scheduler; - WeightedRoundRobinPicker(List list, boolean enableOobLoadReport) { + WeightedRoundRobinPicker(List list, boolean enableOobLoadReport, + float errorUtilizationPenalty) { checkNotNull(list, "list"); Preconditions.checkArgument(!list.isEmpty(), "empty list"); this.list = list; this.enableOobLoadReport = enableOobLoadReport; + this.errorUtilizationPenalty = errorUtilizationPenalty; updateWeight(); } @@ -257,7 +269,7 @@ public PickResult pickSubchannel(PickSubchannelArgs args) { if (!enableOobLoadReport) { return PickResult.withSubchannel(subchannel, OrcaPerRequestUtil.getInstance().newOrcaClientStreamTracerFactory( - ((WrrSubchannel)subchannel).perRpcListener)); + ((WrrSubchannel)subchannel).new OrcaReportListener(errorUtilizationPenalty))); } else { return PickResult.withSubchannel(subchannel); } @@ -291,6 +303,7 @@ private void updateWeight() { public String toString() { return MoreObjects.toStringHelper(WeightedRoundRobinPicker.class) .add("enableOobLoadReport", enableOobLoadReport) + .add("errorUtilizationPenalty", errorUtilizationPenalty) .add("list", list).toString(); } @@ -310,6 +323,7 @@ public boolean isEquivalentTo(RoundRobinPicker picker) { } // the lists cannot contain duplicate subchannels return enableOobLoadReport == other.enableOobLoadReport + && Float.compare(errorUtilizationPenalty, other.errorUtilizationPenalty) == 0 && list.size() == other.list.size() && new HashSet<>(list).containsAll(other.list); } } diff --git a/xds/src/test/java/io/grpc/xds/WeightedRoundRobinLoadBalancerTest.java b/xds/src/test/java/io/grpc/xds/WeightedRoundRobinLoadBalancerTest.java index 304b7a033f5..e8d591500b8 100644 --- a/xds/src/test/java/io/grpc/xds/WeightedRoundRobinLoadBalancerTest.java +++ b/xds/src/test/java/io/grpc/xds/WeightedRoundRobinLoadBalancerTest.java @@ -207,9 +207,11 @@ public void wrrLifeCycle() { assertThat(weightedPicker.getList().size()).isEqualTo(2); WrrSubchannel weightedSubchannel1 = (WrrSubchannel) weightedPicker.getList().get(0); WrrSubchannel weightedSubchannel2 = (WrrSubchannel) weightedPicker.getList().get(1); - weightedSubchannel1.onLoadReport(InternalCallMetricRecorder.createMetricReport( + weightedSubchannel1.new OrcaReportListener(weightedConfig.errorUtilizationPenalty).onLoadReport( + InternalCallMetricRecorder.createMetricReport( 0.1, 0.1, 1, 0, new HashMap<>(), new HashMap<>())); - weightedSubchannel2.onLoadReport(InternalCallMetricRecorder.createMetricReport( + weightedSubchannel2.new OrcaReportListener(weightedConfig.errorUtilizationPenalty).onLoadReport( + InternalCallMetricRecorder.createMetricReport( 0.2, 0.1, 1, 0, new HashMap<>(), new HashMap<>())); assertThat(fakeClock.forwardTime(11, TimeUnit.SECONDS)).isEqualTo(1); assertThat(weightedPicker.pickSubchannel(mockArgs) @@ -251,9 +253,11 @@ public void enableOobLoadReportConfig() { (WeightedRoundRobinPicker) pickerCaptor.getAllValues().get(1); WrrSubchannel weightedSubchannel1 = (WrrSubchannel) weightedPicker.getList().get(0); WrrSubchannel weightedSubchannel2 = (WrrSubchannel) weightedPicker.getList().get(1); - weightedSubchannel1.onLoadReport(InternalCallMetricRecorder.createMetricReport( + weightedSubchannel1.new OrcaReportListener(weightedConfig.errorUtilizationPenalty).onLoadReport( + InternalCallMetricRecorder.createMetricReport( 0.1, 0.1, 1, 0, new HashMap<>(), new HashMap<>())); - weightedSubchannel2.onLoadReport(InternalCallMetricRecorder.createMetricReport( + weightedSubchannel2.new OrcaReportListener(weightedConfig.errorUtilizationPenalty).onLoadReport( + InternalCallMetricRecorder.createMetricReport( 0.9, 0.1, 1, 0, new HashMap<>(), new HashMap<>())); assertThat(fakeClock.forwardTime(11, TimeUnit.SECONDS)).isEqualTo(1); PickResult pickResult = weightedPicker.pickSubchannel(mockArgs); @@ -307,9 +311,12 @@ private void pickByWeight(MetricReport r1, MetricReport r2, MetricReport r3, WrrSubchannel weightedSubchannel1 = (WrrSubchannel) weightedPicker.getList().get(0); WrrSubchannel weightedSubchannel2 = (WrrSubchannel) weightedPicker.getList().get(1); WrrSubchannel weightedSubchannel3 = (WrrSubchannel) weightedPicker.getList().get(2); - weightedSubchannel1.onLoadReport(r1); - weightedSubchannel2.onLoadReport(r2); - weightedSubchannel3.onLoadReport(r3); + weightedSubchannel1.new OrcaReportListener(weightedConfig.errorUtilizationPenalty).onLoadReport( + r1); + weightedSubchannel2.new OrcaReportListener(weightedConfig.errorUtilizationPenalty).onLoadReport( + r2); + weightedSubchannel3.new OrcaReportListener(weightedConfig.errorUtilizationPenalty).onLoadReport( + r3); assertThat(fakeClock.forwardTime(11, TimeUnit.SECONDS)).isEqualTo(1); Map pickCount = new HashMap<>(); for (int i = 0; i < 10000; i++) { @@ -449,9 +456,11 @@ public void blackoutPeriod() { (WeightedRoundRobinPicker) pickerCaptor.getAllValues().get(1); WrrSubchannel weightedSubchannel1 = (WrrSubchannel) weightedPicker.getList().get(0); WrrSubchannel weightedSubchannel2 = (WrrSubchannel) weightedPicker.getList().get(1); - weightedSubchannel1.onLoadReport(InternalCallMetricRecorder.createMetricReport( + weightedSubchannel1.new OrcaReportListener(weightedConfig.errorUtilizationPenalty).onLoadReport( + InternalCallMetricRecorder.createMetricReport( 0.1, 0.1, 1, 0, new HashMap<>(), new HashMap<>())); - weightedSubchannel2.onLoadReport(InternalCallMetricRecorder.createMetricReport( + weightedSubchannel2.new OrcaReportListener(weightedConfig.errorUtilizationPenalty).onLoadReport( + InternalCallMetricRecorder.createMetricReport( 0.2, 0.1, 1, 0, new HashMap<>(), new HashMap<>())); assertThat(fakeClock.forwardTime(5, TimeUnit.SECONDS)).isEqualTo(1); Map pickCount = new HashMap<>(); @@ -507,10 +516,12 @@ public void updateWeightTimer() { assertThat(weightedPicker.getList().size()).isEqualTo(2); WrrSubchannel weightedSubchannel1 = (WrrSubchannel) weightedPicker.getList().get(0); WrrSubchannel weightedSubchannel2 = (WrrSubchannel) weightedPicker.getList().get(1); - weightedSubchannel1.onLoadReport(InternalCallMetricRecorder.createMetricReport( - 0.1, 0.1, 1, 0, new HashMap<>(), new HashMap<>())); - weightedSubchannel2.onLoadReport(InternalCallMetricRecorder.createMetricReport( - 0.2, 0.1, 1, 0, new HashMap<>(), new HashMap<>())); + weightedSubchannel1.new OrcaReportListener(weightedConfig.errorUtilizationPenalty).onLoadReport( + InternalCallMetricRecorder.createMetricReport( + 0.1, 0.1, 1, 0, new HashMap<>(), new HashMap<>())); + weightedSubchannel2.new OrcaReportListener(weightedConfig.errorUtilizationPenalty).onLoadReport( + InternalCallMetricRecorder.createMetricReport( + 0.2, 0.1, 1, 0, new HashMap<>(), new HashMap<>())); assertThat(fakeClock.forwardTime(11, TimeUnit.SECONDS)).isEqualTo(1); assertThat(weightedPicker.pickSubchannel(mockArgs) .getSubchannel()).isEqualTo(weightedSubchannel1); @@ -522,10 +533,12 @@ public void updateWeightTimer() { .setAddresses(servers).setLoadBalancingPolicyConfig(weightedConfig) .setAttributes(affinity).build())); assertThat(fakeClock.getPendingTasks().size()).isEqualTo(1); - weightedSubchannel1.onLoadReport(InternalCallMetricRecorder.createMetricReport( - 0.2, 0.1, 1, 0, new HashMap<>(), new HashMap<>())); - weightedSubchannel2.onLoadReport(InternalCallMetricRecorder.createMetricReport( - 0.1, 0.1, 1, 0, new HashMap<>(), new HashMap<>())); + weightedSubchannel1.new OrcaReportListener(weightedConfig.errorUtilizationPenalty).onLoadReport( + InternalCallMetricRecorder.createMetricReport( + 0.2, 0.1, 1, 0, new HashMap<>(), new HashMap<>())); + weightedSubchannel2.new OrcaReportListener(weightedConfig.errorUtilizationPenalty).onLoadReport( + InternalCallMetricRecorder.createMetricReport( + 0.1, 0.1, 1, 0, new HashMap<>(), new HashMap<>())); //timer fires, new weight updated assertThat(fakeClock.forwardTime(500, TimeUnit.MILLISECONDS)).isEqualTo(1); assertThat(weightedPicker.pickSubchannel(mockArgs) @@ -554,9 +567,11 @@ public void weightExpired() { (WeightedRoundRobinPicker) pickerCaptor.getAllValues().get(1); WrrSubchannel weightedSubchannel1 = (WrrSubchannel) weightedPicker.getList().get(0); WrrSubchannel weightedSubchannel2 = (WrrSubchannel) weightedPicker.getList().get(1); - weightedSubchannel1.onLoadReport(InternalCallMetricRecorder.createMetricReport( + weightedSubchannel1.new OrcaReportListener(weightedConfig.errorUtilizationPenalty).onLoadReport( + InternalCallMetricRecorder.createMetricReport( 0.1, 0.1, 1, 0, new HashMap<>(), new HashMap<>())); - weightedSubchannel2.onLoadReport(InternalCallMetricRecorder.createMetricReport( + weightedSubchannel2.new OrcaReportListener(weightedConfig.errorUtilizationPenalty).onLoadReport( + InternalCallMetricRecorder.createMetricReport( 0.2, 0.1, 1, 0, new HashMap<>(), new HashMap<>())); assertThat(fakeClock.forwardTime(10, TimeUnit.SECONDS)).isEqualTo(1); Map pickCount = new HashMap<>(); @@ -616,8 +631,10 @@ public void rrFallback() { pickCount.getOrDefault(pickResult.getSubchannel(), 0) + 1); assertThat(pickResult.getStreamTracerFactory()).isNotNull(); WrrSubchannel subchannel = (WrrSubchannel)pickResult.getSubchannel(); - subchannel.onLoadReport(InternalCallMetricRecorder.createMetricReport( - 0.1, 0.1, qpsByChannel.get(subchannel), 0, new HashMap<>(), new HashMap<>())); + subchannel.new OrcaReportListener(weightedConfig.errorUtilizationPenalty).onLoadReport( + InternalCallMetricRecorder.createMetricReport( + 0.1, 0.1, qpsByChannel.get(subchannel), 0, + new HashMap<>(), new HashMap<>())); } assertThat(Math.abs(pickCount.get(weightedSubchannel1) / 1000.0 - 1.0 / 2)) .isAtMost(0.1); @@ -629,9 +646,11 @@ public void rrFallback() { pickCount.put(pickResult.getSubchannel(), pickCount.getOrDefault(pickResult.getSubchannel(), 0) + 1); assertThat(pickResult.getStreamTracerFactory()).isNotNull(); - WrrSubchannel subchannel = (WrrSubchannel)pickResult.getSubchannel(); - subchannel.onLoadReport(InternalCallMetricRecorder.createMetricReport( - 0.1, 0.1, qpsByChannel.get(subchannel), 0, new HashMap<>(), new HashMap<>())); + WrrSubchannel subchannel = (WrrSubchannel) pickResult.getSubchannel(); + subchannel.new OrcaReportListener(weightedConfig.errorUtilizationPenalty).onLoadReport( + InternalCallMetricRecorder.createMetricReport( + 0.1, 0.1, qpsByChannel.get(subchannel), 0, + new HashMap<>(), new HashMap<>())); fakeClock.forwardTime(50, TimeUnit.MILLISECONDS); } assertThat(pickCount.size()).isEqualTo(2); @@ -667,9 +686,11 @@ public void unknownWeightIsAvgWeight() { WrrSubchannel weightedSubchannel1 = (WrrSubchannel) weightedPicker.getList().get(0); WrrSubchannel weightedSubchannel2 = (WrrSubchannel) weightedPicker.getList().get(1); WrrSubchannel weightedSubchannel3 = (WrrSubchannel) weightedPicker.getList().get(2); - weightedSubchannel1.onLoadReport(InternalCallMetricRecorder.createMetricReport( + weightedSubchannel1.new OrcaReportListener(weightedConfig.errorUtilizationPenalty).onLoadReport( + InternalCallMetricRecorder.createMetricReport( 0.1, 0.1, 1, 0, new HashMap<>(), new HashMap<>())); - weightedSubchannel2.onLoadReport(InternalCallMetricRecorder.createMetricReport( + weightedSubchannel2.new OrcaReportListener(weightedConfig.errorUtilizationPenalty).onLoadReport( + InternalCallMetricRecorder.createMetricReport( 0.2, 0.1, 1, 0, new HashMap<>(), new HashMap<>())); assertThat(fakeClock.forwardTime(10, TimeUnit.SECONDS)).isEqualTo(1); Map pickCount = new HashMap<>(); @@ -709,9 +730,11 @@ public void pickFromOtherThread() throws Exception { (WeightedRoundRobinPicker) pickerCaptor.getAllValues().get(1); WrrSubchannel weightedSubchannel1 = (WrrSubchannel) weightedPicker.getList().get(0); WrrSubchannel weightedSubchannel2 = (WrrSubchannel) weightedPicker.getList().get(1); - weightedSubchannel1.onLoadReport(InternalCallMetricRecorder.createMetricReport( + weightedSubchannel1.new OrcaReportListener(weightedConfig.errorUtilizationPenalty).onLoadReport( + InternalCallMetricRecorder.createMetricReport( 0.1, 0.1, 1, 0, new HashMap<>(), new HashMap<>())); - weightedSubchannel2.onLoadReport(InternalCallMetricRecorder.createMetricReport( + weightedSubchannel2.new OrcaReportListener(weightedConfig.errorUtilizationPenalty).onLoadReport( + InternalCallMetricRecorder.createMetricReport( 0.2, 0.1, 1, 0, new HashMap<>(), new HashMap<>())); CyclicBarrier barrier = new CyclicBarrier(2); Map pickCount = new ConcurrentHashMap<>(); From d4529b51a6bba65695870a59ebb295ff69226f84 Mon Sep 17 00:00:00 2001 From: Daniel Liu Date: Wed, 24 May 2023 18:34:41 +0000 Subject: [PATCH 7/9] add WRR tests & improve code coverage --- .../WeightedRoundRobinLoadBalancerTest.java | 22 +++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/xds/src/test/java/io/grpc/xds/WeightedRoundRobinLoadBalancerTest.java b/xds/src/test/java/io/grpc/xds/WeightedRoundRobinLoadBalancerTest.java index e8d591500b8..011bdbb3427 100644 --- a/xds/src/test/java/io/grpc/xds/WeightedRoundRobinLoadBalancerTest.java +++ b/xds/src/test/java/io/grpc/xds/WeightedRoundRobinLoadBalancerTest.java @@ -205,6 +205,11 @@ public void wrrLifeCycle() { assertThat(weightedPicker.getList().size()).isEqualTo(1); weightedPicker = (WeightedRoundRobinPicker) pickerCaptor.getAllValues().get(1); assertThat(weightedPicker.getList().size()).isEqualTo(2); + String weightedPickerStr = weightedPicker.toString(); + assertThat(weightedPickerStr).contains("enableOobLoadReport=false"); + assertThat(weightedPickerStr).contains("errorUtilizationPenalty=1.0"); + assertThat(weightedPickerStr).contains("list="); + WrrSubchannel weightedSubchannel1 = (WrrSubchannel) weightedPicker.getList().get(0); WrrSubchannel weightedSubchannel2 = (WrrSubchannel) weightedPicker.getList().get(1); weightedSubchannel1.new OrcaReportListener(weightedConfig.errorUtilizationPenalty).onLoadReport( @@ -414,6 +419,23 @@ public void pickByWeight_normalWeight_withEps_customErrorUtilizationPenalty() { weight3 / totalWeight); } + @Test + public void pickByWeight_avgWeight_zeroCpuUtilization_withEps_customErrorUtilizationPenalty() { + weightedConfig = WeightedRoundRobinLoadBalancerConfig.newBuilder() + .setErrorUtilizationPenalty(1.75F).build(); + + MetricReport report1 = InternalCallMetricRecorder.createMetricReport( + 0, 0.1, 22, 19.7, new HashMap<>(), new HashMap<>()); + MetricReport report2 = InternalCallMetricRecorder.createMetricReport( + 0, 0.1, 40, 0.998, new HashMap<>(), new HashMap<>()); + MetricReport report3 = InternalCallMetricRecorder.createMetricReport( + 0, 0.1, 100, 3.14159, new HashMap<>(), new HashMap<>()); + double avgSubchannelPickRatio = 1.0 / 3; + + pickByWeight(report1, report2, report3, avgSubchannelPickRatio, avgSubchannelPickRatio, + avgSubchannelPickRatio); + } + @Test public void emptyConfig() { assertThat(wrr.acceptResolvedAddresses(ResolvedAddresses.newBuilder() From 8e929f39045f8dc843f3c0f7ec16fa5983c15e8a Mon Sep 17 00:00:00 2001 From: Daniel Liu Date: Thu, 25 May 2023 22:26:02 +0000 Subject: [PATCH 8/9] create a new OOB listener each time --- .../java/io/grpc/xds/WeightedRoundRobinLoadBalancer.java | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/xds/src/main/java/io/grpc/xds/WeightedRoundRobinLoadBalancer.java b/xds/src/main/java/io/grpc/xds/WeightedRoundRobinLoadBalancer.java index 10b5c4288f8..e13603912c0 100644 --- a/xds/src/main/java/io/grpc/xds/WeightedRoundRobinLoadBalancer.java +++ b/xds/src/main/java/io/grpc/xds/WeightedRoundRobinLoadBalancer.java @@ -129,7 +129,8 @@ private void afterAcceptAddresses() { for (Subchannel subchannel : getSubchannels()) { WrrSubchannel weightedSubchannel = (WrrSubchannel) subchannel; if (config.enableOobLoadReport) { - OrcaOobUtil.setListener(weightedSubchannel, weightedSubchannel.oobListener, + OrcaOobUtil.setListener(weightedSubchannel, + weightedSubchannel.new OrcaReportListener(config.errorUtilizationPenalty), OrcaOobUtil.OrcaReportingConfig.newBuilder() .setReportInterval(config.oobReportingPeriodNanos, TimeUnit.NANOSECONDS) .build()); @@ -173,8 +174,6 @@ public Subchannel createSubchannel(CreateSubchannelArgs args) { @VisibleForTesting final class WrrSubchannel extends ForwardingSubchannel { private final Subchannel delegate; - private final OrcaOobReportListener oobListener = new OrcaReportListener( - config.errorUtilizationPenalty); private volatile long lastUpdated; private volatile long nonEmptySince; private volatile double weight; From 07d7564c6feeef2e65e14cb37600ba02d68b8b72 Mon Sep 17 00:00:00 2001 From: Daniel Liu Date: Fri, 26 May 2023 00:00:13 +0000 Subject: [PATCH 9/9] per-request listener created once per channel in picker's constructor & fallback if not found --- .../io/grpc/xds/WeightedRoundRobinLoadBalancer.java | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/xds/src/main/java/io/grpc/xds/WeightedRoundRobinLoadBalancer.java b/xds/src/main/java/io/grpc/xds/WeightedRoundRobinLoadBalancer.java index e13603912c0..cf54eb4bb17 100644 --- a/xds/src/main/java/io/grpc/xds/WeightedRoundRobinLoadBalancer.java +++ b/xds/src/main/java/io/grpc/xds/WeightedRoundRobinLoadBalancer.java @@ -40,8 +40,10 @@ import io.grpc.xds.orca.OrcaOobUtil.OrcaOobReportListener; import io.grpc.xds.orca.OrcaPerRequestUtil; import io.grpc.xds.orca.OrcaPerRequestUtil.OrcaPerRequestReportListener; +import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.PriorityQueue; import java.util.Random; import java.util.concurrent.ScheduledExecutorService; @@ -248,6 +250,8 @@ public void onLoadReport(MetricReport report) { @VisibleForTesting final class WeightedRoundRobinPicker extends RoundRobinPicker { private final List list; + private final Map subchannelToReportListenerMap = + new HashMap<>(); private final boolean enableOobLoadReport; private final float errorUtilizationPenalty; private volatile EdfScheduler scheduler; @@ -257,6 +261,10 @@ final class WeightedRoundRobinPicker extends RoundRobinPicker { checkNotNull(list, "list"); Preconditions.checkArgument(!list.isEmpty(), "empty list"); this.list = list; + for (Subchannel subchannel : list) { + this.subchannelToReportListenerMap.put(subchannel, + ((WrrSubchannel) subchannel).new OrcaReportListener(errorUtilizationPenalty)); + } this.enableOobLoadReport = enableOobLoadReport; this.errorUtilizationPenalty = errorUtilizationPenalty; updateWeight(); @@ -268,7 +276,8 @@ public PickResult pickSubchannel(PickSubchannelArgs args) { if (!enableOobLoadReport) { return PickResult.withSubchannel(subchannel, OrcaPerRequestUtil.getInstance().newOrcaClientStreamTracerFactory( - ((WrrSubchannel)subchannel).new OrcaReportListener(errorUtilizationPenalty))); + subchannelToReportListenerMap.getOrDefault(subchannel, + ((WrrSubchannel) subchannel).new OrcaReportListener(errorUtilizationPenalty)))); } else { return PickResult.withSubchannel(subchannel); }