diff --git a/xds/src/main/java/io/grpc/xds/XdsRoutingLoadBalancer.java b/xds/src/main/java/io/grpc/xds/XdsRoutingLoadBalancer.java index 1cd5045052a..9a18b238862 100644 --- a/xds/src/main/java/io/grpc/xds/XdsRoutingLoadBalancer.java +++ b/xds/src/main/java/io/grpc/xds/XdsRoutingLoadBalancer.java @@ -86,14 +86,14 @@ public void handleResolvedAddresses(final ResolvedAddresses resolvedAddresses) { } else { childLbStates.get(actionName).reactivate(action.getProvider()); } + final LoadBalancer childLb = childLbStates.get(actionName).lb; syncContext.execute(new Runnable() { @Override public void run() { - childLbStates.get(actionName).lb - .handleResolvedAddresses( - resolvedAddresses.toBuilder() - .setLoadBalancingPolicyConfig(action.getConfig()) - .build()); + childLb.handleResolvedAddresses( + resolvedAddresses.toBuilder() + .setLoadBalancingPolicyConfig(action.getConfig()) + .build()); } }); } @@ -102,6 +102,7 @@ public void run() { for (String actionName : diff) { childLbStates.get(actionName).deactivate(); } + updateOverallBalancingState(); } @Override @@ -235,12 +236,11 @@ private final class RouteHelper extends ForwardingLoadBalancerHelper { @Override public void updateBalancingState(ConnectivityState newState, SubchannelPicker newPicker) { - if (deactivated) { - return; - } currentState = newState; currentPicker = newPicker; - updateOverallBalancingState(); + if (!deactivated) { + updateOverallBalancingState(); + } } @Override diff --git a/xds/src/test/java/io/grpc/xds/XdsRoutingLoadBalancerTest.java b/xds/src/test/java/io/grpc/xds/XdsRoutingLoadBalancerTest.java index 54646c37ea3..f9bc47fc55e 100644 --- a/xds/src/test/java/io/grpc/xds/XdsRoutingLoadBalancerTest.java +++ b/xds/src/test/java/io/grpc/xds/XdsRoutingLoadBalancerTest.java @@ -17,12 +17,18 @@ package io.grpc.xds; import static com.google.common.truth.Truth.assertThat; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.atLeast; +import static org.mockito.Mockito.atLeastOnce; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Iterables; import io.grpc.CallOptions; import io.grpc.ConnectivityState; import io.grpc.EquivalentAddressGroup; @@ -38,6 +44,7 @@ import io.grpc.MethodDescriptor; import io.grpc.MethodDescriptor.MethodType; import io.grpc.Status; +import io.grpc.Status.Code; import io.grpc.SynchronizationContext; import io.grpc.internal.FakeClock; import io.grpc.internal.PickSubchannelArgsImpl; @@ -51,14 +58,18 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; +import org.junit.After; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; import org.mockito.ArgumentCaptor; +import org.mockito.Captor; import org.mockito.Mock; import org.mockito.MockitoAnnotations; @@ -77,6 +88,8 @@ public void uncaughtException(Thread t, Throwable e) { @Mock private LoadBalancer.Helper helper; + @Captor + ArgumentCaptor pickerCaptor; private RouteMatch routeMatch1 = new RouteMatch( @@ -95,7 +108,8 @@ public void uncaughtException(Thread t, Throwable e) { new PathMatcher(null, "/", null), Collections.emptyList(), null); - private List childBalancers = new ArrayList<>(); + private final Map lbConfigInventory = new HashMap<>(); + private final List childBalancers = new ArrayList<>(); private LoadBalancer xdsRoutingLoadBalancer; @Before @@ -103,107 +117,228 @@ public void setUp() { MockitoAnnotations.initMocks(this); when(helper.getSynchronizationContext()).thenReturn(syncContext); when(helper.getScheduledExecutorService()).thenReturn(fakeClock.getScheduledExecutorService()); + lbConfigInventory.put("actionA", new Object()); + lbConfigInventory.put("actionB", new Object()); + lbConfigInventory.put("actionC", null); xdsRoutingLoadBalancer = new XdsRoutingLoadBalancer(helper); } - @Test - public void typicalWorkflow() { - Object childConfig1 = new Object(); - Object childConfig2 = new Object(); - PolicySelection policyA = - new PolicySelection(new FakeLoadBalancerProvider("policy_a"), null, childConfig1); - PolicySelection policyB = - new PolicySelection(new FakeLoadBalancerProvider("policy_b"), null, childConfig2); - PolicySelection policyC = - new PolicySelection(new FakeLoadBalancerProvider("policy_c"), null , null); - - XdsRoutingConfig config = - new XdsRoutingConfig( - Arrays.asList( - new Route(routeMatch1, "action_a"), - new Route(routeMatch2, "action_b"), - new Route(routeMatch3, "action_a")), - ImmutableMap.of("action_a", policyA, "action_b", policyB)); - xdsRoutingLoadBalancer - .handleResolvedAddresses( - ResolvedAddresses.newBuilder() - .setAddresses(Collections.emptyList()) - .setLoadBalancingPolicyConfig(config) - .build()); + @After + public void tearDown() { + xdsRoutingLoadBalancer.shutdown(); + for (FakeLoadBalancer childLb : childBalancers) { + assertThat(childLb.shutdown).isTrue(); + } + } + @Test + public void handleResolvedAddressesUpdatesChannelPicker() { + deliverResolvedAddresses( + ImmutableMap.of( + new Route(routeMatch1, "actionA"), "policy_a", + new Route(routeMatch2, "actionB"), "policy_b")); + + verify(helper, atLeastOnce()).updateBalancingState( + eq(ConnectivityState.CONNECTING), pickerCaptor.capture()); + RouteMatchingSubchannelPicker picker = (RouteMatchingSubchannelPicker) pickerCaptor.getValue(); + assertThat(picker.routePickers).hasSize(2); + assertThat(picker.routePickers.get(routeMatch1).pickSubchannel(mock(PickSubchannelArgs.class))) + .isEqualTo(PickResult.withNoResult()); + assertThat(picker.routePickers.get(routeMatch2).pickSubchannel(mock(PickSubchannelArgs.class))) + .isEqualTo(PickResult.withNoResult()); assertThat(childBalancers).hasSize(2); FakeLoadBalancer childBalancer1 = childBalancers.get(0); FakeLoadBalancer childBalancer2 = childBalancers.get(1); assertThat(childBalancer1.name).isEqualTo("policy_a"); assertThat(childBalancer2.name).isEqualTo("policy_b"); - assertThat(childBalancer1.config).isEqualTo(childConfig1); - assertThat(childBalancer2.config).isEqualTo(childConfig2); - - // Receive an updated routing config. - config = - new XdsRoutingConfig( - Arrays.asList( - new Route(routeMatch1, "action_b"), - new Route(routeMatch2, "action_c"), - new Route(routeMatch3, "action_c")), - ImmutableMap.of("action_b", policyA, "action_c", policyC)); - xdsRoutingLoadBalancer - .handleResolvedAddresses( - ResolvedAddresses.newBuilder() - .setAddresses(Collections.emptyList()) - .setLoadBalancingPolicyConfig(config) - .build()); - - assertThat(childBalancer2.shutdown) - .isTrue(); // (immediate) shutdown because "action_b" changes policy (before ready) + assertThat(childBalancer1.config).isEqualTo(lbConfigInventory.get("actionA")); + assertThat(childBalancer2.config).isEqualTo(lbConfigInventory.get("actionB")); + + // Receive an updated config. + deliverResolvedAddresses( + ImmutableMap.of( + new Route(routeMatch1, "actionA"), "policy_a", + new Route(routeMatch3, "actionC"), "policy_c")); + + verify(helper, atLeast(2)) + .updateBalancingState(eq(ConnectivityState.CONNECTING), pickerCaptor.capture()); + picker = (RouteMatchingSubchannelPicker) pickerCaptor.getValue(); + assertThat(picker.routePickers).hasSize(2); + assertThat(picker.routePickers).doesNotContainKey(routeMatch2); + assertThat(picker.routePickers.get(routeMatch3).pickSubchannel(mock(PickSubchannelArgs.class))) + .isEqualTo(PickResult.withNoResult()); assertThat(fakeClock.numPendingTasks()) - .isEqualTo(1); // (delayed) shutdown because "action_a" is removed + .isEqualTo(1); // (delayed) shutdown because "actionB" is removed assertThat(childBalancer1.shutdown).isFalse(); + assertThat(childBalancer2.shutdown).isFalse(); + assertThat(childBalancers).hasSize(3); - FakeLoadBalancer childBalancer3 = childBalancers.get(1); - FakeLoadBalancer childBalancer4 = childBalancers.get(2); - assertThat(childBalancer3.name).isEqualTo("policy_a"); - assertThat(childBalancer3).isNotSameInstanceAs(childBalancer1); - assertThat(childBalancer4.name).isEqualTo("policy_c"); + FakeLoadBalancer childBalancer3 = childBalancers.get(2); + assertThat(childBalancer3.name).isEqualTo("policy_c"); + assertThat(childBalancer3.config).isEqualTo(lbConfigInventory.get("actionC")); - // Simulate subchannel state update from the leaf policy. + fakeClock.forwardTime( + XdsRoutingLoadBalancer.DELAYED_ACTION_DELETION_TIME_MINUTES, TimeUnit.MINUTES); + assertThat(childBalancer2.shutdown).isTrue(); + } + + @Test + public void updateWithActionPolicyChange() { + deliverResolvedAddresses(ImmutableMap.of(new Route(routeMatch1, "actionA"), "policy_a")); + FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers); + assertThat(childBalancer.name).isEqualTo("policy_a"); + assertThat(childBalancer.config).isEqualTo(lbConfigInventory.get("actionA")); + + deliverResolvedAddresses(ImmutableMap.of(new Route(routeMatch1, "actionA"), "policy_b")); + assertThat(childBalancer.shutdown).isTrue(); // immediate shutdown as the it was not ready + assertThat(Iterables.getOnlyElement(childBalancers).name).isEqualTo("policy_b"); + assertThat(Iterables.getOnlyElement(childBalancers).config) + .isEqualTo(lbConfigInventory.get("actionA")); + } + + @Test + public void updateBalancingStateFromChildBalancers() { + deliverResolvedAddresses( + ImmutableMap.of( + new Route(routeMatch1, "actionA"), "policy_a", + new Route(routeMatch2, "actionB"), "policy_b")); + + assertThat(childBalancers).hasSize(2); + FakeLoadBalancer childBalancer1 = childBalancers.get(0); + FakeLoadBalancer childBalancer2 = childBalancers.get(1); Subchannel subchannel1 = mock(Subchannel.class); Subchannel subchannel2 = mock(Subchannel.class); - Subchannel subchannel3 = mock(Subchannel.class); childBalancer1.deliverSubchannelState(subchannel1, ConnectivityState.READY); - childBalancer3.deliverSubchannelState(subchannel2, ConnectivityState.CONNECTING); - childBalancer4.deliverSubchannelState(subchannel3, ConnectivityState.READY); - ArgumentCaptor pickerCaptor = ArgumentCaptor.forClass(null); verify(helper).updateBalancingState(eq(ConnectivityState.READY), pickerCaptor.capture()); RouteMatchingSubchannelPicker picker = (RouteMatchingSubchannelPicker) pickerCaptor.getValue(); - assertThat(picker.routePickers).hasSize(3); + assertThat(picker.routePickers).hasSize(2); assertThat( picker.routePickers.get(routeMatch1) .pickSubchannel(mock(PickSubchannelArgs.class)).getSubchannel()) - .isSameInstanceAs(subchannel2); // routeMatch1 -> action_b -> policy_a -> subchannel2 + .isEqualTo(subchannel1); + assertThat(picker.routePickers.get(routeMatch2).pickSubchannel(mock(PickSubchannelArgs.class))) + .isEqualTo(PickResult.withNoResult()); + + childBalancer2.deliverSubchannelState(subchannel2, ConnectivityState.READY); + verify(helper, times(2)) + .updateBalancingState(eq(ConnectivityState.READY), pickerCaptor.capture()); + picker = (RouteMatchingSubchannelPicker) pickerCaptor.getValue(); assertThat( picker.routePickers.get(routeMatch2) .pickSubchannel(mock(PickSubchannelArgs.class)).getSubchannel()) - .isSameInstanceAs(subchannel3); // routeMatch2 -> action_c -> policy_c -> subchannel3 + .isEqualTo(subchannel2); + } + + @Test + public void updateBalancingStateFromDeactivatedChildBalancer() { + FakeLoadBalancer balancer = + deliverAddressesAndUpdateToRemoveChildPolicy( + new Route(routeMatch1, "actionA"), "policy_a"); + Subchannel subchannel = mock(Subchannel.class); + balancer.deliverSubchannelState(subchannel, ConnectivityState.READY); + verify(helper, never()).updateBalancingState( + eq(ConnectivityState.READY), any(SubchannelPicker.class)); + + deliverResolvedAddresses(ImmutableMap.of(new Route(routeMatch1, "actionA"), "policy_a")); + verify(helper).updateBalancingState(eq(ConnectivityState.READY), pickerCaptor.capture()); + RouteMatchingSubchannelPicker picker = (RouteMatchingSubchannelPicker) pickerCaptor.getValue(); assertThat( - picker.routePickers.get(routeMatch3) + picker.routePickers.get(routeMatch1) .pickSubchannel(mock(PickSubchannelArgs.class)).getSubchannel()) - .isSameInstanceAs(subchannel3); // routeMatch3 -> action_c -> policy_c -> subchannel3 + .isEqualTo(subchannel); + } - // Error propagation from upstream policies. - Status error = Status.UNAVAILABLE.withDescription("network error"); + @Test + public void errorPropagation() { + Status error = Status.UNAVAILABLE.withDescription("resolver error"); xdsRoutingLoadBalancer.handleNameResolutionError(error); - assertThat(childBalancer1.upstreamError).isNull(); - assertThat(childBalancer3.upstreamError).isEqualTo(error); - assertThat(childBalancer4.upstreamError).isEqualTo(error); - fakeClock.forwardTime( - XdsRoutingLoadBalancer.DELAYED_ACTION_DELETION_TIME_MINUTES, TimeUnit.MINUTES); - assertThat(childBalancer1.shutdown).isTrue(); + verify(helper).updateBalancingState( + eq(ConnectivityState.TRANSIENT_FAILURE), pickerCaptor.capture()); + PickResult result = pickerCaptor.getValue().pickSubchannel(mock(PickSubchannelArgs.class)); + assertThat(result.getStatus().getCode()).isEqualTo(Code.UNAVAILABLE); + assertThat(result.getStatus().getDescription()).isEqualTo("resolver error"); - xdsRoutingLoadBalancer.shutdown(); - assertThat(childBalancer3.shutdown).isTrue(); - assertThat(childBalancer4.shutdown).isTrue(); + deliverResolvedAddresses( + ImmutableMap.of( + new Route(routeMatch1, "actionA"), "policy_a", + new Route(routeMatch2, "actionB"), "policy_b")); + + assertThat(childBalancers).hasSize(2); + FakeLoadBalancer childBalancer1 = childBalancers.get(0); + FakeLoadBalancer childBalancer2 = childBalancers.get(1); + + xdsRoutingLoadBalancer.handleNameResolutionError(error); + assertThat(childBalancer1.upstreamError.getCode()).isEqualTo(Code.UNAVAILABLE); + assertThat(childBalancer1.upstreamError.getDescription()).isEqualTo("resolver error"); + assertThat(childBalancer2.upstreamError.getCode()).isEqualTo(Code.UNAVAILABLE); + assertThat(childBalancer2.upstreamError.getDescription()).isEqualTo("resolver error"); + } + + @Test + public void errorPropagationToDeactivatedChildBalancer() { + FakeLoadBalancer balancer = + deliverAddressesAndUpdateToRemoveChildPolicy( + new Route(routeMatch1, "actionA"), "policy_a"); + xdsRoutingLoadBalancer.handleNameResolutionError( + Status.UNKNOWN.withDescription("unknown error")); + assertThat(balancer.upstreamError).isNull(); + } + + private FakeLoadBalancer deliverAddressesAndUpdateToRemoveChildPolicy( + Route route, String childPolicyName) { + lbConfigInventory.put("actionX", null); + Route routeX = + new Route( + new RouteMatch( + new PathMatcher( + "/XService/xMethod", null, null), + Collections.emptyList(), + null), + "actionX"); + deliverResolvedAddresses( + ImmutableMap.of(route, childPolicyName, routeX, "policy_x")); + + verify(helper, atLeastOnce()).updateBalancingState( + eq(ConnectivityState.CONNECTING), any(SubchannelPicker.class)); + assertThat(childBalancers).hasSize(2); + FakeLoadBalancer balancer = childBalancers.get(0); + + deliverResolvedAddresses(ImmutableMap.of(routeX, "policy_x")); + verify(helper, atLeast(2)).updateBalancingState( + eq(ConnectivityState.CONNECTING), any(SubchannelPicker.class)); + assertThat(Iterables.getOnlyElement(fakeClock.getPendingTasks()).getDelay(TimeUnit.MINUTES)) + .isEqualTo(XdsRoutingLoadBalancer.DELAYED_ACTION_DELETION_TIME_MINUTES); + return balancer; + } + + private void deliverResolvedAddresses(final Map childPolicies) { + syncContext.execute(new Runnable() { + @Override + public void run() { + xdsRoutingLoadBalancer + .handleResolvedAddresses( + ResolvedAddresses.newBuilder() + .setAddresses(Collections.emptyList()) + .setLoadBalancingPolicyConfig(buildConfig(childPolicies)) + .build()); + } + }); + } + + private XdsRoutingConfig buildConfig(Map childPolicies) { + Map childPolicySelections = new LinkedHashMap<>(); + List routeList = new ArrayList<>(); + for (Route route : childPolicies.keySet()) { + String childActionName = route.getActionName(); + String childPolicyName = childPolicies.get(route); + Object childConfig = lbConfigInventory.get(childActionName); + PolicySelection policy = + new PolicySelection(new FakeLoadBalancerProvider(childPolicyName), null, childConfig); + childPolicySelections.put(childActionName, policy); + routeList.add(route); + } + return new XdsRoutingConfig(routeList, childPolicySelections); } @Test