diff --git a/core/src/main/java/com/linecorp/armeria/client/endpoint/DynamicEndpointGroup.java b/core/src/main/java/com/linecorp/armeria/client/endpoint/DynamicEndpointGroup.java index 23a28a258bc9..3b980f1771f9 100644 --- a/core/src/main/java/com/linecorp/armeria/client/endpoint/DynamicEndpointGroup.java +++ b/core/src/main/java/com/linecorp/armeria/client/endpoint/DynamicEndpointGroup.java @@ -17,6 +17,8 @@ import static com.google.common.collect.ImmutableList.toImmutableList; +import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.locks.Lock; @@ -32,7 +34,12 @@ * A dynamic {@link EndpointGroup}. The list of {@link Endpoint}s can be updated dynamically. */ public class DynamicEndpointGroup extends AbstractListenable> implements EndpointGroup { - private volatile List endpoints = ImmutableList.of(); + + // An empty list of endpoints we also use as a marker that we have not initialized endpoints yet. + private static final List UNINITIALIZED_ENDPOINTS = Collections.unmodifiableList( + new ArrayList<>()); + + private volatile List endpoints = UNINITIALIZED_ENDPOINTS; private final Lock endpointsLock = new ReentrantLock(); private final CompletableFuture> initialEndpointsFuture = new CompletableFuture<>(); @@ -90,7 +97,7 @@ protected final void setEndpoints(Iterable endpoints) { final List oldEndpoints = this.endpoints; final List newEndpoints = ImmutableList.sortedCopyOf(endpoints); - if (oldEndpoints.equals(newEndpoints)) { + if (oldEndpoints != UNINITIALIZED_ENDPOINTS && oldEndpoints.equals(newEndpoints)) { return; } @@ -106,7 +113,7 @@ protected final void setEndpoints(Iterable endpoints) { } private void completeInitialEndpointsFuture(List endpoints) { - if (!endpoints.isEmpty() && !initialEndpointsFuture.isDone()) { + if (endpoints != UNINITIALIZED_ENDPOINTS && !initialEndpointsFuture.isDone()) { initialEndpointsFuture.complete(endpoints); } } diff --git a/core/src/main/java/com/linecorp/armeria/client/endpoint/healthcheck/HealthCheckedEndpointGroup.java b/core/src/main/java/com/linecorp/armeria/client/endpoint/healthcheck/HealthCheckedEndpointGroup.java index 5fdcaa6f97ff..c6c3916278ce 100644 --- a/core/src/main/java/com/linecorp/armeria/client/endpoint/healthcheck/HealthCheckedEndpointGroup.java +++ b/core/src/main/java/com/linecorp/armeria/client/endpoint/healthcheck/HealthCheckedEndpointGroup.java @@ -128,6 +128,11 @@ public static HealthCheckedEndpointGroupBuilder builder(EndpointGroup delegate, snapshot = ImmutableList.copyOf(contexts.values()); } snapshot.forEach(ctx -> ctx.initialCheckFuture.join()); + + // If all endpoints are unhealthy, we will not have called setEndpoints even once, meaning listeners + // aren't notified that we've finished an initial health check. We make sure to refresh endpoints once + // on initialization to ensure this happens, even if the endpoints are currently empty. + refreshEndpoints(); } private void updateCandidates(List candidates) { @@ -163,6 +168,13 @@ private void updateCandidates(List candidates) { } } + private void refreshEndpoints() { + // Rebuild the endpoint list and notify. + setEndpoints(delegate.endpoints().stream() + .filter(healthyEndpoints::contains) + .collect(toImmutableList())); + } + @Override public void close() { if (closed) { @@ -333,10 +345,7 @@ private void updateHealth(double health, boolean updateEvenIfDestroyed) { } if (updated) { - // Rebuild the endpoint list and notify. - setEndpoints(delegate.endpoints().stream() - .filter(healthyEndpoints::contains) - .collect(toImmutableList())); + refreshEndpoints(); } initialCheckFuture.complete(null); diff --git a/core/src/test/java/com/linecorp/armeria/client/endpoint/healthcheck/HealthCheckedEndpointGroupTest.java b/core/src/test/java/com/linecorp/armeria/client/endpoint/healthcheck/HealthCheckedEndpointGroupTest.java index 69cef477e59d..97926b3788a0 100644 --- a/core/src/test/java/com/linecorp/armeria/client/endpoint/healthcheck/HealthCheckedEndpointGroupTest.java +++ b/core/src/test/java/com/linecorp/armeria/client/endpoint/healthcheck/HealthCheckedEndpointGroupTest.java @@ -78,6 +78,26 @@ void delegateUpdateCandidatesWhileCreatingHealthCheckedEndpointGroup() { }.build(); } + @Test + void startsUnhealthyAwaitsForEmptyEndpoints() throws Exception { + final MockEndpointGroup delegate = new MockEndpointGroup(); + delegate.set(Endpoint.of("foo")); + final AtomicReference ctxCapture = new AtomicReference<>(); + + try (HealthCheckedEndpointGroup group = new AbstractHealthCheckedEndpointGroupBuilder(delegate) { + @Override + protected Function newCheckerFactory() { + return ctx -> { + ctxCapture.set(ctx); + ctx.updateHealth(0); + return () -> CompletableFuture.completedFuture(null); + }; + } + }.build()) { + assertThat(group.awaitInitialEndpoints(10, TimeUnit.SECONDS)).isEmpty(); + } + } + @Test void disappearedEndpoint() { // Start with an endpoint group that has healthy 'foo'.