Skip to content

Commit

Permalink
loadbalancer: add an observer pattern to DefaultLoadBalancer (#2770)
Browse files Browse the repository at this point in the history
Motivation:

With a more interesting load balancer we're going to want
more visibility into what is happening.

Modifications:

Add an observer interface to thread to the load balancer.
This interface has a sub-interface for host concerns and we
can add more sub-interfaces as we continue to break parts
down.
  • Loading branch information
bryce-anderson authored Dec 13, 2023
1 parent bad9558 commit 82ad584
Show file tree
Hide file tree
Showing 15 changed files with 663 additions and 147 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -95,21 +95,25 @@ private enum State {
final Addr address;
@Nullable
private final HealthCheckConfig healthCheckConfig;
private final LoadBalancerObserver.HostObserver<Addr> hostObserver;
private final ConnectionFactory<Addr, ? extends C> connectionFactory;
private final int linearSearchSpace;
private final ListenableAsyncCloseable closeable;
private volatile ConnState connState = ACTIVE_EMPTY_CONN_STATE;

DefaultHost(final String lbDescription, final Addr address,
final ConnectionFactory<Addr, ? extends C> connectionFactory,
int linearSearchSpace, @Nullable HealthCheckConfig healthCheckConfig) {
final int linearSearchSpace, final @Nullable HealthCheckConfig healthCheckConfig,
final LoadBalancerObserver.HostObserver<Addr> hostObserver) {
this.lbDescription = requireNonNull(lbDescription, "lbDescription");
this.address = requireNonNull(address, "address");
this.linearSearchSpace = linearSearchSpace;
this.connectionFactory = requireNonNull(connectionFactory, "connectionFactory");
this.healthCheckConfig = healthCheckConfig;
this.hostObserver = requireNonNull(hostObserver, "hostObserver");
this.closeable = toAsyncCloseable(graceful ->
graceful ? doClose(AsyncCloseable::closeAsyncGracefully) : doClose(AsyncCloseable::closeAsync));
hostObserver.hostCreated(address);
}

@Override
Expand All @@ -119,16 +123,19 @@ public Addr address() {

@Override
public boolean markActiveIfNotClosed() {
final Object oldState = connStateUpdater.getAndUpdate(this, oldConnState -> {
final ConnState oldState = connStateUpdater.getAndUpdate(this, oldConnState -> {
if (oldConnState.state == State.EXPIRED) {
return new ConnState(oldConnState.connections, STATE_ACTIVE_NO_FAILURES);
}
// If oldConnState.state == State.ACTIVE this could mean either a duplicate event,
// or a repeated CAS operation. We could issue a warning, but as we don't know, we don't log anything.
// UNHEALTHY state cannot transition to ACTIVE without passing the health check.
return oldConnState;
}).state;
return oldState != State.CLOSED;
});
if (oldState.state == State.EXPIRED) {
hostObserver.expiredHostRevived(address, oldState.connections.length);
}
return oldState.state != State.CLOSED;
}

@Override
Expand All @@ -143,6 +150,14 @@ public void markClosed() {
final C cConn = (C) conn;
cConn.closeAsyncGracefully().subscribe();
}
if (oldState.state != State.CLOSED) {
// this is the first time this was marked closed so we need to let the observer know.
// TODO: do we need to signal closed for this host? Why isn't it closed? The
// closing should probably be re-worked to funnel closing behavior through one place
// and also define what being closed means: just the host isn't used anymore for new
// requests/connections or does it also mean that all connections have closed?
hostObserver.activeHostRemoved(address, toRemove.length);
}
}

private ConnState closeConnState() {
Expand Down Expand Up @@ -171,6 +186,7 @@ public boolean markExpired() {
if (connStateUpdater.compareAndSet(this, oldState,
new ConnState(oldState.connections, nextState))) {
cancelIfHealthCheck(oldState);
hostObserver.hostMarkedExpired(address, oldState.connections.length);
if (nextState == State.CLOSED) {
// Trigger the callback to remove the host from usedHosts array.
this.closeAsync().subscribe();
Expand Down Expand Up @@ -273,22 +289,26 @@ private void markHealthy(final HealthCheck<Addr, C> originalHealthCheckState) {
// would leak and would not be cancelled. Therefore, we cancel it here and allow failures to trigger a new
// health check.
ConnState oldState = connStateUpdater.getAndUpdate(this, previous -> {
if (DefaultHost.isUnhealthy(previous)) {
if (isUnhealthy(previous)) {
return new ConnState(previous.connections, STATE_ACTIVE_NO_FAILURES);
}
return previous;
});
if (oldState.state != originalHealthCheckState) {
cancelIfHealthCheck(oldState);
}
// Only if the previous state was a healthcheck should we notify the observer.
if (isUnhealthy(oldState)) {
hostObserver.hostRevived(address);
}
}

private void markUnhealthy(final Throwable cause) {
assert healthCheckConfig != null;
for (;;) {
ConnState previous = connStateUpdater.get(this);

if (!DefaultHost.isActive(previous) || previous.connections.length > 0
if (!isActive(previous) || previous.connections.length > 0
|| cause instanceof ConnectionLimitReachedException) {
LOGGER.debug("{}: failed to open a new connection to the host on address {}. {}.",
lbDescription, address, previous, cause);
Expand Down Expand Up @@ -317,6 +337,7 @@ private void markUnhealthy(final Throwable cause) {
"{} time(s) in a row. Error counting threshold reached, marking this host as " +
"UNHEALTHY for the selection algorithm and triggering background health-checking.",
lbDescription, address, healthCheckConfig.failedThreshold, cause);
hostObserver.hostMarkedUnhealthy(address, cause);
healthCheck.schedule(cause);
break;
}
Expand Down Expand Up @@ -363,18 +384,21 @@ private boolean addConnection(final C connection, final @Nullable HealthCheck<Ad

// If we were able to add a new connection to the list, we should mark the host as ACTIVE again and
// reset its failures counter.
final Object newState = DefaultHost.isActive(previous) || DefaultHost.isUnhealthy(previous) ?
final Object newState = isActive(previous) || isUnhealthy(previous) ?
STATE_ACTIVE_NO_FAILURES : previous.state;

if (connStateUpdater.compareAndSet(this,
previous, new ConnState(newList, newState))) {
// It could happen that the Host turned into UNHEALTHY state either concurrently with adding a new
// connection or with passing a previous health-check (if SD turned it into ACTIVE state). In both
// cases we have to cancel the "previous" ongoing health check. See "markHealthy" for more context.
if (DefaultHost.isUnhealthy(previous) &&
(currentHealthCheck == null || previous.state != currentHealthCheck)) {
assert newState == STATE_ACTIVE_NO_FAILURES;
cancelIfHealthCheck(previous);
if (isUnhealthy(previous)) {
if (currentHealthCheck == null || previous.state != currentHealthCheck) {
assert newState == STATE_ACTIVE_NO_FAILURES;
cancelIfHealthCheck(previous);
}
// If we transitioned from unhealth to healthy we need to let the observer know.
hostObserver.hostRevived(address);
}
break;
}
Expand Down Expand Up @@ -404,8 +428,8 @@ previous, new ConnState(newList, newState))) {
// Connection was already removed, nothing to do.
break;
} else if (connections.length == 1) {
assert !DefaultHost.isUnhealthy(currentConnState) : "Cannot be UNHEALTHY with #connections > 0";
if (DefaultHost.isActive(currentConnState)) {
assert !isUnhealthy(currentConnState) : "Cannot be UNHEALTHY with #connections > 0";
if (isActive(currentConnState)) {
if (connStateUpdater.compareAndSet(this, currentConnState,
new ConnState(EMPTY_ARRAY, currentConnState.state))) {
break;
Expand All @@ -420,6 +444,7 @@ previous, new ConnState(newList, newState))) {
// in the next iteration.
&& connStateUpdater.compareAndSet(this, currentConnState, CLOSED_CONN_STATE)) {
this.closeAsync().subscribe();
hostObserver.expiredHostRemoved(address);
break;
}
} else {
Expand Down Expand Up @@ -484,7 +509,7 @@ private Completable doClose(final Function<? super C, Completable> closeFunction
}

private void cancelIfHealthCheck(ConnState connState) {
if (DefaultHost.isUnhealthy(connState)) {
if (isUnhealthy(connState)) {
@SuppressWarnings("unchecked")
HealthCheck<Addr, C> healthCheck = (HealthCheck<Addr, C>) connState.state;
LOGGER.debug("{}: health check cancelled for {}.", lbDescription, healthCheck.host);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import io.servicetalk.client.api.ConnectionFactory;
import io.servicetalk.client.api.LoadBalancedConnection;
import io.servicetalk.client.api.NoActiveHostException;
import io.servicetalk.client.api.NoAvailableHostException;
import io.servicetalk.client.api.ServiceDiscovererEvent;
import io.servicetalk.concurrent.CompletableSource;
import io.servicetalk.concurrent.PublisherSource.Processor;
Expand Down Expand Up @@ -104,6 +105,7 @@ final class DefaultLoadBalancer<ResolvedAddress, C extends LoadBalancedConnectio
private final int linearSearchSpace;
@Nullable
private final HealthCheckConfig healthCheckConfig;
private final LoadBalancerObserver<ResolvedAddress> loadBalancerObserver;
private final ListenableAsyncCloseable asyncCloseable;

/**
Expand All @@ -126,7 +128,8 @@ final class DefaultLoadBalancer<ResolvedAddress, C extends LoadBalancedConnectio
final HostSelector<ResolvedAddress, C> hostSelector,
final ConnectionFactory<ResolvedAddress, ? extends C> connectionFactory,
final int linearSearchSpace,
@Nullable final HealthCheckConfig healthCheckConfig) {
@Nullable final HealthCheckConfig healthCheckConfig,
@Nullable final LoadBalancerObserver<ResolvedAddress> loadBalancerObserver) {
this.targetResource = requireNonNull(targetResourceName);
this.lbDescription = makeDescription(id, targetResource);
this.hostSelector = requireNonNull(hostSelector, "hostSelector");
Expand All @@ -136,6 +139,8 @@ final class DefaultLoadBalancer<ResolvedAddress, C extends LoadBalancedConnectio
this.connectionFactory = requireNonNull(connectionFactory);
this.linearSearchSpace = linearSearchSpace;
this.healthCheckConfig = healthCheckConfig;
this.loadBalancerObserver = loadBalancerObserver != null ?
loadBalancerObserver : NoopLoadBalancerObserver.instance();
this.sequentialExecutor = new SequentialExecutor((uncaughtException) ->
LOGGER.error("{}: Uncaught exception in SequentialExecutor", this, uncaughtException));
this.asyncCloseable = toAsyncCloseable(this::doClose);
Expand Down Expand Up @@ -327,6 +332,9 @@ private void sequentialOnNext(Collection<? extends ServiceDiscovererEvent<Resolv
nextHosts.add(createHost(event.address()));
}
}

// Always send the event regardless of if we update the actual list.
loadBalancerObserver.serviceDiscoveryEvent(events, usedHosts.size(), nextHosts.size());
// We've built a materially different host set so now set it for consumption and send our events.
if (hostSetChanged) {
sequentialUpdateUsedHosts(nextHosts);
Expand Down Expand Up @@ -368,7 +376,7 @@ private void sequentialOnNext(Collection<? extends ServiceDiscovererEvent<Resolv
private Host<ResolvedAddress, C> createHost(ResolvedAddress addr) {
// All hosts will share the healthcheck config of the parent RR loadbalancer.
Host<ResolvedAddress, C> host = new DefaultHost<>(DefaultLoadBalancer.this.toString(), addr,
connectionFactory, linearSearchSpace, healthCheckConfig);
connectionFactory, linearSearchSpace, healthCheckConfig, loadBalancerObserver.hostObserver());
host.onClose().afterFinally(() ->
sequentialExecutor.execute(() -> {
final List<Host<ResolvedAddress, C>> currentHosts = usedHosts;
Expand All @@ -380,6 +388,7 @@ private Host<ResolvedAddress, C> createHost(ResolvedAddress addr) {
currentHosts, current -> current == host);
// we only need to do anything else if we actually removed the host
if (nextHosts.size() != currentHosts.size()) {
loadBalancerObserver.hostObserver().expiredHostRemoved(host.address());
sequentialUpdateUsedHosts(nextHosts);
if (nextHosts.isEmpty()) {
// We transitioned from non-empty to empty. That means we're not ready.
Expand Down Expand Up @@ -462,19 +471,18 @@ private Single<C> selectConnection0(final Predicate<C> selector, @Nullable final
final boolean forceNewConnectionAndReserve) {
final HostSelector<ResolvedAddress, C> currentHostSelector = hostSelector;
Single<C> result = currentHostSelector.selectConnection(selector, context, forceNewConnectionAndReserve);
if (healthCheckConfig != null) {
result = result.beforeOnError(exn -> {
if (exn instanceof NoActiveHostException && currentHostSelector.isUnHealthy()) {
final long currNextResubscribeTime = nextResubscribeTime;
if (currNextResubscribeTime >= 0 &&
healthCheckConfig.executor.currentTime(NANOSECONDS) >= currNextResubscribeTime &&
nextResubscribeTimeUpdater.compareAndSet(this, currNextResubscribeTime, RESUBSCRIBING)) {
subscribeToEvents(true);
}
return result.beforeOnError(exn -> {
if (exn instanceof NoActiveHostException && currentHostSelector.isUnHealthy()) {
final long currNextResubscribeTime = nextResubscribeTime;
if (currNextResubscribeTime >= 0 &&
healthCheckConfig.executor.currentTime(NANOSECONDS) >= currNextResubscribeTime &&
nextResubscribeTimeUpdater.compareAndSet(this, currNextResubscribeTime, RESUBSCRIBING)) {
subscribeToEvents(true);
}
});
}
return result;
} else if (exn instanceof NoAvailableHostException) {
loadBalancerObserver.noHostsAvailable();
}
});
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ final class DefaultLoadBalancerBuilder<ResolvedAddress, C extends LoadBalancedCo

@Nullable
private Executor backgroundExecutor;
@Nullable
private LoadBalancerObserver<ResolvedAddress> loadBalancerObserver;
private Duration healthCheckInterval = DEFAULT_HEALTH_CHECK_INTERVAL;
private Duration healthCheckJitter = DEFAULT_HEALTH_CHECK_JITTER;
private int healthCheckFailedConnectionsThreshold = DEFAULT_HEALTH_CHECK_FAILED_CONNECTIONS_THRESHOLD;
Expand Down Expand Up @@ -73,6 +75,13 @@ public LoadBalancerBuilder<ResolvedAddress, C> loadBalancingPolicy(LoadBalancing
return this;
}

@Override
public LoadBalancerBuilder<ResolvedAddress, C> loadBalancerObserver(
@Nullable LoadBalancerObserver<ResolvedAddress> loadBalancerObserver) {
this.loadBalancerObserver = loadBalancerObserver;
return this;
}

@Override
public LoadBalancerBuilder<ResolvedAddress, C> backgroundExecutor(Executor backgroundExecutor) {
this.backgroundExecutor = new NormalizedTimeSourceExecutor(backgroundExecutor);
Expand Down Expand Up @@ -117,7 +126,8 @@ public LoadBalancerFactory<ResolvedAddress, C> build() {
healthCheckInterval, healthCheckJitter, healthCheckFailedConnectionsThreshold,
healthCheckResubscribeInterval, healthCheckResubscribeJitter);
}
return new DefaultLoadBalancerFactory<>(id, loadBalancingPolicy, linearSearchSpace, healthCheckConfig);
return new DefaultLoadBalancerFactory<>(id, loadBalancingPolicy, linearSearchSpace, healthCheckConfig,
loadBalancerObserver);
}

private static final class DefaultLoadBalancerFactory<ResolvedAddress, C extends LoadBalancedConnection>
Expand All @@ -127,14 +137,18 @@ private static final class DefaultLoadBalancerFactory<ResolvedAddress, C extends
private final LoadBalancingPolicy loadBalancingPolicy;
private final int linearSearchSpace;
@Nullable
private final LoadBalancerObserver<ResolvedAddress> loadBalancerObserver;
@Nullable
private final HealthCheckConfig healthCheckConfig;

DefaultLoadBalancerFactory(final String id, final LoadBalancingPolicy loadBalancingPolicy,
final int linearSearchSpace, final HealthCheckConfig healthCheckConfig) {
final int linearSearchSpace, final HealthCheckConfig healthCheckConfig,
final LoadBalancerObserver<ResolvedAddress> loadBalancerObserver) {
this.id = requireNonNull(id, "id");
this.loadBalancingPolicy = requireNonNull(loadBalancingPolicy, "loadBalancingPolicy");
this.linearSearchSpace = linearSearchSpace;
this.healthCheckConfig = healthCheckConfig;
this.loadBalancerObserver = loadBalancerObserver;
}

@Override
Expand All @@ -143,7 +157,7 @@ public <T extends C> LoadBalancer<T> newLoadBalancer(String targetResource,
ConnectionFactory<ResolvedAddress, T> connectionFactory) {
return new DefaultLoadBalancer<>(id, targetResource, eventPublisher,
loadBalancingPolicy.buildSelector(Collections.emptyList(), targetResource), connectionFactory,
linearSearchSpace, healthCheckConfig);
linearSearchSpace, healthCheckConfig, loadBalancerObserver);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

import java.time.Duration;
import java.util.function.Predicate;
import javax.annotation.Nullable;

/**
* Builder for {@link LoadBalancerFactory} that creates {@link LoadBalancer} instances based upon the configuration.
Expand Down Expand Up @@ -68,12 +69,20 @@
interface LoadBalancerBuilder<ResolvedAddress, C extends LoadBalancedConnection> {
/**
* Set the {@code loadBalancingPolicy} to use with this load balancer.
* @param loadBalancingPolicy the policy to use
* @param loadBalancingPolicy the {@code loadBalancingPolicy} to use
* @return {@code this}
*/
LoadBalancerBuilder<ResolvedAddress, C> loadBalancingPolicy(
LoadBalancingPolicy<ResolvedAddress, C> loadBalancingPolicy);

/**
* Set the {@link LoadBalancerObserver} to use with this load balancer.
* @param loadBalancerObserver the {@link LoadBalancerObserver} to use, or null to not use an observer.
* @return {code this}
*/
LoadBalancerBuilder<ResolvedAddress, C> loadBalancerObserver(
@Nullable LoadBalancerObserver<ResolvedAddress> loadBalancerObserver);

/**
* This {@link LoadBalancer} may monitor hosts to which connection establishment has failed
* using health checks that run in the background. The health check tries to establish a new connection
Expand Down
Loading

0 comments on commit 82ad584

Please sign in to comment.