From 49f555192da0a8c1c9b997ca334f4ca2537668c5 Mon Sep 17 00:00:00 2001 From: Terry Wilson Date: Wed, 13 Jul 2022 15:54:49 -0700 Subject: [PATCH] xds: cluster manager to delay picker updates (#9365) Do not perform picker updates while handling new addresses even if child LBs request it. Assure that a single picker update is done. --- .../grpc/xds/ClusterManagerLoadBalancer.java | 38 +++++++++++-------- .../xds/ClusterManagerLoadBalancerTest.java | 15 ++++++++ 2 files changed, 38 insertions(+), 15 deletions(-) diff --git a/xds/src/main/java/io/grpc/xds/ClusterManagerLoadBalancer.java b/xds/src/main/java/io/grpc/xds/ClusterManagerLoadBalancer.java index 0557f3a6a8c..85009874951 100644 --- a/xds/src/main/java/io/grpc/xds/ClusterManagerLoadBalancer.java +++ b/xds/src/main/java/io/grpc/xds/ClusterManagerLoadBalancer.java @@ -57,6 +57,8 @@ class ClusterManagerLoadBalancer extends LoadBalancer { private final SynchronizationContext syncContext; private final ScheduledExecutorService timeService; private final XdsLogger logger; + // Set to true if currently in the process of handling resolved addresses. + private boolean resolvingAddresses; ClusterManagerLoadBalancer(Helper helper) { this.helper = checkNotNull(helper, "helper"); @@ -69,6 +71,15 @@ class ClusterManagerLoadBalancer extends LoadBalancer { @Override public void handleResolvedAddresses(ResolvedAddresses resolvedAddresses) { + try { + resolvingAddresses = true; + handleResolvedAddressesInternal(resolvedAddresses); + } finally { + resolvingAddresses = false; + } + } + + public void handleResolvedAddressesInternal(ResolvedAddresses resolvedAddresses) { logger.log(XdsLogLevel.DEBUG, "Received resolution result: {0}", resolvedAddresses); ClusterManagerConfig config = (ClusterManagerConfig) resolvedAddresses.getLoadBalancingPolicyConfig(); @@ -251,21 +262,18 @@ private final class ChildLbStateHelper extends ForwardingLoadBalancerHelper { @Override public void updateBalancingState(final ConnectivityState newState, final SubchannelPicker newPicker) { - syncContext.execute(new Runnable() { - @Override - public void run() { - if (!childLbStates.containsKey(name)) { - return; - } - // Subchannel picker and state are saved, but will only be propagated to the channel - // when the child instance exits deactivated state. - currentState = newState; - currentPicker = newPicker; - if (!deactivated) { - updateOverallBalancingState(); - } - } - }); + // If we are already in the process of resolving addresses, the overall balancing state + // will be updated at the end of it, and we don't need to trigger that update here. + if (resolvingAddresses || !childLbStates.containsKey(name)) { + return; + } + // Subchannel picker and state are saved, but will only be propagated to the channel + // when the child instance exits deactivated state. + currentState = newState; + currentPicker = newPicker; + if (!deactivated) { + updateOverallBalancingState(); + } } @Override diff --git a/xds/src/test/java/io/grpc/xds/ClusterManagerLoadBalancerTest.java b/xds/src/test/java/io/grpc/xds/ClusterManagerLoadBalancerTest.java index 5890f6f9abb..043c27e46d9 100644 --- a/xds/src/test/java/io/grpc/xds/ClusterManagerLoadBalancerTest.java +++ b/xds/src/test/java/io/grpc/xds/ClusterManagerLoadBalancerTest.java @@ -17,6 +17,7 @@ package io.grpc.xds; import static com.google.common.truth.Truth.assertThat; +import static io.grpc.ConnectivityState.TRANSIENT_FAILURE; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.atLeast; @@ -52,6 +53,7 @@ import io.grpc.internal.ServiceConfigUtil.PolicySelection; import io.grpc.testing.TestMethodDescriptors; import io.grpc.xds.ClusterManagerLoadBalancerProvider.ClusterManagerConfig; +import io.grpc.xds.XdsSubchannelPickers.ErrorPicker; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -249,6 +251,15 @@ public void handleNameResolutionError_notPropagateToDeactivatedChildLbs() { assertThat(childBalancer2.upstreamError.getDescription()).isEqualTo("unknown error"); } + @Test + public void noDuplicateOverallBalancingStateUpdate() { + deliverResolvedAddresses(ImmutableMap.of("childA", "policy_a", "childB", "policy_b")); + + // The test child LBs would have triggered state updates, let's make sure the overall balancing + // state was only updated once. + verify(helper, times(1)).updateBalancingState(any(), any()); + } + private void deliverResolvedAddresses(final Map childPolicies) { clusterManagerLoadBalancer.handleResolvedAddresses( ResolvedAddresses.newBuilder() @@ -329,6 +340,10 @@ private final class FakeLoadBalancer extends LoadBalancer { @Override public void handleResolvedAddresses(ResolvedAddresses resolvedAddresses) { config = resolvedAddresses.getLoadBalancingPolicyConfig(); + + // Update balancing state here so that concurrent child state changes can be easily tested. + // Most tests ignore this and trigger separate child LB updates. + helper.updateBalancingState(TRANSIENT_FAILURE, new ErrorPicker(Status.INTERNAL)); } @Override