From 75ba9fbb8ba2e3eb0fda97dfcf7407cc97be478b Mon Sep 17 00:00:00 2001 From: "Penn (Dapeng) Zhang" Date: Mon, 3 Jun 2019 11:05:09 -0700 Subject: [PATCH 1/4] xds: allow grpclb balancer addresses for backward compatibility --- .../java/io/grpc/xds/XdsLoadBalancer.java | 14 +++- .../io/grpc/xds/XdsLoadBalancerProvider.java | 4 +- .../java/io/grpc/xds/FallbackManagerTest.java | 76 ++++++++++++++++++- 3 files changed, 90 insertions(+), 4 deletions(-) diff --git a/xds/src/main/java/io/grpc/xds/XdsLoadBalancer.java b/xds/src/main/java/io/grpc/xds/XdsLoadBalancer.java index 634fadb70a2..3b3903808fd 100644 --- a/xds/src/main/java/io/grpc/xds/XdsLoadBalancer.java +++ b/xds/src/main/java/io/grpc/xds/XdsLoadBalancer.java @@ -20,6 +20,7 @@ import static com.google.common.base.Preconditions.checkState; import static io.grpc.ConnectivityState.READY; import static io.grpc.ConnectivityState.TRANSIENT_FAILURE; +import static io.grpc.xds.XdsLoadBalancerProvider.XDS_POLICY_NAME; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.MoreObjects; @@ -35,6 +36,7 @@ import io.grpc.NameResolver.ConfigOrError; import io.grpc.Status; import io.grpc.SynchronizationContext.ScheduledHandle; +import io.grpc.internal.GrpcAttributes; import io.grpc.internal.ServiceConfigUtil.LbConfig; import io.grpc.util.ForwardingLoadBalancerHelper; import io.grpc.xds.LocalityStore.LocalityStoreImpl; @@ -326,6 +328,16 @@ void updateFallbackServers( List servers, Attributes attributes, LbConfig fallbackPolicy) { this.fallbackServers = servers; + String fallbackPolicyName = fallbackPolicy.getPolicyName(); + if (!fallbackPolicyName.equals("grpclb") && !fallbackPolicyName.equals(XDS_POLICY_NAME)) { + ImmutableList.Builder backends = ImmutableList.builder(); + for (EquivalentAddressGroup eag : servers) { + if (eag.getAttributes().get(GrpcAttributes.ATTR_LB_ADDR_AUTHORITY) == null) { + backends.add(eag); + } + } + this.fallbackServers = backends.build(); + } this.fallbackAttributes = Attributes.newBuilder() .setAll(attributes) .set(ATTR_LOAD_BALANCING_CONFIG, fallbackPolicy.getRawConfigValue()) @@ -333,7 +345,7 @@ void updateFallbackServers( LbConfig currentFallbackPolicy = this.fallbackPolicy; this.fallbackPolicy = fallbackPolicy; if (fallbackBalancer != null) { - if (fallbackPolicy.getPolicyName().equals(currentFallbackPolicy.getPolicyName())) { + if (fallbackPolicyName.equals(currentFallbackPolicy.getPolicyName())) { // TODO(carl-mastrangelo): propagate the load balancing config policy fallbackBalancer.handleResolvedAddresses( ResolvedAddresses.newBuilder() diff --git a/xds/src/main/java/io/grpc/xds/XdsLoadBalancerProvider.java b/xds/src/main/java/io/grpc/xds/XdsLoadBalancerProvider.java index a2b727decc1..d334ae28755 100644 --- a/xds/src/main/java/io/grpc/xds/XdsLoadBalancerProvider.java +++ b/xds/src/main/java/io/grpc/xds/XdsLoadBalancerProvider.java @@ -40,6 +40,8 @@ @Internal public final class XdsLoadBalancerProvider extends LoadBalancerProvider { + static final String XDS_POLICY_NAME = "xds_experimental"; + private static final LbConfig DEFAULT_FALLBACK_POLICY = new LbConfig("round_robin", ImmutableMap.of()); @@ -55,7 +57,7 @@ public int getPriority() { @Override public String getPolicyName() { - return "xds_experimental"; + return XDS_POLICY_NAME; } @Override diff --git a/xds/src/test/java/io/grpc/xds/FallbackManagerTest.java b/xds/src/test/java/io/grpc/xds/FallbackManagerTest.java index 92b698a5a90..2186082a625 100644 --- a/xds/src/test/java/io/grpc/xds/FallbackManagerTest.java +++ b/xds/src/test/java/io/grpc/xds/FallbackManagerTest.java @@ -21,6 +21,7 @@ import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; +import com.google.common.collect.ImmutableList; import io.grpc.Attributes; import io.grpc.ChannelLogger; import io.grpc.EquivalentAddressGroup; @@ -31,8 +32,11 @@ import io.grpc.LoadBalancerRegistry; import io.grpc.SynchronizationContext; import io.grpc.internal.FakeClock; +import io.grpc.internal.GrpcAttributes; import io.grpc.internal.ServiceConfigUtil.LbConfig; import io.grpc.xds.XdsLoadBalancer.FallbackManager; +import java.net.InetSocketAddress; +import java.net.SocketAddress; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -70,7 +74,7 @@ public int getPriority() { @Override public String getPolicyName() { - return "test_policy"; + return fallbackPolicy.getPolicyName(); } @Override @@ -127,10 +131,10 @@ public void setUp() { doReturn(syncContext).when(helper).getSynchronizationContext(); doReturn(fakeClock.getScheduledExecutorService()).when(helper).getScheduledExecutorService(); doReturn(channelLogger).when(helper).getChannelLogger(); + fallbackPolicy = new LbConfig("test_policy", new HashMap()); lbRegistry.register(fakeRoundRonbinLbProvider); lbRegistry.register(fakeFallbackLbProvider); fallbackManager = new FallbackManager(helper, lbRegistry); - fallbackPolicy = new LbConfig("test_policy", new HashMap()); } @After @@ -164,6 +168,74 @@ public void useFallbackWhenTimeout() { .build()); } + @Test + public void fallback_handleBackendsEagsOnly() { + fallbackManager.startFallbackTimer(); + EquivalentAddressGroup eag0 = new EquivalentAddressGroup( + ImmutableList.of(new InetSocketAddress(8080))); + Attributes attributes = Attributes + .newBuilder() + .set(GrpcAttributes.ATTR_LB_ADDR_AUTHORITY, "this is a balancer address") + .build(); + EquivalentAddressGroup eag1 = new EquivalentAddressGroup( + ImmutableList.of(new InetSocketAddress(8081)), attributes); + EquivalentAddressGroup eag2 = new EquivalentAddressGroup( + ImmutableList.of(new InetSocketAddress(8082))); + List eags = ImmutableList.of(eag0, eag1, eag2); + fallbackManager.updateFallbackServers( + eags, Attributes.EMPTY, fallbackPolicy); + + fakeClock.forwardTime(FALLBACK_TIMEOUT_MS, TimeUnit.MILLISECONDS); + + assertThat(fallbackManager.isInFallbackMode()).isTrue(); + verify(fakeFallbackLb).handleResolvedAddresses( + ResolvedAddresses.newBuilder() + .setAddresses(ImmutableList.of(eag0, eag2)) + .setAttributes( + Attributes.newBuilder() + .set( + LoadBalancer.ATTR_LOAD_BALANCING_CONFIG, + fallbackPolicy.getRawConfigValue()) + .build()) + .build()); + } + + @Test + public void fallback_handleGrpclbAddresses() { + lbRegistry.deregister(fakeFallbackLbProvider); + fallbackPolicy = new LbConfig("grpclb", new HashMap()); + lbRegistry.register(fakeFallbackLbProvider); + + fallbackManager.startFallbackTimer(); + EquivalentAddressGroup eag0 = new EquivalentAddressGroup( + ImmutableList.of(new InetSocketAddress(8080))); + Attributes attributes = Attributes + .newBuilder() + .set(GrpcAttributes.ATTR_LB_ADDR_AUTHORITY, "this is a balancer address") + .build(); + EquivalentAddressGroup eag1 = new EquivalentAddressGroup( + ImmutableList.of(new InetSocketAddress(8081)), attributes); + EquivalentAddressGroup eag2 = new EquivalentAddressGroup( + ImmutableList.of(new InetSocketAddress(8082))); + List eags = ImmutableList.of(eag0, eag1, eag2); + fallbackManager.updateFallbackServers( + eags, Attributes.EMPTY, fallbackPolicy); + + fakeClock.forwardTime(FALLBACK_TIMEOUT_MS, TimeUnit.MILLISECONDS); + + assertThat(fallbackManager.isInFallbackMode()).isTrue(); + verify(fakeFallbackLb).handleResolvedAddresses( + ResolvedAddresses.newBuilder() + .setAddresses(eags) + .setAttributes( + Attributes.newBuilder() + .set( + LoadBalancer.ATTR_LOAD_BALANCING_CONFIG, + fallbackPolicy.getRawConfigValue()) + .build()) + .build()); + } + @Test public void cancelFallback() { fallbackManager.startFallbackTimer(); From 21a94fbae560c69c4ba80652e5fe1d2f59e67751 Mon Sep 17 00:00:00 2001 From: "Penn (Dapeng) Zhang" Date: Fri, 7 Jun 2019 09:44:14 -0700 Subject: [PATCH 2/4] add comment --- xds/src/main/java/io/grpc/xds/XdsLoadBalancer.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/xds/src/main/java/io/grpc/xds/XdsLoadBalancer.java b/xds/src/main/java/io/grpc/xds/XdsLoadBalancer.java index 3b3903808fd..8bcf68ad579 100644 --- a/xds/src/main/java/io/grpc/xds/XdsLoadBalancer.java +++ b/xds/src/main/java/io/grpc/xds/XdsLoadBalancer.java @@ -329,6 +329,9 @@ void updateFallbackServers( LbConfig fallbackPolicy) { this.fallbackServers = servers; String fallbackPolicyName = fallbackPolicy.getPolicyName(); + + // Some addresses in the list may be grpclb-v1 balancer addresses, so if the fallback policy + // does not support grpclb-v1 balancer addresses, then we need to exclude them from the list. if (!fallbackPolicyName.equals("grpclb") && !fallbackPolicyName.equals(XDS_POLICY_NAME)) { ImmutableList.Builder backends = ImmutableList.builder(); for (EquivalentAddressGroup eag : servers) { @@ -338,6 +341,7 @@ void updateFallbackServers( } this.fallbackServers = backends.build(); } + this.fallbackAttributes = Attributes.newBuilder() .setAll(attributes) .set(ATTR_LOAD_BALANCING_CONFIG, fallbackPolicy.getRawConfigValue()) From 9d744e91311e2cf83bdda6dbc72ff08e9b6b545b Mon Sep 17 00:00:00 2001 From: "Penn (Dapeng) Zhang" Date: Fri, 7 Jun 2019 10:35:24 -0700 Subject: [PATCH 3/4] handle edge case for empty address list --- .../java/io/grpc/xds/XdsLoadBalancer.java | 62 +++++++++++-------- .../java/io/grpc/xds/FallbackManagerTest.java | 37 ++++++++++- .../java/io/grpc/xds/XdsLoadBalancerTest.java | 3 + 3 files changed, 72 insertions(+), 30 deletions(-) diff --git a/xds/src/main/java/io/grpc/xds/XdsLoadBalancer.java b/xds/src/main/java/io/grpc/xds/XdsLoadBalancer.java index 8bcf68ad579..44dd397f6e0 100644 --- a/xds/src/main/java/io/grpc/xds/XdsLoadBalancer.java +++ b/xds/src/main/java/io/grpc/xds/XdsLoadBalancer.java @@ -316,32 +316,13 @@ protected Helper delegate() { fallbackBalancer = lbRegistry.getProvider(fallbackPolicy.getPolicyName()) .newLoadBalancer(fallbackBalancerHelper); fallbackBalancerHelper.balancer = fallbackBalancer; - // TODO(carl-mastrangelo): propagate the load balancing config policy - fallbackBalancer.handleResolvedAddresses( - ResolvedAddresses.newBuilder() - .setAddresses(fallbackServers) - .setAttributes(fallbackAttributes) - .build()); + handleFallbackAddressesOrNameResolutionError(); } void updateFallbackServers( List servers, Attributes attributes, LbConfig fallbackPolicy) { this.fallbackServers = servers; - String fallbackPolicyName = fallbackPolicy.getPolicyName(); - - // Some addresses in the list may be grpclb-v1 balancer addresses, so if the fallback policy - // does not support grpclb-v1 balancer addresses, then we need to exclude them from the list. - if (!fallbackPolicyName.equals("grpclb") && !fallbackPolicyName.equals(XDS_POLICY_NAME)) { - ImmutableList.Builder backends = ImmutableList.builder(); - for (EquivalentAddressGroup eag : servers) { - if (eag.getAttributes().get(GrpcAttributes.ATTR_LB_ADDR_AUTHORITY) == null) { - backends.add(eag); - } - } - this.fallbackServers = backends.build(); - } - this.fallbackAttributes = Attributes.newBuilder() .setAll(attributes) .set(ATTR_LOAD_BALANCING_CONFIG, fallbackPolicy.getRawConfigValue()) @@ -349,13 +330,8 @@ void updateFallbackServers( LbConfig currentFallbackPolicy = this.fallbackPolicy; this.fallbackPolicy = fallbackPolicy; if (fallbackBalancer != null) { - if (fallbackPolicyName.equals(currentFallbackPolicy.getPolicyName())) { - // TODO(carl-mastrangelo): propagate the load balancing config policy - fallbackBalancer.handleResolvedAddresses( - ResolvedAddresses.newBuilder() - .setAddresses(fallbackServers) - .setAttributes(fallbackAttributes) - .build()); + if (fallbackPolicy.getPolicyName().equals(currentFallbackPolicy.getPolicyName())) { + handleFallbackAddressesOrNameResolutionError(); } else { fallbackBalancer.shutdown(); fallbackBalancer = null; @@ -364,6 +340,38 @@ void updateFallbackServers( } } + private void handleFallbackAddressesOrNameResolutionError() { + String fallbackPolicyName = fallbackPolicy.getPolicyName(); + List servers = fallbackServers; + + // Some addresses in the list may be grpclb-v1 balancer addresses, so if the fallback policy + // does not support grpclb-v1 balancer addresses, then we need to exclude them from the list. + if (!fallbackPolicyName.equals("grpclb") && !fallbackPolicyName.equals(XDS_POLICY_NAME)) { + ImmutableList.Builder backends = ImmutableList.builder(); + for (EquivalentAddressGroup eag : fallbackServers) { + if (eag.getAttributes().get(GrpcAttributes.ATTR_LB_ADDR_AUTHORITY) == null) { + backends.add(eag); + } + } + servers = backends.build(); + } + + // FIXME: this is a temporary hack. + if (servers.isEmpty() + && !fallbackBalancer.canHandleEmptyAddressListFromNameResolution()) { + fallbackBalancer.handleNameResolutionError(Status.UNAVAILABLE.withDescription( + "NameResolver returned no usable address." + + " addrs=" + fallbackServers + ", attrs=" + fallbackAttributes)); + } else { + // TODO(carl-mastrangelo): propagate the load balancing config policy + fallbackBalancer.handleResolvedAddresses( + ResolvedAddresses.newBuilder() + .setAddresses(servers) + .setAttributes(fallbackAttributes) + .build()); + } + } + void startFallbackTimer() { if (fallbackTimer == null) { class FallbackTask implements Runnable { diff --git a/xds/src/test/java/io/grpc/xds/FallbackManagerTest.java b/xds/src/test/java/io/grpc/xds/FallbackManagerTest.java index 2186082a625..a79a5dd9082 100644 --- a/xds/src/test/java/io/grpc/xds/FallbackManagerTest.java +++ b/xds/src/test/java/io/grpc/xds/FallbackManagerTest.java @@ -30,6 +30,8 @@ import io.grpc.LoadBalancer.ResolvedAddresses; import io.grpc.LoadBalancerProvider; import io.grpc.LoadBalancerRegistry; +import io.grpc.Status; +import io.grpc.Status.Code; import io.grpc.SynchronizationContext; import io.grpc.internal.FakeClock; import io.grpc.internal.GrpcAttributes; @@ -37,7 +39,6 @@ import io.grpc.xds.XdsLoadBalancer.FallbackManager; import java.net.InetSocketAddress; import java.net.SocketAddress; -import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.concurrent.TimeUnit; @@ -46,6 +47,7 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; +import org.mockito.ArgumentCaptor; import org.mockito.ArgumentMatchers; import org.mockito.Mock; import org.mockito.MockitoAnnotations; @@ -145,7 +147,8 @@ public void tearDown() { @Test public void useFallbackWhenTimeout() { fallbackManager.startFallbackTimer(); - List eags = new ArrayList<>(); + List eags = ImmutableList.of( + new EquivalentAddressGroup(ImmutableList.of(new InetSocketAddress(8080)))); fallbackManager.updateFallbackServers( eags, Attributes.EMPTY, fallbackPolicy); @@ -236,10 +239,38 @@ public void fallback_handleGrpclbAddresses() { .build()); } + @Test + public void fallback_onlyGrpclbAddresses_NoBackendAddress() { + lbRegistry.deregister(fakeFallbackLbProvider); + fallbackPolicy = new LbConfig("not_grpclb", new HashMap()); + lbRegistry.register(fakeFallbackLbProvider); + + fallbackManager.startFallbackTimer(); + Attributes attributes = Attributes + .newBuilder() + .set(GrpcAttributes.ATTR_LB_ADDR_AUTHORITY, "this is a balancer address") + .build(); + EquivalentAddressGroup eag1 = new EquivalentAddressGroup( + ImmutableList.of(new InetSocketAddress(8081)), attributes); + EquivalentAddressGroup eag2 = new EquivalentAddressGroup( + ImmutableList.of(new InetSocketAddress(8082)), attributes); + List eags = ImmutableList.of(eag1, eag2); + fallbackManager.updateFallbackServers( + eags, Attributes.EMPTY, fallbackPolicy); + + fakeClock.forwardTime(FALLBACK_TIMEOUT_MS, TimeUnit.MILLISECONDS); + + assertThat(fallbackManager.isInFallbackMode()).isTrue(); + ArgumentCaptor statusCaptor = ArgumentCaptor.forClass(Status.class); + verify(fakeFallbackLb).handleNameResolutionError(statusCaptor.capture()); + assertThat(statusCaptor.getValue().getCode()).isEqualTo(Code.UNAVAILABLE); + } + @Test public void cancelFallback() { fallbackManager.startFallbackTimer(); - List eags = new ArrayList<>(); + List eags = ImmutableList.of( + new EquivalentAddressGroup(ImmutableList.of(new InetSocketAddress(8080)))); fallbackManager.updateFallbackServers( eags, Attributes.EMPTY, fallbackPolicy); diff --git a/xds/src/test/java/io/grpc/xds/XdsLoadBalancerTest.java b/xds/src/test/java/io/grpc/xds/XdsLoadBalancerTest.java index c26af06f299..416f8ad4ca3 100644 --- a/xds/src/test/java/io/grpc/xds/XdsLoadBalancerTest.java +++ b/xds/src/test/java/io/grpc/xds/XdsLoadBalancerTest.java @@ -281,6 +281,9 @@ public void onCompleted() { doReturn(oobChannel1).doReturn(oobChannel2).doReturn(oobChannel3) .when(helper).createResolvingOobChannel(anyString()); + + // To write less tedious code for tests, allow fallbackBalancer to handle empty address list. + doReturn(true).when(fallbackBalancer1).canHandleEmptyAddressListFromNameResolution(); } @After From 2e43b600ab78f5eb1226f41cafb17c43f213306e Mon Sep 17 00:00:00 2001 From: "Penn (Dapeng) Zhang" Date: Mon, 10 Jun 2019 10:38:54 -0700 Subject: [PATCH 4/4] fix comments --- xds/src/main/java/io/grpc/xds/XdsLoadBalancer.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/xds/src/main/java/io/grpc/xds/XdsLoadBalancer.java b/xds/src/main/java/io/grpc/xds/XdsLoadBalancer.java index 44dd397f6e0..bf82bac161a 100644 --- a/xds/src/main/java/io/grpc/xds/XdsLoadBalancer.java +++ b/xds/src/main/java/io/grpc/xds/XdsLoadBalancer.java @@ -316,7 +316,7 @@ protected Helper delegate() { fallbackBalancer = lbRegistry.getProvider(fallbackPolicy.getPolicyName()) .newLoadBalancer(fallbackBalancerHelper); fallbackBalancerHelper.balancer = fallbackBalancer; - handleFallbackAddressesOrNameResolutionError(); + propagateFallbackAddresses(); } void updateFallbackServers( @@ -331,7 +331,7 @@ void updateFallbackServers( this.fallbackPolicy = fallbackPolicy; if (fallbackBalancer != null) { if (fallbackPolicy.getPolicyName().equals(currentFallbackPolicy.getPolicyName())) { - handleFallbackAddressesOrNameResolutionError(); + propagateFallbackAddresses(); } else { fallbackBalancer.shutdown(); fallbackBalancer = null; @@ -340,7 +340,7 @@ void updateFallbackServers( } } - private void handleFallbackAddressesOrNameResolutionError() { + private void propagateFallbackAddresses() { String fallbackPolicyName = fallbackPolicy.getPolicyName(); List servers = fallbackServers; @@ -356,7 +356,7 @@ private void handleFallbackAddressesOrNameResolutionError() { servers = backends.build(); } - // FIXME: this is a temporary hack. + // TODO(zhangkun83): FIXME(#5496): this is a temporary hack. if (servers.isEmpty() && !fallbackBalancer.canHandleEmptyAddressListFromNameResolution()) { fallbackBalancer.handleNameResolutionError(Status.UNAVAILABLE.withDescription(