diff --git a/api/src/main/java/io/grpc/MetricInstrumentRegistry.java b/api/src/main/java/io/grpc/MetricInstrumentRegistry.java index 60a3d1edfad..0eda6f2d4d0 100644 --- a/api/src/main/java/io/grpc/MetricInstrumentRegistry.java +++ b/api/src/main/java/io/grpc/MetricInstrumentRegistry.java @@ -16,7 +16,6 @@ package io.grpc; -import com.google.common.annotations.VisibleForTesting; import java.util.Collections; import java.util.List; import java.util.Set; @@ -47,16 +46,6 @@ public static synchronized MetricInstrumentRegistry getDefaultRegistry() { return instance; } - /** - * Allows the registry to be reset from unit tests. - */ - @VisibleForTesting - public static synchronized void reset() { - if (instance != null) { - instance = new MetricInstrumentRegistry(); - } - } - /** * Returns a list of registered metric instruments. */ diff --git a/api/src/testFixtures/java/io/grpc/FakeMetricRecorder.java b/api/src/testFixtures/java/io/grpc/FakeMetricRecorder.java deleted file mode 100644 index 4f7bd0e9aa0..00000000000 --- a/api/src/testFixtures/java/io/grpc/FakeMetricRecorder.java +++ /dev/null @@ -1,101 +0,0 @@ -/* - * Copyright 2024 The gRPC Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.grpc; - -import com.google.common.base.MoreObjects; -import com.google.common.collect.Maps; -import java.util.List; -import java.util.Map; - -/** - * A fake implementation of the {@link MetricRecorder} that collects all metrics in memory to allow - * tests to make assertions against the collected metrics. - */ -public class FakeMetricRecorder implements MetricRecorder { - public Map> doubleCounterEntries = Maps.newHashMap(); - public Map> longCounterEntries = Maps.newHashMap(); - public Map> doubleHistogramCounterEntries = Maps.newHashMap(); - public Map> longHistogramCounterEntries = Maps.newHashMap(); - - - @Override - public void recordDoubleCounter(DoubleCounterMetricInstrument metricInstrument, double value, - List requiredLabelValues, List optionalLabelValues) { - doubleCounterEntries.put(metricInstrument.getName(), - new MetricEntry<>(value, requiredLabelValues, optionalLabelValues)); - } - - @Override - public void recordLongCounter(LongCounterMetricInstrument metricInstrument, long value, - List requiredLabelValues, List optionalLabelValues) { - longCounterEntries.put(metricInstrument.getName(), - new MetricEntry<>(value, requiredLabelValues, optionalLabelValues)); - } - - @Override - public void recordDoubleHistogram(DoubleHistogramMetricInstrument metricInstrument, - double value, List requiredLabelValues, List optionalLabelValues) { - doubleHistogramCounterEntries.put(metricInstrument.getName(), - new MetricEntry<>(value, requiredLabelValues, optionalLabelValues)); - } - - @Override - public void recordLongHistogram(LongHistogramMetricInstrument metricInstrument, long value, - List requiredLabelValues, List optionalLabelValues) { - longHistogramCounterEntries.put(metricInstrument.getName(), - new MetricEntry<>(value, requiredLabelValues, optionalLabelValues)); - } - - public Long getLongCounterValue(String metricName) { - return longCounterEntries.get(metricName).value; - } - - // Returns the last recorded double histogram value. - public Double getDoubleHistogramValue(String metricName) { - return doubleHistogramCounterEntries.get(metricName).value; - } - - public boolean hasLongCounterValue(String metricName) { - return longCounterEntries.containsKey(metricName); - } - - public void clear() { - doubleCounterEntries.clear(); - longCounterEntries.clear(); - doubleHistogramCounterEntries.clear(); - longHistogramCounterEntries.clear(); - } - - public static class MetricEntry { - public T value; - public List requiredLabelValues; - public List optionalLabelValues; - - public MetricEntry(T value, List requiredValues, List optionalValues) { - this.value = value; - this.requiredLabelValues = requiredValues; - this.optionalLabelValues = optionalValues; - } - - @Override - public String toString() { - return MoreObjects.toStringHelper(this).add("value", value) - .add("requiredLabelValues", requiredLabelValues) - .add("optionalLabelValues", optionalLabelValues).toString(); - } - } -} diff --git a/core/src/main/java/io/grpc/internal/MetricLongCounter.java b/core/src/main/java/io/grpc/internal/MetricLongCounter.java deleted file mode 100644 index 6df331c94e1..00000000000 --- a/core/src/main/java/io/grpc/internal/MetricLongCounter.java +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Copyright 2024 The gRPC Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.grpc.internal; - -import io.grpc.LongCounterMetricInstrument; -import io.grpc.MetricRecorder; -import java.util.List; - -/** - * Maintains a counter while also recording new values with a {@link MetricRecorder}. - */ -public class MetricLongCounter { - - private final MetricRecorder recorder; - private final LongCounterMetricInstrument instrument; - private final LongCounter longCounter; - - /** - * Creates a new {@code MetricLongCounter} for a given recorder and instrument. - */ - public MetricLongCounter(MetricRecorder recorder, LongCounterMetricInstrument instrument) { - this.recorder = recorder; - this.instrument = instrument; - this.longCounter = LongCounterFactory.create(); - } - - /** - * Updates the counter with the given delta value and records the change with the - * {@link MetricRecorder}. - */ - public void add(long delta, List requiredLabelValues, List optionalLabelValues) { - longCounter.add(delta); - recorder.recordLongCounter(instrument, longCounter.value(), requiredLabelValues, - optionalLabelValues); - } - - /** - * Increments the counter by one and records the change with the {@link MetricRecorder}. - */ - public void increment(List requiredLabelValues, List optionalLabelValues) { - add(1, requiredLabelValues, optionalLabelValues); - } - - /** - * Returns the current value of the counter. - */ - public long value() { - return longCounter.value(); - } -} diff --git a/core/src/test/java/io/grpc/internal/MetricLongCounterTest.java b/core/src/test/java/io/grpc/internal/MetricLongCounterTest.java deleted file mode 100644 index 1eba45cf80d..00000000000 --- a/core/src/test/java/io/grpc/internal/MetricLongCounterTest.java +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Copyright 2024 The gRPC Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.grpc.internal; - -import static com.google.common.truth.Truth.assertThat; -import static org.mockito.Mockito.verify; - -import com.google.common.collect.Lists; -import io.grpc.LongCounterMetricInstrument; -import io.grpc.MetricInstrumentRegistry; -import io.grpc.MetricRecorder; -import java.util.List; -import org.junit.Rule; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; -import org.mockito.Mock; -import org.mockito.junit.MockitoJUnit; -import org.mockito.junit.MockitoRule; - -@RunWith(JUnit4.class) -public class MetricLongCounterTest { - - @Rule - public MockitoRule mockitoRule = MockitoJUnit.rule(); - - @Mock - MetricRecorder mockMetricRecorder; - - @Test - public void incrementAndAdd() { - LongCounterMetricInstrument instrument = MetricInstrumentRegistry.getDefaultRegistry() - .registerLongCounter("long", - "description", "unit", Lists.newArrayList(), Lists.newArrayList(), true); - List requiredLabelValues = Lists.newArrayList(); - List optionalLabelValues = Lists.newArrayList(); - MetricLongCounter counter = new MetricLongCounter(mockMetricRecorder, instrument); - - counter.increment(requiredLabelValues, optionalLabelValues); - verify(mockMetricRecorder).recordLongCounter(instrument, 1, requiredLabelValues, - optionalLabelValues); - - counter.increment(requiredLabelValues, optionalLabelValues); - verify(mockMetricRecorder).recordLongCounter(instrument, 2, requiredLabelValues, - optionalLabelValues); - - counter.add(2L, requiredLabelValues, optionalLabelValues); - verify(mockMetricRecorder).recordLongCounter(instrument, 4, requiredLabelValues, - optionalLabelValues); - - counter.add(3L, requiredLabelValues, optionalLabelValues); - verify(mockMetricRecorder).recordLongCounter(instrument, 7, requiredLabelValues, - optionalLabelValues); - - counter.add(5L, requiredLabelValues, optionalLabelValues); - verify(mockMetricRecorder).recordLongCounter(instrument, 12, requiredLabelValues, - optionalLabelValues); - - assertThat(counter.value()).isEqualTo(12); - } -} \ No newline at end of file diff --git a/xds/src/main/java/io/grpc/xds/WeightedRoundRobinLoadBalancer.java b/xds/src/main/java/io/grpc/xds/WeightedRoundRobinLoadBalancer.java index c88b213b242..a38b1488f34 100644 --- a/xds/src/main/java/io/grpc/xds/WeightedRoundRobinLoadBalancer.java +++ b/xds/src/main/java/io/grpc/xds/WeightedRoundRobinLoadBalancer.java @@ -33,12 +33,12 @@ import io.grpc.ExperimentalApi; import io.grpc.LoadBalancer; import io.grpc.LoadBalancerProvider; +import io.grpc.LongCounterMetricInstrument; import io.grpc.MetricInstrumentRegistry; import io.grpc.NameResolver; import io.grpc.Status; import io.grpc.SynchronizationContext; import io.grpc.SynchronizationContext.ScheduledHandle; -import io.grpc.internal.MetricLongCounter; import io.grpc.services.MetricReport; import io.grpc.util.ForwardingLoadBalancerHelper; import io.grpc.util.ForwardingSubchannel; @@ -78,10 +78,34 @@ final class WeightedRoundRobinLoadBalancer extends RoundRobinLoadBalancer { private final AtomicInteger sequence; private final long infTime; private final Ticker ticker; - private final MetricLongCounter rrFallbackCounter; - private final MetricLongCounter endpointWeightNotYetUseableCounter; - private final MetricLongCounter endpointWeightStaleCounter; - private final DoubleHistogramMetricInstrument endpointWeightsHistogram; + private static final LongCounterMetricInstrument rrFallbackCounter; + private static final LongCounterMetricInstrument endpointWeightNotYetUseableCounter; + private static final LongCounterMetricInstrument endpointWeightStaleCounter; + private static final DoubleHistogramMetricInstrument endpointWeightsHistogram; + + // The metric instruments are only registered once and shared by all instances of this LB. + static { + MetricInstrumentRegistry metricInstrumentRegistry + = MetricInstrumentRegistry.getDefaultRegistry(); + rrFallbackCounter = metricInstrumentRegistry.registerLongCounter("grpc.lb.wrr.rr_fallback", + "Number of scheduler updates in which there were not enough endpoints with valid " + + "weight, which caused the WRR policy to fall back to RR behavior", "update", + Lists.newArrayList("grpc.target"), Lists.newArrayList("grpc.lb.locality"), true); + endpointWeightNotYetUseableCounter = metricInstrumentRegistry.registerLongCounter( + "grpc.lb.wrr.endpoint_weight_not_yet_usable", + "Number of endpoints from each scheduler update that don't yet have usable weight " + + "information", "endpoint", Lists.newArrayList("grpc.target"), + Lists.newArrayList("grpc.lb.locality"), true); + endpointWeightStaleCounter = metricInstrumentRegistry.registerLongCounter( + "grpc.lb.wrr.endpoint_weight_stale", + "Number of endpoints from each scheduler update whose latest weight is older than the " + + "expiration period", "endpoint", Lists.newArrayList("grpc.target"), + Lists.newArrayList("grpc.lb.locality"), true); + endpointWeightsHistogram = metricInstrumentRegistry.registerDoubleHistogram( + "grpc.lb.wrr.endpoint_weights", "The histogram buckets will be endpoint weight ranges.", + "weight", null, Lists.newArrayList("grpc.target"), Lists.newArrayList("grpc.lb.locality"), + true); + } public WeightedRoundRobinLoadBalancer(Helper helper, Ticker ticker) { this(new WrrHelper(OrcaOobUtil.newOrcaReportingHelper(helper)), ticker, new Random()); @@ -97,27 +121,6 @@ public WeightedRoundRobinLoadBalancer(WrrHelper helper, Ticker ticker, Random ra this.updateWeightTask = new UpdateWeightTask(); this.sequence = new AtomicInteger(random.nextInt()); - MetricInstrumentRegistry metricInstrumentRegistry - = MetricInstrumentRegistry.getDefaultRegistry(); - this.rrFallbackCounter = new MetricLongCounter(helper.getMetricRecorder(), - metricInstrumentRegistry.registerLongCounter("grpc.lb.wrr.rr_fallback", - "Number of scheduler updates in which there were not enough endpoints with valid " - + "weight, which caused the WRR policy to fall back to RR behavior", "update", - Lists.newArrayList("grpc.target"), Lists.newArrayList("grpc.lb.locality"), true)); - this.endpointWeightNotYetUseableCounter = new MetricLongCounter(helper.getMetricRecorder(), - metricInstrumentRegistry.registerLongCounter("grpc.lb.wrr.endpoint_weight_not_yet_usable", - "Number of endpoints from each scheduler update that don't yet have usable weight " - + "information", "endpoint", Lists.newArrayList("grpc.target"), - Lists.newArrayList("grpc.lb.locality"), true)); - this.endpointWeightStaleCounter = new MetricLongCounter(helper.getMetricRecorder(), - metricInstrumentRegistry.registerLongCounter("grpc.lb.wrr.endpoint_weight_stale", - "Number of endpoints from each scheduler update whose latest weight is older than the " - + "expiration period", "endpoint", Lists.newArrayList("grpc.target"), - Lists.newArrayList("grpc.lb.locality"), true)); - this.endpointWeightsHistogram = metricInstrumentRegistry.registerDoubleHistogram( - "grpc.lb.wrr.endpoint_weights", - "The histogram buckets will be endpoint weight ranges.", "weight", null, - Lists.newArrayList("grpc.target"), Lists.newArrayList("grpc.lb.locality"), true); log.log(Level.FINE, "weighted_round_robin LB created"); } @@ -177,7 +180,8 @@ public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) { @Override public SubchannelPicker createReadyPicker(Collection activeList) { return new WeightedRoundRobinPicker(ImmutableList.copyOf(activeList), - config.enableOobLoadReport, config.errorUtilizationPenalty, sequence, rrFallbackCounter); + config.enableOobLoadReport, config.errorUtilizationPenalty, sequence, getHelper(), + rrFallbackCounter); } @VisibleForTesting @@ -195,37 +199,22 @@ public WeightedChildLbState(Object key, LoadBalancerProvider policyProvider, Obj super(key, policyProvider, childConfig, initialPicker); } - private double getWeight() { - double weightToReturn = 0; - if (config != null) { - long now = ticker.nanoTime(); - if (weightIsStale(now)) { - nonEmptySince = infTime; - // TODO: add target and locality labels once available - endpointWeightStaleCounter.add(1, ImmutableList.of(), ImmutableList.of()); - } else if (inBlackoutPeriod(now)) { - // Note that incrementing a counter here works as getWeight() is called once for each - // endpoint. If that ever changes, this counter can't be incremented here. - // TODO: add target and locality labels once available - endpointWeightNotYetUseableCounter.add(1, ImmutableList.of(), ImmutableList.of()); - } else { - // Weight is valid to use. - weightToReturn = weight; - } + private double getWeight(AtomicInteger staleEndpoints, AtomicInteger notYetUsableEndpoints) { + if (config == null) { + return 0; + } + long now = ticker.nanoTime(); + if (now - lastUpdated >= config.weightExpirationPeriodNanos) { + nonEmptySince = infTime; + staleEndpoints.incrementAndGet(); + return 0; + } else if (now - nonEmptySince < config.blackoutPeriodNanos + && config.blackoutPeriodNanos > 0) { + notYetUsableEndpoints.incrementAndGet(); + return 0; + } else { + return weight; } - // TODO: add target and locality labels once available - getHelper().getMetricRecorder() - .recordDoubleHistogram(endpointWeightsHistogram, weightToReturn, ImmutableList.of(), - ImmutableList.of()); - return weightToReturn; - } - - private boolean weightIsStale(long now) { - return now - lastUpdated >= config.weightExpirationPeriodNanos; - } - - private boolean inBlackoutPeriod(long now) { - return now - nonEmptySince < config.blackoutPeriodNanos && config.blackoutPeriodNanos > 0; } public void addSubchannel(WrrSubchannel wrrSubchannel) { @@ -385,12 +374,13 @@ static final class WeightedRoundRobinPicker extends SubchannelPicker { private final float errorUtilizationPenalty; private final AtomicInteger sequence; private final int hashCode; - private final MetricLongCounter rrFallbackCounter; + private final LoadBalancer.Helper helper; + private final LongCounterMetricInstrument rrFallbackCounter; private volatile StaticStrideScheduler scheduler; WeightedRoundRobinPicker(List children, boolean enableOobLoadReport, - float errorUtilizationPenalty, AtomicInteger sequence, - MetricLongCounter rrFallbackCounter) { + float errorUtilizationPenalty, AtomicInteger sequence, LoadBalancer.Helper helper, + LongCounterMetricInstrument rrFallbackCounter) { checkNotNull(children, "children"); Preconditions.checkArgument(!children.isEmpty(), "empty child list"); this.children = children; @@ -404,6 +394,7 @@ static final class WeightedRoundRobinPicker extends SubchannelPicker { this.enableOobLoadReport = enableOobLoadReport; this.errorUtilizationPenalty = errorUtilizationPenalty; this.sequence = checkNotNull(sequence, "sequence"); + this.helper = helper; this.rrFallbackCounter = rrFallbackCounter; // For equality we treat children as a set; use hash code as defined by Set @@ -439,14 +430,36 @@ public PickResult pickSubchannel(PickSubchannelArgs args) { private void updateWeight() { float[] newWeights = new float[children.size()]; + AtomicInteger staleEndpoints = new AtomicInteger(); + AtomicInteger notYetUsableEndpoints = new AtomicInteger(); for (int i = 0; i < children.size(); i++) { - double newWeight = ((WeightedChildLbState) children.get(i)).getWeight(); + double newWeight = ((WeightedChildLbState) children.get(i)).getWeight(staleEndpoints, + notYetUsableEndpoints); + // TODO: add target and locality labels once available + helper.getMetricRecorder() + .recordDoubleHistogram(endpointWeightsHistogram, newWeight, ImmutableList.of(""), + ImmutableList.of("")); newWeights[i] = newWeight > 0 ? (float) newWeight : 0.0f; } + if (staleEndpoints.get() > 0) { + // TODO: add target and locality labels once available + helper.getMetricRecorder() + .recordLongCounter(endpointWeightStaleCounter, staleEndpoints.get(), + ImmutableList.of(""), + ImmutableList.of("")); + } + if (notYetUsableEndpoints.get() > 0) { + // TODO: add target and locality labels once available + helper.getMetricRecorder() + .recordLongCounter(endpointWeightNotYetUseableCounter, notYetUsableEndpoints.get(), + ImmutableList.of(""), ImmutableList.of("")); + } + this.scheduler = new StaticStrideScheduler(newWeights, sequence); if (this.scheduler.usesRoundRobin()) { // TODO: add target and locality labels once available - rrFallbackCounter.add(1, ImmutableList.of(""), ImmutableList.of("")); + helper.getMetricRecorder() + .recordLongCounter(rrFallbackCounter, 1, ImmutableList.of(""), ImmutableList.of("")); } } diff --git a/xds/src/test/java/io/grpc/xds/WeightedRoundRobinLoadBalancerTest.java b/xds/src/test/java/io/grpc/xds/WeightedRoundRobinLoadBalancerTest.java index 22f65c3699a..0913df6a808 100644 --- a/xds/src/test/java/io/grpc/xds/WeightedRoundRobinLoadBalancerTest.java +++ b/xds/src/test/java/io/grpc/xds/WeightedRoundRobinLoadBalancerTest.java @@ -19,6 +19,7 @@ import static com.google.common.truth.Truth.assertThat; import static io.grpc.ConnectivityState.CONNECTING; import static org.mockito.AdditionalAnswers.delegatesTo; +import static org.mockito.ArgumentMatchers.argThat; import static org.mockito.Mockito.any; import static org.mockito.Mockito.eq; import static org.mockito.Mockito.mock; @@ -40,8 +41,8 @@ import io.grpc.ClientCall; import io.grpc.ConnectivityState; import io.grpc.ConnectivityStateInfo; +import io.grpc.DoubleHistogramMetricInstrument; import io.grpc.EquivalentAddressGroup; -import io.grpc.FakeMetricRecorder; import io.grpc.LoadBalancer; import io.grpc.LoadBalancer.CreateSubchannelArgs; import io.grpc.LoadBalancer.Helper; @@ -50,7 +51,7 @@ import io.grpc.LoadBalancer.Subchannel; import io.grpc.LoadBalancer.SubchannelPicker; import io.grpc.LoadBalancer.SubchannelStateListener; -import io.grpc.MetricInstrumentRegistry; +import io.grpc.LongCounterMetricInstrument; import io.grpc.MetricRecorder; import io.grpc.Status; import io.grpc.SynchronizationContext; @@ -85,6 +86,7 @@ import org.junit.runner.RunWith; import org.junit.runners.JUnit4; import org.mockito.ArgumentCaptor; +import org.mockito.ArgumentMatcher; import org.mockito.Captor; import org.mockito.InOrder; import org.mockito.Mock; @@ -123,7 +125,8 @@ public class WeightedRoundRobinLoadBalancerTest { private final FakeClock fakeClock = new FakeClock(); - private final FakeMetricRecorder fakeMetricRecorder = new FakeMetricRecorder(); + @Mock + private MetricRecorder mockMetricRecorder; private WeightedRoundRobinLoadBalancerConfig weightedConfig = WeightedRoundRobinLoadBalancerConfig.newBuilder().build(); @@ -148,9 +151,6 @@ public WeightedRoundRobinLoadBalancerTest() { @Before public void setup() { - MetricInstrumentRegistry.reset(); - fakeMetricRecorder.clear(); - for (int i = 0; i < 3; i++) { SocketAddress addr = new FakeSocketAddress("server" + i); EquivalentAddressGroup eag = new EquivalentAddressGroup(addr); @@ -1132,10 +1132,12 @@ public void removingAddressShutsdownSubchannel() { @Test public void metrics() { + // Give WRR some valid addresses to work with. syncContext.execute(() -> wrr.acceptResolvedAddresses(ResolvedAddresses.newBuilder() .setAddresses(servers).setLoadBalancingPolicyConfig(weightedConfig) .setAttributes(affinity).build())); + // Flip the three subchannels to READY state to initiate the WRR logic Iterator it = subchannels.values().iterator(); Subchannel readySubchannel1 = it.next(); getSubchannelStateListener(readySubchannel1).onSubchannelState(ConnectivityStateInfo @@ -1152,19 +1154,22 @@ public void metrics() { // WRR creates a picker that updates the weights for each of the child subchannels. This should // give us three "rr_fallback" metric events as we don't yet have any weights to do weighted // round-robin. - assertLongCounter("grpc.lb.wrr.rr_fallback", 3L); + verifyLongCounterRecord("grpc.lb.wrr.rr_fallback", 3, 1); - // Each time weights are updated, WRR will see if each subchannels weight is useable. We should - // see 6 (first one has one subchannel, second two, third one all three) "endpoint_weight_not_ - // yet_usable" metric events since we have not gotten any weights yet. - assertLongCounter("grpc.lb.wrr.endpoint_weight_not_yet_usable", 6L); + // We should also see six records of endpoint weights. They should all be for 0 as we don't yet + // have valid weights. + verifyDoubleHistogramRecord("grpc.lb.wrr.endpoint_weights", 6, 0); - // We should not yet be seeing any "endpoint_weight_stale" events since we have no weights. - assertLongCounter("grpc.lb.wrr.endpoint_weight_stale", null); + // We should not yet be seeing any "endpoint_weight_stale" events since we don't even have + // valid weights yet. + verifyLongCounterRecord("grpc.lb.wrr.endpoint_weight_stale", 0, 1); - // The endpoint weights histogram should have been reported a 0 value as there are not yet valid - // weights. - assertDoubleHistogramValue("grpc.lb.wrr.endpoint_weights", 0); + // Each time weights are updated, WRR will see if each subchannel weight is useable. As we have + // no weights yet, we should see three "endpoint_weight_not_yet_usable" metric events with the + // value increasing by one each time as all the endpoints come online. + verifyLongCounterRecord("grpc.lb.wrr.endpoint_weight_not_yet_usable", 1, 1); + verifyLongCounterRecord("grpc.lb.wrr.endpoint_weight_not_yet_usable", 1, 2); + verifyLongCounterRecord("grpc.lb.wrr.endpoint_weight_not_yet_usable", 1, 3); // Send each child LB state an ORCA update with some valid utilization/qps data so that weights // can be calculated. @@ -1182,47 +1187,80 @@ public void metrics() { InternalCallMetricRecorder.createMetricReport(0.1, 0, 0.1, 1, 0, new HashMap<>(), new HashMap<>(), new HashMap<>())); + // Let's reset the mock MetricsRecorder so that it's easier to verify what happened after the + // weights were updated + reset(mockMetricRecorder); + // We go forward in time past the default 10s blackout period before weights can be considered // for wrr. The eights would get updated as the default update interval is 1s. fakeClock.forwardTime(11, TimeUnit.SECONDS); // Since we have weights on all the child LB states, the weight update should not result in // further rr_fallback metric entries. - assertLongCounter("grpc.lb.wrr.rr_fallback", 3L); + verifyLongCounterRecord("grpc.lb.wrr.rr_fallback", 0, 1); // We should not see an increase to the earlier count of "endpoint_weight_not_yet_usable". - assertLongCounter("grpc.lb.wrr.endpoint_weight_not_yet_usable", 6L); + verifyLongCounterRecord("grpc.lb.wrr.endpoint_weight_not_yet_usable", 0, 1); // No endpoints should have gotten stale yet either. - assertLongCounter("grpc.lb.wrr.endpoint_weight_stale", null); + verifyLongCounterRecord("grpc.lb.wrr.endpoint_weight_stale", 0, 1); // Now with valid weights we should have seen the value in the endpoint weights histogram. - assertDoubleHistogramValue("grpc.lb.wrr.endpoint_weights", 10); + verifyDoubleHistogramRecord("grpc.lb.wrr.endpoint_weights", 3, 10); + + reset(mockMetricRecorder); // Weights become stale in three minutes. Let's move ahead in time by 3 minutes and make sure // we get metrics events for each endpoint. fakeClock.forwardTime(3, TimeUnit.MINUTES); - assertLongCounter("grpc.lb.wrr.endpoint_weight_stale", 3L); + verifyLongCounterRecord("grpc.lb.wrr.endpoint_weight_stale", 1, 3); - // Since the weights are now stale the update should have triggered a new rr_fallback event. - assertLongCounter("grpc.lb.wrr.rr_fallback", 4L); + // With the weights stale each three endpoints should report 0 weights. + verifyDoubleHistogramRecord("grpc.lb.wrr.endpoint_weights", 3, 0); - // No further weights-not-useable events should not occur, since we have received weights and + // Since the weights are now stale the update should have triggered an additional rr_fallback + // event. + verifyLongCounterRecord("grpc.lb.wrr.rr_fallback", 1, 1); + + // No further weights-not-useable events should occur, since we have received weights and // are out of the blackout. - assertLongCounter("grpc.lb.wrr.endpoint_weight_not_yet_usable", 6L); + verifyLongCounterRecord("grpc.lb.wrr.endpoint_weight_not_yet_usable", 0, 1); + + // All metric events should be accounted for. + verifyNoMoreInteractions(mockMetricRecorder); } - private void assertLongCounter(String metricName, Long expected) { - if (expected == null) { - assertThat(fakeMetricRecorder.hasLongCounterValue(metricName)).isFalse(); - return; - } - assertThat(fakeMetricRecorder.getLongCounterValue(metricName)).isEqualTo(expected); + // Verifies that the MetricRecorder has been called to record a long counter value of 1 for the + // given metric name, the given number of times + private void verifyLongCounterRecord(String name, int times, long value) { + verify(mockMetricRecorder, times(times)).recordLongCounter( + argThat(new ArgumentMatcher() { + @Override + public boolean matches(LongCounterMetricInstrument longCounterInstrument) { + return longCounterInstrument.getName().equals(name); + } + + public String toString() { + return name; + } + }), eq(value), eq(Lists.newArrayList("")), eq(Lists.newArrayList(""))); } - private void assertDoubleHistogramValue(String metricName, double expected) { - assertThat(fakeMetricRecorder.getDoubleHistogramValue(metricName)).isEqualTo(expected); + // Verifies that the MetricRecorder has been called to record a given double histogram value the + // given amount of times. + private void verifyDoubleHistogramRecord(String name, int times, double value) { + verify(mockMetricRecorder, times(times)).recordDoubleHistogram( + argThat(new ArgumentMatcher() { + @Override + public boolean matches(DoubleHistogramMetricInstrument doubleHistogramInstrument) { + return doubleHistogramInstrument.getName().equals(name); + } + + public String toString() { + return name; + } + }), eq(value), eq(Lists.newArrayList("")), eq(Lists.newArrayList(""))); } private int getNumFilteredPendingTasks() { @@ -1296,7 +1334,7 @@ public Map getSubchannelStateListeners() { @Override public MetricRecorder getMetricRecorder() { - return fakeMetricRecorder; + return mockMetricRecorder; } } }