From 8d41a433149f66b2a9b3553307ef97483771f229 Mon Sep 17 00:00:00 2001 From: larry-safran Date: Thu, 8 Dec 2022 17:19:10 -0800 Subject: [PATCH 1/5] xds:Change timer creation logic to wait until the adsStream is ready before creating the timer to mark resources absent. --- .../java/io/grpc/xds/AbstractXdsClient.java | 54 +++++++++++++- xds/src/main/java/io/grpc/xds/XdsClient.java | 8 +++ .../main/java/io/grpc/xds/XdsClientImpl.java | 36 +++++++++- .../io/grpc/xds/XdsClientImplTestBase.java | 71 ++++++++++++++++--- 4 files changed, 153 insertions(+), 16 deletions(-) diff --git a/xds/src/main/java/io/grpc/xds/AbstractXdsClient.java b/xds/src/main/java/io/grpc/xds/AbstractXdsClient.java index d7eb9383f07..76d60a40c49 100644 --- a/xds/src/main/java/io/grpc/xds/AbstractXdsClient.java +++ b/xds/src/main/java/io/grpc/xds/AbstractXdsClient.java @@ -36,6 +36,8 @@ import io.grpc.SynchronizationContext; import io.grpc.SynchronizationContext.ScheduledHandle; import io.grpc.internal.BackoffPolicy; +import io.grpc.stub.ClientCallStreamObserver; +import io.grpc.stub.ClientResponseObserver; import io.grpc.stub.StreamObserver; import io.grpc.xds.Bootstrapper.ServerInfo; import io.grpc.xds.EnvoyProtoData.Node; @@ -71,6 +73,7 @@ final class AbstractXdsClient { private final BackoffPolicy.Provider backoffPolicyProvider; private final Stopwatch stopwatch; private final Node bootstrapNode; + private final XdsClient.TimerLaunch timerLaunch; // Last successfully applied version_info for each resource type. Starts with empty string. // A version_info is used to update management server with client's most recent knowledge of @@ -98,7 +101,8 @@ final class AbstractXdsClient { timeService, SynchronizationContext syncContext, BackoffPolicy.Provider backoffPolicyProvider, - Supplier stopwatchSupplier) { + Supplier stopwatchSupplier, + XdsClient.TimerLaunch timerLaunch) { this.serverInfo = checkNotNull(serverInfo, "serverInfo"); this.channel = checkNotNull(xdsChannelFactory, "xdsChannelFactory").create(serverInfo); this.xdsResponseHandler = checkNotNull(xdsResponseHandler, "xdsResponseHandler"); @@ -108,6 +112,7 @@ final class AbstractXdsClient { this.timeService = checkNotNull(timeService, "timeService"); this.syncContext = checkNotNull(syncContext, "syncContext"); this.backoffPolicyProvider = checkNotNull(backoffPolicyProvider, "backoffPolicyProvider"); + this.timerLaunch = checkNotNull(timerLaunch, "timerLaunch"); stopwatch = checkNotNull(stopwatchSupplier, "stopwatchSupplier").get(); logId = InternalLogId.allocate("xds-client", serverInfo.target()); logger = XdsLogger.withLogId(logId); @@ -199,6 +204,22 @@ boolean isInBackoff() { return rpcRetryTimer != null && rpcRetryTimer.isPending(); } + boolean isReady() { + return adsStream != null && adsStream.isReady(); + } + + /** + * Starts a timer for each requested resource that hasn't been responded to and + * has been waiting for the channel to get ready. + */ + void readyHandler() { + if (!isReady()) { + return; + } + + timerLaunch.startSubscriberTimersIfNeeded(serverInfo); + } + /** * Establishes the RPC connection by creating a new RPC stream on the given channel for * xDS protocol communication. @@ -262,6 +283,8 @@ private abstract class AbstractAdsStream { abstract void sendError(Exception error); + abstract boolean isReady(); + /** * Sends a discovery request with the given {@code versionInfo}, {@code nonce} and * {@code errorDetail}. Used for reacting to a specific discovery response. For @@ -344,13 +367,26 @@ private void cleanUp() { private final class AdsStreamV2 extends AbstractAdsStream { private StreamObserver requestWriter; + @Override + public boolean isReady() { + return requestWriter != null && ((ClientCallStreamObserver) requestWriter).isReady(); + } + @Override void start() { io.envoyproxy.envoy.service.discovery.v2.AggregatedDiscoveryServiceGrpc .AggregatedDiscoveryServiceStub stub = io.envoyproxy.envoy.service.discovery.v2.AggregatedDiscoveryServiceGrpc.newStub(channel); StreamObserver responseReaderV2 = - new StreamObserver() { + new ClientResponseObserver() { + + @Override + public void beforeStart( + ClientCallStreamObserver reqStream) { + reqStream.setOnReadyHandler(AbstractXdsClient.this::readyHandler); + } + @Override public void onNext(final io.envoyproxy.envoy.api.v2.DiscoveryResponse response) { syncContext.execute(new Runnable() { @@ -427,11 +463,23 @@ void sendError(Exception error) { private final class AdsStreamV3 extends AbstractAdsStream { private StreamObserver requestWriter; + @Override + public boolean isReady() { + return requestWriter != null && ((ClientCallStreamObserver) requestWriter).isReady(); + } + @Override void start() { AggregatedDiscoveryServiceGrpc.AggregatedDiscoveryServiceStub stub = AggregatedDiscoveryServiceGrpc.newStub(channel); - StreamObserver responseReader = new StreamObserver() { + StreamObserver responseReader = + new ClientResponseObserver() { + + @Override + public void beforeStart(ClientCallStreamObserver requestStream) { + requestStream.setOnReadyHandler(AbstractXdsClient.this::readyHandler); + } + @Override public void onNext(final DiscoveryResponse response) { syncContext.execute(new Runnable() { diff --git a/xds/src/main/java/io/grpc/xds/XdsClient.java b/xds/src/main/java/io/grpc/xds/XdsClient.java index 82af8651159..591c4d7f339 100644 --- a/xds/src/main/java/io/grpc/xds/XdsClient.java +++ b/xds/src/main/java/io/grpc/xds/XdsClient.java @@ -373,4 +373,12 @@ Collection getSubscribedResources(ServerInfo serverInfo, Map> getSubscribedResourceTypesWithTypeUrl(); } + + interface TimerLaunch { + /** + * For all subscriber's for the specified server, if the resource hasn't yet been + * resolved then start a timer for it. + */ + void startSubscriberTimersIfNeeded(ServerInfo serverInfo); + } } diff --git a/xds/src/main/java/io/grpc/xds/XdsClientImpl.java b/xds/src/main/java/io/grpc/xds/XdsClientImpl.java index a30b4755a16..40fb91eba0c 100644 --- a/xds/src/main/java/io/grpc/xds/XdsClientImpl.java +++ b/xds/src/main/java/io/grpc/xds/XdsClientImpl.java @@ -47,6 +47,7 @@ import io.grpc.xds.LoadStatsManager2.ClusterDropStats; import io.grpc.xds.LoadStatsManager2.ClusterLocalityStats; import io.grpc.xds.XdsClient.ResourceStore; +import io.grpc.xds.XdsClient.TimerLaunch; import io.grpc.xds.XdsClient.XdsResponseHandler; import io.grpc.xds.XdsClusterResource.CdsUpdate; import io.grpc.xds.XdsListenerResource.LdsUpdate; @@ -67,7 +68,8 @@ /** * XdsClient implementation for client side usages. */ -final class XdsClientImpl extends XdsClient implements XdsResponseHandler, ResourceStore { +final class XdsClientImpl extends XdsClient + implements XdsResponseHandler, ResourceStore, TimerLaunch { // Longest time to wait, since the subscription to some resource, for concluding its absence. @VisibleForTesting @@ -146,7 +148,8 @@ private void maybeCreateXdsChannelWithLrs(ServerInfo serverInfo) { timeService, syncContext, backoffPolicyProvider, - stopwatchSupplier); + stopwatchSupplier, + this); LoadReportClient lrsClient = new LoadReportClient( loadStatsManager, xdsChannel.channel(), context, serverInfo.useProtocolV3(), bootstrapInfo.node(), syncContext, timeService, backoffPolicyProvider, stopwatchSupplier); @@ -380,6 +383,30 @@ public String toString() { return logId.toString(); } + @Override + public void startSubscriberTimersIfNeeded(ServerInfo serverInfo) { + if (isShutDown()) { + return; + } + + syncContext.execute(new Runnable() { + @Override + public void run() { + if (isShutDown()) { + return; + } + + for (Map> subscriberMap : resourceSubscribers.values()) { + for (ResourceSubscriber subscriber : subscriberMap.values()) { + if (subscriber.serverInfo.equals(serverInfo) && subscriber.respTimer == null) { + subscriber.restartTimer(); + } + } + } + } + }); + } + private void cleanUpResourceTimers() { for (Map> subscriberMap : resourceSubscribers.values()) { for (ResourceSubscriber subscriber : subscriberMap.values()) { @@ -577,6 +604,10 @@ void restartTimer() { if (data != null || absent) { // resource already resolved return; } + if (!xdsChannel.isReady()) { // When channel becomes ready, it will trigger a restartTimer + return; + } + class ResourceNotFound implements Runnable { @Override public void run() { @@ -594,6 +625,7 @@ public String toString() { // Initial fetch scheduled or rescheduled, transition metadata state to REQUESTED. metadata = ResourceMetadata.newResourceMetadataRequested(); + respTimer = syncContext.schedule( new ResourceNotFound(), INITIAL_RESOURCE_FETCH_TIMEOUT_SEC, TimeUnit.SECONDS, timeService); diff --git a/xds/src/test/java/io/grpc/xds/XdsClientImplTestBase.java b/xds/src/test/java/io/grpc/xds/XdsClientImplTestBase.java index 31d10abd841..14cab15b32b 100644 --- a/xds/src/test/java/io/grpc/xds/XdsClientImplTestBase.java +++ b/xds/src/test/java/io/grpc/xds/XdsClientImplTestBase.java @@ -49,6 +49,7 @@ import io.grpc.Context.CancellableContext; import io.grpc.InsecureChannelCredentials; import io.grpc.ManagedChannel; +import io.grpc.Server; import io.grpc.Status; import io.grpc.Status.Code; import io.grpc.inprocess.InProcessChannelBuilder; @@ -90,6 +91,7 @@ import java.io.IOException; import java.util.ArrayDeque; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Map; @@ -243,6 +245,7 @@ public long currentTimeNanos() { private ArgumentCaptor edsUpdateCaptor; @Captor private ArgumentCaptor errorCaptor; + @Mock private BackoffPolicy.Provider backoffPolicyProvider; @Mock @@ -268,6 +271,10 @@ public long currentTimeNanos() { private boolean originalEnableRbac; private boolean originalEnableLeastRequest; private boolean originalEnableFederation; + private Server xdsServer; + private final String serverName = InProcessServerBuilder.generateName(); + private BindableService adsService = createAdsService(); + private BindableService lrsService = createLrsService(); @Before public void setUp() throws IOException { @@ -285,15 +292,14 @@ public void setUp() throws IOException { originalEnableLeastRequest = XdsResourceType.enableLeastRequest; XdsResourceType.enableLeastRequest = true; originalEnableFederation = BootstrapperImpl.enableFederation; - final String serverName = InProcessServerBuilder.generateName(); - cleanupRule.register( - InProcessServerBuilder - .forName(serverName) - .addService(createAdsService()) - .addService(createLrsService()) - .directExecutor() - .build() - .start()); + xdsServer = InProcessServerBuilder + .forName(serverName) + .addService(adsService) + .addService(lrsService) + .directExecutor() + .build() + .start(); + cleanupRule.register(xdsServer); channel = cleanupRule.register(InProcessChannelBuilder.forName(serverName).directExecutor().build()); XdsChannelFactory xdsChannelFactory = new XdsChannelFactory() { @@ -3472,6 +3478,48 @@ public void serverSideListenerResponseErrorHandling_badTransportSocketName() { verifyStatusWithNodeId(errorCaptor.getValue(), Code.UNAVAILABLE, errorMsg); } + @Test + public void sendingToStoppedServer() throws Exception { + try { + // Establish the adsStream object + xdsClient.watchXdsResource(XdsClusterResource.getInstance(), CDS_RESOURCE, + cdsResourceWatcher); + DiscoveryRpcCall call = resourceDiscoveryCalls.poll(); + + // Shutdown server and initiate a request + xdsServer.shutdownNow(); + xdsClient.watchXdsResource(XdsListenerResource.getInstance(), LDS_RESOURCE, + ldsResourceWatcher); + fakeClock.forwardTime(14, TimeUnit.SECONDS); + + // Restart the server + xdsServer = + InProcessServerBuilder + .forName(serverName) + .addService(adsService) + .addService(lrsService) + .directExecutor() + .build(); + xdsServer.start(); + Thread.sleep(2000); // Because channels use real time, need to use sleep + fakeClock.forwardTime(5, TimeUnit.SECONDS); + call = resourceDiscoveryCalls.poll(); + + // Send a response and do verifications + verify(ldsResourceWatcher, never()).onResourceDoesNotExist(LDS_RESOURCE); + call.sendResponse(LDS, mf.buildWrappedResource(testListenerVhosts), VERSION_1, "0001"); + call.verifyRequest(LDS, LDS_RESOURCE, VERSION_1, "0001", NODE); + verify(ldsResourceWatcher).onChanged(ldsUpdateCaptor.capture()); + verifyGoldenListenerVhosts(ldsUpdateCaptor.getValue()); + assertThat(fakeClock.getPendingTasks(LDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)).isEmpty(); + verifyResourceMetadataAcked(LDS, LDS_RESOURCE, testListenerVhosts, VERSION_1, TIME_INCREMENT); + verifySubscribedResourcesMetadataSizes(1, 1, 0, 0); + } catch (Throwable t) { + throw t; // This allows putting a breakpoint here for debugging + } + } + + private DiscoveryRpcCall startResourceWatcher( XdsResourceType type, String name, ResourceWatcher watcher) { FakeClock.TaskFilter timeoutTaskFilter; @@ -3495,10 +3543,11 @@ private DiscoveryRpcCall startResourceWatcher( default: throw new AssertionError("should never be here"); } + DiscoveryRpcCall call = resourceDiscoveryCalls.poll(); call.verifyRequest(type, Collections.singletonList(name), "", "", NODE); - ScheduledTask timeoutTask = - Iterables.getOnlyElement(fakeClock.getPendingTasks(timeoutTaskFilter)); + Collection pendingTasks = fakeClock.getPendingTasks(timeoutTaskFilter); + ScheduledTask timeoutTask = Iterables.getOnlyElement(pendingTasks); assertThat(timeoutTask.getDelay(TimeUnit.SECONDS)) .isEqualTo(XdsClientImpl.INITIAL_RESOURCE_FETCH_TIMEOUT_SEC); return call; From 9b997c60cb17415fbe3b1a295f5d8cc545d5a769 Mon Sep 17 00:00:00 2001 From: larry-safran Date: Fri, 9 Dec 2022 11:54:50 -0800 Subject: [PATCH 2/5] xds:When the ads stream is closed only send errors to subscribers that haven't yet gotten results to match spec. --- .../main/java/io/grpc/xds/XdsClientImpl.java | 8 +++++++- .../io/grpc/xds/XdsClientImplTestBase.java | 18 ++++++------------ 2 files changed, 13 insertions(+), 13 deletions(-) diff --git a/xds/src/main/java/io/grpc/xds/XdsClientImpl.java b/xds/src/main/java/io/grpc/xds/XdsClientImpl.java index 40fb91eba0c..91a5ce17394 100644 --- a/xds/src/main/java/io/grpc/xds/XdsClientImpl.java +++ b/xds/src/main/java/io/grpc/xds/XdsClientImpl.java @@ -185,7 +185,9 @@ public void handleStreamClosed(Status error) { for (Map> subscriberMap : resourceSubscribers.values()) { for (ResourceSubscriber subscriber : subscriberMap.values()) { - subscriber.onError(error); + if (!subscriber.hasResult()) { + subscriber.onError(error); + } } } } @@ -657,6 +659,10 @@ boolean isWatched() { return !watchers.isEmpty(); } + boolean hasResult() { + return data != null || absent; + } + void onData(ParsedResource parsedResource, String version, long updateTime) { if (respTimer != null && respTimer.isPending()) { respTimer.cancel(); diff --git a/xds/src/test/java/io/grpc/xds/XdsClientImplTestBase.java b/xds/src/test/java/io/grpc/xds/XdsClientImplTestBase.java index 14cab15b32b..23974f45dfa 100644 --- a/xds/src/test/java/io/grpc/xds/XdsClientImplTestBase.java +++ b/xds/src/test/java/io/grpc/xds/XdsClientImplTestBase.java @@ -3207,10 +3207,8 @@ public void streamClosedAndRetryWithBackoff() { call.verifyRequest(RDS, RDS_RESOURCE, "5", "6764", NODE); call.sendError(Status.DEADLINE_EXCEEDED.asException()); - verify(ldsResourceWatcher, times(3)).onError(errorCaptor.capture()); - verifyStatusWithNodeId(errorCaptor.getValue(), Code.DEADLINE_EXCEEDED, ""); - verify(rdsResourceWatcher, times(3)).onError(errorCaptor.capture()); - verifyStatusWithNodeId(errorCaptor.getValue(), Code.DEADLINE_EXCEEDED, ""); + verify(ldsResourceWatcher, times(2)).onError(errorCaptor.capture()); + verify(rdsResourceWatcher, times(2)).onError(errorCaptor.capture()); verify(cdsResourceWatcher, times(3)).onError(errorCaptor.capture()); verifyStatusWithNodeId(errorCaptor.getValue(), Code.DEADLINE_EXCEEDED, ""); verify(edsResourceWatcher, times(3)).onError(errorCaptor.capture()); @@ -3231,10 +3229,8 @@ public void streamClosedAndRetryWithBackoff() { // Management server becomes unreachable again. call.sendError(Status.UNAVAILABLE.asException()); - verify(ldsResourceWatcher, times(4)).onError(errorCaptor.capture()); - verifyStatusWithNodeId(errorCaptor.getValue(), Code.UNAVAILABLE, ""); - verify(rdsResourceWatcher, times(4)).onError(errorCaptor.capture()); - verifyStatusWithNodeId(errorCaptor.getValue(), Code.UNAVAILABLE, ""); + verify(ldsResourceWatcher, times(2)).onError(errorCaptor.capture()); + verify(rdsResourceWatcher, times(2)).onError(errorCaptor.capture()); verify(cdsResourceWatcher, times(4)).onError(errorCaptor.capture()); verifyStatusWithNodeId(errorCaptor.getValue(), Code.UNAVAILABLE, ""); verify(edsResourceWatcher, times(4)).onError(errorCaptor.capture()); @@ -3318,10 +3314,8 @@ public void streamClosedAndRetryRestartsResourceInitialFetchTimerForUnresolvedRe call.sendError(Status.UNAVAILABLE.asException()); assertThat(cdsResourceTimeout.isCancelled()).isTrue(); assertThat(edsResourceTimeout.isCancelled()).isTrue(); - verify(ldsResourceWatcher).onError(errorCaptor.capture()); - verifyStatusWithNodeId(errorCaptor.getValue(), Code.UNAVAILABLE, ""); - verify(rdsResourceWatcher).onError(errorCaptor.capture()); - verifyStatusWithNodeId(errorCaptor.getValue(), Code.UNAVAILABLE, ""); + verify(ldsResourceWatcher, never()).onError(errorCaptor.capture()); + verify(rdsResourceWatcher, never()).onError(errorCaptor.capture()); verify(cdsResourceWatcher).onError(errorCaptor.capture()); verifyStatusWithNodeId(errorCaptor.getValue(), Code.UNAVAILABLE, ""); verify(edsResourceWatcher).onError(errorCaptor.capture()); From ac841be06af9c50a5c069ce960670b09e909c169 Mon Sep 17 00:00:00 2001 From: larry-safran Date: Fri, 9 Dec 2022 12:57:03 -0800 Subject: [PATCH 3/5] Use a blocking queue to avoid the 2-second sleep. For some inexplicable reason the following call.verifyRequest fails only for the V2 test and only from command line not IDE unless there is some Thread.sleep, even if it is only 1-millis. --- .../java/io/grpc/xds/XdsClientImplTestBase.java | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/xds/src/test/java/io/grpc/xds/XdsClientImplTestBase.java b/xds/src/test/java/io/grpc/xds/XdsClientImplTestBase.java index 23974f45dfa..183902d85ab 100644 --- a/xds/src/test/java/io/grpc/xds/XdsClientImplTestBase.java +++ b/xds/src/test/java/io/grpc/xds/XdsClientImplTestBase.java @@ -96,6 +96,8 @@ import java.util.List; import java.util.Map; import java.util.Queue; +import java.util.concurrent.BlockingDeque; +import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import javax.annotation.Nullable; @@ -186,7 +188,8 @@ public boolean shouldAccept(Runnable command) { public final GrpcCleanupRule cleanupRule = new GrpcCleanupRule(); private final FakeClock fakeClock = new FakeClock(); - protected final Queue resourceDiscoveryCalls = new ArrayDeque<>(); + protected final BlockingDeque resourceDiscoveryCalls = + new LinkedBlockingDeque<>(1); protected final Queue loadReportCalls = new ArrayDeque<>(); protected final AtomicBoolean adsEnded = new AtomicBoolean(true); protected final AtomicBoolean lrsEnded = new AtomicBoolean(true); @@ -1050,10 +1053,10 @@ public void rdsResourceUpdated_withXdstpResourceName_unknownAuthority() { assertThat(error.getCode()).isEqualTo(Code.INVALID_ARGUMENT); assertThat(error.getDescription()).isEqualTo( "Wrong configuration: xds server does not exist for resource " + rdsResourceName); - assertThat(resourceDiscoveryCalls.poll()).isNull(); + assertThat(resourceDiscoveryCalls.size()).isEqualTo(0); xdsClient.cancelXdsResourceWatch( XdsRouteConfigureResource.getInstance(),rdsResourceName, rdsResourceWatcher); - assertThat(resourceDiscoveryCalls.poll()).isNull(); + assertThat(resourceDiscoveryCalls.size()).isEqualTo(0); } @Test @@ -3478,7 +3481,7 @@ public void sendingToStoppedServer() throws Exception { // Establish the adsStream object xdsClient.watchXdsResource(XdsClusterResource.getInstance(), CDS_RESOURCE, cdsResourceWatcher); - DiscoveryRpcCall call = resourceDiscoveryCalls.poll(); + resourceDiscoveryCalls.take(); // clear this entry // Shutdown server and initiate a request xdsServer.shutdownNow(); @@ -3495,9 +3498,9 @@ public void sendingToStoppedServer() throws Exception { .directExecutor() .build(); xdsServer.start(); - Thread.sleep(2000); // Because channels use real time, need to use sleep fakeClock.forwardTime(5, TimeUnit.SECONDS); - call = resourceDiscoveryCalls.poll(); + DiscoveryRpcCall call = resourceDiscoveryCalls.poll(3, TimeUnit.SECONDS); + Thread.sleep(1); // For some reason the V2 test fails the verifyRequest without this // Send a response and do verifications verify(ldsResourceWatcher, never()).onResourceDoesNotExist(LDS_RESOURCE); From 4b8e64ed241ef28244fcfb9aa8af6e1992100468 Mon Sep 17 00:00:00 2001 From: larry-safran Date: Fri, 9 Dec 2022 14:37:21 -0800 Subject: [PATCH 4/5] Update comment per reviewers request --- xds/src/main/java/io/grpc/xds/XdsClientImpl.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/xds/src/main/java/io/grpc/xds/XdsClientImpl.java b/xds/src/main/java/io/grpc/xds/XdsClientImpl.java index 91a5ce17394..d9ddc0fd073 100644 --- a/xds/src/main/java/io/grpc/xds/XdsClientImpl.java +++ b/xds/src/main/java/io/grpc/xds/XdsClientImpl.java @@ -66,7 +66,7 @@ import javax.annotation.Nullable; /** - * XdsClient implementation for client side usages. + * XdsClient implementation. */ final class XdsClientImpl extends XdsClient implements XdsResponseHandler, ResourceStore, TimerLaunch { From ca6690c051e687b5303e0dd5df912ff520de2a33 Mon Sep 17 00:00:00 2001 From: larry-safran Date: Wed, 14 Dec 2022 13:28:24 -0800 Subject: [PATCH 5/5] Cosmetic changes in response to code review. --- .../java/io/grpc/xds/XdsClientImplTestBase.java | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/xds/src/test/java/io/grpc/xds/XdsClientImplTestBase.java b/xds/src/test/java/io/grpc/xds/XdsClientImplTestBase.java index 183902d85ab..c93c47ec4c1 100644 --- a/xds/src/test/java/io/grpc/xds/XdsClientImplTestBase.java +++ b/xds/src/test/java/io/grpc/xds/XdsClientImplTestBase.java @@ -91,7 +91,6 @@ import java.io.IOException; import java.util.ArrayDeque; import java.util.Arrays; -import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Map; @@ -295,14 +294,13 @@ public void setUp() throws IOException { originalEnableLeastRequest = XdsResourceType.enableLeastRequest; XdsResourceType.enableLeastRequest = true; originalEnableFederation = BootstrapperImpl.enableFederation; - xdsServer = InProcessServerBuilder + xdsServer = cleanupRule.register(InProcessServerBuilder .forName(serverName) .addService(adsService) .addService(lrsService) .directExecutor() .build() - .start(); - cleanupRule.register(xdsServer); + .start()); channel = cleanupRule.register(InProcessChannelBuilder.forName(serverName).directExecutor().build()); XdsChannelFactory xdsChannelFactory = new XdsChannelFactory() { @@ -3490,14 +3488,14 @@ public void sendingToStoppedServer() throws Exception { fakeClock.forwardTime(14, TimeUnit.SECONDS); // Restart the server - xdsServer = + xdsServer = cleanupRule.register( InProcessServerBuilder .forName(serverName) .addService(adsService) .addService(lrsService) .directExecutor() - .build(); - xdsServer.start(); + .build() + .start()); fakeClock.forwardTime(5, TimeUnit.SECONDS); DiscoveryRpcCall call = resourceDiscoveryCalls.poll(3, TimeUnit.SECONDS); Thread.sleep(1); // For some reason the V2 test fails the verifyRequest without this @@ -3543,8 +3541,8 @@ private DiscoveryRpcCall startResourceWatcher( DiscoveryRpcCall call = resourceDiscoveryCalls.poll(); call.verifyRequest(type, Collections.singletonList(name), "", "", NODE); - Collection pendingTasks = fakeClock.getPendingTasks(timeoutTaskFilter); - ScheduledTask timeoutTask = Iterables.getOnlyElement(pendingTasks); + ScheduledTask timeoutTask = + Iterables.getOnlyElement(fakeClock.getPendingTasks(timeoutTaskFilter)); assertThat(timeoutTask.getDelay(TimeUnit.SECONDS)) .isEqualTo(XdsClientImpl.INITIAL_RESOURCE_FETCH_TIMEOUT_SEC); return call;