diff --git a/xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancer.java b/xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancer.java index 4e08ddc5973..aff61cf7ada 100644 --- a/xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancer.java +++ b/xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancer.java @@ -32,6 +32,7 @@ import io.grpc.NameResolver; import io.grpc.NameResolver.ResolutionResult; import io.grpc.Status; +import io.grpc.StatusOr; import io.grpc.SynchronizationContext; import io.grpc.SynchronizationContext.ScheduledHandle; import io.grpc.internal.BackoffPolicy; @@ -615,79 +616,84 @@ private class NameResolverListener extends NameResolver.Listener2 { @Override public void onResult(final ResolutionResult resolutionResult) { - class NameResolved implements Runnable { - @Override - public void run() { - if (shutdown) { - return; - } - backoffPolicy = null; // reset backoff sequence if succeeded - // Arbitrary priority notation for all DNS-resolved endpoints. - String priorityName = priorityName(name, 0); // value doesn't matter - List addresses = new ArrayList<>(); - for (EquivalentAddressGroup eag : resolutionResult.getAddresses()) { - // No weight attribute is attached, all endpoint-level LB policy should be able - // to handle such it. - String localityName = localityName(LOGICAL_DNS_CLUSTER_LOCALITY); - Attributes attr = eag.getAttributes().toBuilder() - .set(XdsAttributes.ATTR_LOCALITY, LOGICAL_DNS_CLUSTER_LOCALITY) - .set(XdsAttributes.ATTR_LOCALITY_NAME, localityName) - .set(XdsAttributes.ATTR_ADDRESS_NAME, dnsHostName) - .build(); - eag = new EquivalentAddressGroup(eag.getAddresses(), attr); - eag = AddressFilter.setPathFilter(eag, Arrays.asList(priorityName, localityName)); - addresses.add(eag); - } - PriorityChildConfig priorityChildConfig = generateDnsBasedPriorityChildConfig( - name, lrsServerInfo, maxConcurrentRequests, tlsContext, filterMetadata, - lbRegistry, Collections.emptyList()); - status = Status.OK; - resolved = true; - result = new ClusterResolutionResult(addresses, priorityName, priorityChildConfig); - handleEndpointResourceUpdate(); + syncContext.execute(() -> onResult2(resolutionResult)); + } + + @Override + public Status onResult2(final ResolutionResult resolutionResult) { + if (shutdown) { + return Status.OK; + } + // Arbitrary priority notation for all DNS-resolved endpoints. + String priorityName = priorityName(name, 0); // value doesn't matter + List addresses = new ArrayList<>(); + StatusOr> addressesOrError = + resolutionResult.getAddressesOrError(); + if (addressesOrError.hasValue()) { + backoffPolicy = null; // reset backoff sequence if succeeded + for (EquivalentAddressGroup eag : resolutionResult.getAddresses()) { + // No weight attribute is attached, all endpoint-level LB policy should be able + // to handle such it. + String localityName = localityName(LOGICAL_DNS_CLUSTER_LOCALITY); + Attributes attr = eag.getAttributes().toBuilder() + .set(XdsAttributes.ATTR_LOCALITY, LOGICAL_DNS_CLUSTER_LOCALITY) + .set(XdsAttributes.ATTR_LOCALITY_NAME, localityName) + .set(XdsAttributes.ATTR_ADDRESS_NAME, dnsHostName) + .build(); + eag = new EquivalentAddressGroup(eag.getAddresses(), attr); + eag = AddressFilter.setPathFilter(eag, Arrays.asList(priorityName, localityName)); + addresses.add(eag); } + PriorityChildConfig priorityChildConfig = generateDnsBasedPriorityChildConfig( + name, lrsServerInfo, maxConcurrentRequests, tlsContext, filterMetadata, + lbRegistry, Collections.emptyList()); + status = Status.OK; + resolved = true; + result = new ClusterResolutionResult(addresses, priorityName, priorityChildConfig); + handleEndpointResourceUpdate(); + return Status.OK; + } else { + handleErrorInSyncContext(addressesOrError.getStatus()); + return addressesOrError.getStatus(); } - - syncContext.execute(new NameResolved()); } @Override public void onError(final Status error) { - syncContext.execute(new Runnable() { - @Override - public void run() { - if (shutdown) { - return; - } - status = error; - // NameResolver.Listener API cannot distinguish between address-not-found and - // transient errors. If the error occurs in the first resolution, treat it as - // address not found. Otherwise, either there is previously resolved addresses - // previously encountered error, propagate the error to downstream/upstream and - // let downstream/upstream handle it. - if (!resolved) { - resolved = true; - handleEndpointResourceUpdate(); - } else { - handleEndpointResolutionError(); - } - if (scheduledRefresh != null && scheduledRefresh.isPending()) { - return; - } - if (backoffPolicy == null) { - backoffPolicy = backoffPolicyProvider.get(); - } - long delayNanos = backoffPolicy.nextBackoffNanos(); - logger.log(XdsLogLevel.DEBUG, + syncContext.execute(() -> handleErrorInSyncContext(error)); + } + + private void handleErrorInSyncContext(final Status error) { + if (shutdown) { + return; + } + status = error; + // NameResolver.Listener API cannot distinguish between address-not-found and + // transient errors. If the error occurs in the first resolution, treat it as + // address not found. Otherwise, either there is previously resolved addresses + // previously encountered error, propagate the error to downstream/upstream and + // let downstream/upstream handle it. + if (!resolved) { + resolved = true; + handleEndpointResourceUpdate(); + } else { + handleEndpointResolutionError(); + } + if (scheduledRefresh != null && scheduledRefresh.isPending()) { + return; + } + if (backoffPolicy == null) { + backoffPolicy = backoffPolicyProvider.get(); + } + long delayNanos = backoffPolicy.nextBackoffNanos(); + logger.log(XdsLogLevel.DEBUG, "Logical DNS resolver for cluster {0} encountered name resolution " - + "error: {1}, scheduling DNS resolution backoff for {2} ns", + + "error: {1}, scheduling DNS resolution backoff for {2} ns", name, error, delayNanos); - scheduledRefresh = + scheduledRefresh = syncContext.schedule( - new DelayedNameResolverRefresh(), delayNanos, TimeUnit.NANOSECONDS, - timeService); - } - }); + new DelayedNameResolverRefresh(), delayNanos, TimeUnit.NANOSECONDS, + timeService); } } } diff --git a/xds/src/test/java/io/grpc/xds/ClusterResolverLoadBalancerTest.java b/xds/src/test/java/io/grpc/xds/ClusterResolverLoadBalancerTest.java index 9243abba6d3..2a8617912ea 100644 --- a/xds/src/test/java/io/grpc/xds/ClusterResolverLoadBalancerTest.java +++ b/xds/src/test/java/io/grpc/xds/ClusterResolverLoadBalancerTest.java @@ -198,6 +198,7 @@ public XdsClient returnObject(Object object) { private ArgumentCaptor pickerCaptor; private int xdsClientRefs; private ClusterResolverLoadBalancer loadBalancer; + private NameResolverProvider fakeNameResolverProvider; @Before public void setUp() throws URISyntaxException { @@ -214,7 +215,8 @@ public void setUp() throws URISyntaxException { .setServiceConfigParser(mock(ServiceConfigParser.class)) .setChannelLogger(mock(ChannelLogger.class)) .build(); - nsRegistry.register(new FakeNameResolverProvider()); + fakeNameResolverProvider = new FakeNameResolverProvider(false); + nsRegistry.register(fakeNameResolverProvider); when(helper.getNameResolverRegistry()).thenReturn(nsRegistry); when(helper.getNameResolverArgs()).thenReturn(args); when(helper.getSynchronizationContext()).thenReturn(syncContext); @@ -715,6 +717,17 @@ public void handleEdsResource_noHealthyEndpoint() { @Test public void onlyLogicalDnsCluster_endpointsResolved() { + do_onlyLogicalDnsCluster_endpointsResolved(); + } + + @Test + public void oldListenerCallback_onlyLogicalDnsCluster_endpointsResolved() { + nsRegistry.deregister(fakeNameResolverProvider); + nsRegistry.register(new FakeNameResolverProvider(true)); + do_onlyLogicalDnsCluster_endpointsResolved(); + } + + void do_onlyLogicalDnsCluster_endpointsResolved() { ClusterResolverConfig config = new ClusterResolverConfig( Collections.singletonList(logicalDnsDiscoveryMechanism), roundRobin); deliverLbConfig(config); @@ -743,7 +756,6 @@ public void onlyLogicalDnsCluster_endpointsResolved() { .get(XdsAttributes.ATTR_ADDRESS_NAME)).isEqualTo(DNS_HOST_NAME); assertThat(childBalancer.addresses.get(1).getAttributes() .get(XdsAttributes.ATTR_ADDRESS_NAME)).isEqualTo(DNS_HOST_NAME); - } @Test @@ -763,37 +775,48 @@ public void onlyLogicalDnsCluster_handleRefreshNameResolution() { } @Test - public void onlyLogicalDnsCluster_resolutionError_backoffAndRefresh() { + public void resolutionError_backoffAndRefresh() { + do_onlyLogicalDnsCluster_resolutionError_backoffAndRefresh(); + } + + @Test + public void oldListenerCallback_resolutionError_backoffAndRefresh() { + nsRegistry.deregister(fakeNameResolverProvider); + nsRegistry.register(new FakeNameResolverProvider(true)); + do_onlyLogicalDnsCluster_resolutionError_backoffAndRefresh(); + } + + void do_onlyLogicalDnsCluster_resolutionError_backoffAndRefresh() { InOrder inOrder = Mockito.inOrder(helper, backoffPolicyProvider, - backoffPolicy1, backoffPolicy2); + backoffPolicy1, backoffPolicy2); ClusterResolverConfig config = new ClusterResolverConfig( - Collections.singletonList(logicalDnsDiscoveryMechanism), roundRobin); + Collections.singletonList(logicalDnsDiscoveryMechanism), roundRobin); deliverLbConfig(config); FakeNameResolver resolver = assertResolverCreated("/" + DNS_HOST_NAME); assertThat(childBalancers).isEmpty(); Status error = Status.UNAVAILABLE.withDescription("cannot reach DNS server"); resolver.deliverError(error); inOrder.verify(helper).updateBalancingState( - eq(ConnectivityState.TRANSIENT_FAILURE), pickerCaptor.capture()); + eq(ConnectivityState.TRANSIENT_FAILURE), pickerCaptor.capture()); assertPicker(pickerCaptor.getValue(), error, null); assertThat(resolver.refreshCount).isEqualTo(0); inOrder.verify(backoffPolicyProvider).get(); inOrder.verify(backoffPolicy1).nextBackoffNanos(); assertThat(fakeClock.getPendingTasks()).hasSize(1); assertThat(Iterables.getOnlyElement(fakeClock.getPendingTasks()).getDelay(TimeUnit.SECONDS)) - .isEqualTo(1L); + .isEqualTo(1L); fakeClock.forwardTime(1L, TimeUnit.SECONDS); assertThat(resolver.refreshCount).isEqualTo(1); error = Status.UNKNOWN.withDescription("I am lost"); resolver.deliverError(error); inOrder.verify(helper).updateBalancingState( - eq(ConnectivityState.TRANSIENT_FAILURE), pickerCaptor.capture()); + eq(ConnectivityState.TRANSIENT_FAILURE), pickerCaptor.capture()); inOrder.verify(backoffPolicy1).nextBackoffNanos(); assertPicker(pickerCaptor.getValue(), error, null); assertThat(fakeClock.getPendingTasks()).hasSize(1); assertThat(Iterables.getOnlyElement(fakeClock.getPendingTasks()).getDelay(TimeUnit.SECONDS)) - .isEqualTo(10L); + .isEqualTo(10L); fakeClock.forwardTime(10L, TimeUnit.SECONDS); assertThat(resolver.refreshCount).isEqualTo(2); @@ -803,7 +826,7 @@ public void onlyLogicalDnsCluster_resolutionError_backoffAndRefresh() { resolver.deliverEndpointAddresses(Arrays.asList(endpoint1, endpoint2)); assertThat(childBalancers).hasSize(1); assertAddressesEqual(Arrays.asList(endpoint1, endpoint2), - Iterables.getOnlyElement(childBalancers).addresses); + Iterables.getOnlyElement(childBalancers).addresses); assertThat(fakeClock.getPendingTasks()).isEmpty(); inOrder.verifyNoMoreInteractions(); @@ -1204,10 +1227,18 @@ void deliverError(Status error) { } private class FakeNameResolverProvider extends NameResolverProvider { + private final boolean useOldListenerCallback; + + private FakeNameResolverProvider(boolean useOldListenerCallback) { + this.useOldListenerCallback = useOldListenerCallback; + } + @Override public NameResolver newNameResolver(URI targetUri, NameResolver.Args args) { assertThat(targetUri.getScheme()).isEqualTo("dns"); - FakeNameResolver resolver = new FakeNameResolver(targetUri); + FakeNameResolver resolver = useOldListenerCallback + ? new FakeNameResolverUsingOldListenerCallback(targetUri) + : new FakeNameResolver(targetUri); resolvers.add(resolver); return resolver; } @@ -1228,9 +1259,10 @@ protected int priority() { } } + private class FakeNameResolver extends NameResolver { private final URI targetUri; - private Listener2 listener; + protected Listener2 listener; private int refreshCount; private FakeNameResolver(URI targetUri) { @@ -1257,12 +1289,33 @@ public void shutdown() { resolvers.remove(this); } - private void deliverEndpointAddresses(List addresses) { + protected void deliverEndpointAddresses(List addresses) { + syncContext.execute(() -> { + Status ret = listener.onResult2(ResolutionResult.newBuilder() + .setAddressesOrError(StatusOr.fromValue(addresses)).build()); + assertThat(ret.getCode()).isEqualTo(Status.Code.OK); + }); + } + + protected void deliverError(Status error) { + syncContext.execute(() -> listener.onResult2(ResolutionResult.newBuilder() + .setAddressesOrError(StatusOr.fromStatus(error)).build())); + } + } + + private class FakeNameResolverUsingOldListenerCallback extends FakeNameResolver { + private FakeNameResolverUsingOldListenerCallback(URI targetUri) { + super(targetUri); + } + + @Override + protected void deliverEndpointAddresses(List addresses) { listener.onResult(ResolutionResult.newBuilder() - .setAddressesOrError(StatusOr.fromValue(addresses)).build()); + .setAddressesOrError(StatusOr.fromValue(addresses)).build()); } - private void deliverError(Status error) { + @Override + protected void deliverError(Status error) { listener.onError(error); } }