Skip to content

Commit

Permalink
Make sure listeners are notified after first health check has resolve…
Browse files Browse the repository at this point in the history
…d. (line#2074)

Currently if all endpoints start out unhealthy, then the initial endpoints future for a health checked endpoint group never completes. The initial endpoints future should indicate when the endpoints have first been checked, not when they first become healthy - while an `EndpointGroup` with no `Endpoint`s is sort of useless, I think it's still a perfectly good `EndpointGroup` and deserves notifying listeners :)
  • Loading branch information
anuraaga authored and unknown committed Oct 15, 2019
1 parent ff61650 commit b97c21d
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 7 deletions.
Expand Up @@ -17,6 +17,8 @@

import static com.google.common.collect.ImmutableList.toImmutableList;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.locks.Lock;
Expand All @@ -32,7 +34,12 @@
* A dynamic {@link EndpointGroup}. The list of {@link Endpoint}s can be updated dynamically.
*/
public class DynamicEndpointGroup extends AbstractListenable<List<Endpoint>> implements EndpointGroup {
private volatile List<Endpoint> endpoints = ImmutableList.of();

// An empty list of endpoints we also use as a marker that we have not initialized endpoints yet.
private static final List<Endpoint> UNINITIALIZED_ENDPOINTS = Collections.unmodifiableList(
new ArrayList<>());

private volatile List<Endpoint> endpoints = UNINITIALIZED_ENDPOINTS;
private final Lock endpointsLock = new ReentrantLock();
private final CompletableFuture<List<Endpoint>> initialEndpointsFuture = new CompletableFuture<>();

Expand Down Expand Up @@ -90,7 +97,7 @@ protected final void setEndpoints(Iterable<Endpoint> endpoints) {
final List<Endpoint> oldEndpoints = this.endpoints;
final List<Endpoint> newEndpoints = ImmutableList.sortedCopyOf(endpoints);

if (oldEndpoints.equals(newEndpoints)) {
if (oldEndpoints != UNINITIALIZED_ENDPOINTS && oldEndpoints.equals(newEndpoints)) {
return;
}

Expand All @@ -106,7 +113,7 @@ protected final void setEndpoints(Iterable<Endpoint> endpoints) {
}

private void completeInitialEndpointsFuture(List<Endpoint> endpoints) {
if (!endpoints.isEmpty() && !initialEndpointsFuture.isDone()) {
if (endpoints != UNINITIALIZED_ENDPOINTS && !initialEndpointsFuture.isDone()) {
initialEndpointsFuture.complete(endpoints);
}
}
Expand Down
Expand Up @@ -128,6 +128,11 @@ public static HealthCheckedEndpointGroupBuilder builder(EndpointGroup delegate,
snapshot = ImmutableList.copyOf(contexts.values());
}
snapshot.forEach(ctx -> ctx.initialCheckFuture.join());

// If all endpoints are unhealthy, we will not have called setEndpoints even once, meaning listeners
// aren't notified that we've finished an initial health check. We make sure to refresh endpoints once
// on initialization to ensure this happens, even if the endpoints are currently empty.
refreshEndpoints();
}

private void updateCandidates(List<Endpoint> candidates) {
Expand Down Expand Up @@ -163,6 +168,13 @@ private void updateCandidates(List<Endpoint> candidates) {
}
}

private void refreshEndpoints() {
// Rebuild the endpoint list and notify.
setEndpoints(delegate.endpoints().stream()
.filter(healthyEndpoints::contains)
.collect(toImmutableList()));
}

@Override
public void close() {
if (closed) {
Expand Down Expand Up @@ -333,10 +345,7 @@ private void updateHealth(double health, boolean updateEvenIfDestroyed) {
}

if (updated) {
// Rebuild the endpoint list and notify.
setEndpoints(delegate.endpoints().stream()
.filter(healthyEndpoints::contains)
.collect(toImmutableList()));
refreshEndpoints();
}

initialCheckFuture.complete(null);
Expand Down
Expand Up @@ -78,6 +78,26 @@ void delegateUpdateCandidatesWhileCreatingHealthCheckedEndpointGroup() {
}.build();
}

@Test
void startsUnhealthyAwaitsForEmptyEndpoints() throws Exception {
final MockEndpointGroup delegate = new MockEndpointGroup();
delegate.set(Endpoint.of("foo"));
final AtomicReference<HealthCheckerContext> ctxCapture = new AtomicReference<>();

try (HealthCheckedEndpointGroup group = new AbstractHealthCheckedEndpointGroupBuilder(delegate) {
@Override
protected Function<? super HealthCheckerContext, ? extends AsyncCloseable> newCheckerFactory() {
return ctx -> {
ctxCapture.set(ctx);
ctx.updateHealth(0);
return () -> CompletableFuture.completedFuture(null);
};
}
}.build()) {
assertThat(group.awaitInitialEndpoints(10, TimeUnit.SECONDS)).isEmpty();
}
}

@Test
void disappearedEndpoint() {
// Start with an endpoint group that has healthy 'foo'.
Expand Down

0 comments on commit b97c21d

Please sign in to comment.