Skip to content

Commit

Permalink
loadbalancer-experimental: consolidate outlier detector concerns into…
Browse files Browse the repository at this point in the history
… the OutlierDetectorConfig (#2864)

Motivation:

We currently have two different outlier detector implementations:
the xDS compatible implementation and the L4 connection failure
implementation. Right now the configuration of them is odd: one is
configured on LoadBalancerBuilder and the other is passed to the
xDS outlier detector factory. This is a really strange API.

Modifications:

- move l4 configuration options to OutlierDetectorConfig so
  everything is in one place
- remove the LoadBalancerBuilder.outlierDetector(..) method in
  favor of one taking the OutlierDetectorConfig
- make service-discovery re-subscribing enabled even if consecutive
  connect failures is not
  • Loading branch information
bryce-anderson committed Mar 26, 2024
1 parent bf87598 commit 1f31e61
Show file tree
Hide file tree
Showing 19 changed files with 564 additions and 389 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import io.servicetalk.loadbalancer.LoadBalancers;
import io.servicetalk.loadbalancer.OutlierDetectorConfig;
import io.servicetalk.loadbalancer.P2CLoadBalancingPolicy;
import io.servicetalk.loadbalancer.XdsOutlierDetectorFactory;
import io.servicetalk.transport.api.HostAndPort;

import java.net.InetSocketAddress;
Expand Down Expand Up @@ -70,7 +69,7 @@ private static LoadBalancerFactory<InetSocketAddress, FilterableStreamingHttpLoa
// Whether to try to use a host regardless of health status (default: false)
.failOpen(true)
.build())
.outlierDetectorFactory(new XdsOutlierDetectorFactory<>(
.outlierDetectorConfig(
// xDS compatible outlier detection has a number of tuning knobs. There are multiple detection
// algorithms describe in more detail below. In addition to the limits appropriate to each
// algorithm there are also parameters to tune the chances of enforcement when a particular
Expand All @@ -89,14 +88,14 @@ private static LoadBalancerFactory<InetSocketAddress, FilterableStreamingHttpLoa
// 429 TOO MANY REQUESTS. In the future the classification of responses will be configurable.
new OutlierDetectorConfig.Builder()
// set the interval to 30 seconds (default: to 10 seconds)
.interval(ofSeconds(30))
.failureDetectorInterval(ofSeconds(30))
// set a more aggressive consecutive failure policy (default: 5)
.consecutive5xx(3)
// enabled failure percentage detection (default: 0)
.enforcingFailurePercentage(100)
// only allow 20% of hosts to be marked unhealthy at any one time
.maxEjectionPercentage(80)
.build()
)).build();
).build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ private enum State {
requireNonNull(connectionFactory, "connectionFactory");
this.connectionFactory = healthIndicator == null ? connectionFactory :
new InstrumentedConnectionFactory<>(connectionFactory, healthIndicator);
assert healthCheckConfig == null || healthCheckConfig.failedThreshold > 0;
this.healthCheckConfig = healthCheckConfig;
this.hostObserver = requireNonNull(hostObserver, "hostObserver");
this.closeable = toAsyncCloseable(this::doClose);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,6 @@ final class DefaultLoadBalancer<ResolvedAddress, C extends LoadBalancedConnectio
private final int linearSearchSpace;
@Nullable
private final HealthCheckConfig healthCheckConfig;
@Nullable
private final OutlierDetector<ResolvedAddress, C> outlierDetector;
private final LoadBalancerObserver loadBalancerObserver;
private final ListenableAsyncCloseable asyncCloseable;
Expand All @@ -133,7 +132,7 @@ final class DefaultLoadBalancer<ResolvedAddress, C extends LoadBalancedConnectio
final int linearSearchSpace,
final LoadBalancerObserver loadBalancerObserver,
@Nullable final HealthCheckConfig healthCheckConfig,
@Nullable final Function<String, OutlierDetector<ResolvedAddress, C>> outlierDetectorFactory) {
final Function<String, OutlierDetector<ResolvedAddress, C>> outlierDetectorFactory) {
this.targetResource = requireNonNull(targetResourceName);
this.lbDescription = makeDescription(id, targetResource);
this.hostSelector = requireNonNull(hostSelector, "hostSelector");
Expand All @@ -149,7 +148,7 @@ final class DefaultLoadBalancer<ResolvedAddress, C extends LoadBalancedConnectio
this.asyncCloseable = toAsyncCloseable(this::doClose);
// Maintain a Subscriber so signals are always delivered to replay and new Subscribers get the latest signal.
eventStream.ignoreElements().subscribe();
this.outlierDetector = outlierDetectorFactory == null ? null : outlierDetectorFactory.apply(lbDescription);
this.outlierDetector = requireNonNull(outlierDetectorFactory, "outlierDetectorFactory").apply(lbDescription);
// We subscribe to events as the very last step so that if we subscribe to an eager service discoverer
// we already have all the fields initialized.
subscribeToEvents(false);
Expand Down Expand Up @@ -178,9 +177,7 @@ private Completable doClose(final boolean graceful) {
if (!isClosed) {
discoveryCancellable.cancel();
eventStreamProcessor.onComplete();
if (outlierDetector != null) {
outlierDetector.cancel();
}
outlierDetector.cancel();
}
isClosed = true;
List<Host<ResolvedAddress, C>> currentList = usedHosts;
Expand Down Expand Up @@ -386,11 +383,14 @@ private void sequentialOnNext(Collection<? extends ServiceDiscovererEvent<Resolv

private Host<ResolvedAddress, C> createHost(ResolvedAddress addr) {
final LoadBalancerObserver.HostObserver hostObserver = loadBalancerObserver.hostObserver(addr);
// All hosts will share the healthcheck config of the parent RR loadbalancer.
final HealthIndicator indicator = outlierDetector == null ?
null : outlierDetector.newHealthIndicator(addr, hostObserver);
// All hosts will share the health check config of the parent load balancer.
final HealthIndicator indicator = outlierDetector.newHealthIndicator(addr, hostObserver);
// We don't need the host level health check if we are either not health checking at all or if the
// failed connect threshold is negative, meaning disabled.
final HealthCheckConfig hostHealthCheckConfig =
healthCheckConfig == null || healthCheckConfig.failedThreshold < 0 ? null : healthCheckConfig;
final Host<ResolvedAddress, C> host = new DefaultHost<>(lbDescription, addr, connectionFactory,
linearSearchSpace, hostObserver, healthCheckConfig, indicator);
linearSearchSpace, hostObserver, hostHealthCheckConfig, indicator);
if (indicator != null) {
indicator.setHost(host);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,11 @@
import io.servicetalk.concurrent.api.Publisher;
import io.servicetalk.transport.api.ExecutionStrategy;

import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.function.Function;
import javax.annotation.Nullable;

import static io.servicetalk.loadbalancer.HealthCheckConfig.DEFAULT_HEALTH_CHECK_FAILED_CONNECTIONS_THRESHOLD;
import static io.servicetalk.loadbalancer.HealthCheckConfig.DEFAULT_HEALTH_CHECK_INTERVAL;
import static io.servicetalk.loadbalancer.HealthCheckConfig.DEFAULT_HEALTH_CHECK_JITTER;
import static io.servicetalk.loadbalancer.HealthCheckConfig.DEFAULT_HEALTH_CHECK_RESUBSCRIBE_INTERVAL;
import static io.servicetalk.loadbalancer.HealthCheckConfig.validateHealthCheckIntervals;
import static io.servicetalk.utils.internal.NumberUtils.ensurePositive;
import static java.util.Objects.requireNonNull;

Expand All @@ -51,13 +45,7 @@ final class DefaultLoadBalancerBuilder<ResolvedAddress, C extends LoadBalancedCo
private Executor backgroundExecutor;
@Nullable
private LoadBalancerObserver loadBalancerObserver;
@Nullable
private OutlierDetectorFactory<ResolvedAddress, C> outlierDetectorFactory;
private Duration healthCheckInterval = DEFAULT_HEALTH_CHECK_INTERVAL;
private Duration healthCheckJitter = DEFAULT_HEALTH_CHECK_JITTER;
private int healthCheckFailedConnectionsThreshold = DEFAULT_HEALTH_CHECK_FAILED_CONNECTIONS_THRESHOLD;
private Duration healthCheckResubscribeInterval = DEFAULT_HEALTH_CHECK_RESUBSCRIBE_INTERVAL;
private Duration healthCheckResubscribeJitter = DEFAULT_HEALTH_CHECK_JITTER;
private OutlierDetectorConfig outlierDetectorConfig = OutlierDetectorConfig.DEFAULT_CONFIG;

// package private constructor so users must funnel through providers in `LoadBalancers`
DefaultLoadBalancerBuilder(final String id) {
Expand Down Expand Up @@ -85,9 +73,9 @@ public LoadBalancerBuilder<ResolvedAddress, C> loadBalancerObserver(
}

@Override
public LoadBalancerBuilder<ResolvedAddress, C> outlierDetectorFactory(
OutlierDetectorFactory<ResolvedAddress, C> outlierDetectorFactory) {
this.outlierDetectorFactory = outlierDetectorFactory;
public LoadBalancerBuilder<ResolvedAddress, C> outlierDetectorConfig(OutlierDetectorConfig outlierDetectorConfig) {
this.outlierDetectorConfig = outlierDetectorConfig == null ?
OutlierDetectorConfig.DEFAULT_CONFIG : outlierDetectorConfig;
return this;
}

Expand All @@ -97,54 +85,30 @@ public LoadBalancerBuilder<ResolvedAddress, C> backgroundExecutor(Executor backg
return this;
}

@Override
public LoadBalancerBuilder<ResolvedAddress, C> healthCheckInterval(Duration interval, Duration jitter) {
validateHealthCheckIntervals(interval, jitter);
this.healthCheckInterval = interval;
this.healthCheckJitter = jitter;
return this;
}

@Override
public LoadBalancerBuilder<ResolvedAddress, C> healthCheckResubscribeInterval(
Duration interval, Duration jitter) {
validateHealthCheckIntervals(interval, jitter);
this.healthCheckResubscribeInterval = interval;
this.healthCheckResubscribeJitter = jitter;
return this;
}

@Override
public LoadBalancerBuilder<ResolvedAddress, C> healthCheckFailedConnectionsThreshold(
int threshold) {
if (threshold == 0) {
throw new IllegalArgumentException("Invalid health-check failed connections (expected != 0)");
}
this.healthCheckFailedConnectionsThreshold = threshold;
return this;
}

@Override
public LoadBalancerFactory<ResolvedAddress, C> build() {
final HealthCheckConfig healthCheckConfig;
if (this.healthCheckFailedConnectionsThreshold < 0) {
final Executor executor = getExecutor();
if (OutlierDetectorConfig.allDisabled(outlierDetectorConfig)) {
healthCheckConfig = null;
} else {
healthCheckConfig = new HealthCheckConfig(getExecutor(),
healthCheckInterval, healthCheckJitter, healthCheckFailedConnectionsThreshold,
healthCheckResubscribeInterval, healthCheckResubscribeJitter);
healthCheckConfig = new HealthCheckConfig(
executor,
outlierDetectorConfig.failureDetectorInterval(),
outlierDetectorConfig.failureDetectorIntervalJitter(),
outlierDetectorConfig.failedConnectionsThreshold(),
outlierDetectorConfig.serviceDiscoveryResubscribeInterval(),
outlierDetectorConfig.serviceDiscoveryResubscribeJitter());
}
final LoadBalancerObserver loadBalancerObserver = this.loadBalancerObserver != null ?
this.loadBalancerObserver : NoopLoadBalancerObserver.instance();
Function<String, OutlierDetector<ResolvedAddress, C>> outlierDetectorFactory;
if (this.outlierDetectorFactory == null) {
outlierDetectorFactory = null;
final Function<String, OutlierDetector<ResolvedAddress, C>> outlierDetectorFactory;
if (OutlierDetectorConfig.xDSDisabled(outlierDetectorConfig)) {
outlierDetectorFactory = (lbDescription) -> new NoopOutlierDetector<>(outlierDetectorConfig, executor);
} else {
final Executor executor = getExecutor();
outlierDetectorFactory = (String lbDescrption) ->
this.outlierDetectorFactory.newOutlierDetector(executor, lbDescrption);
outlierDetectorFactory = (lbDescription) ->
new XdsOutlierDetector<>(executor, outlierDetectorConfig, lbDescription);
}

return new DefaultLoadBalancerFactory<>(id, loadBalancingPolicy, linearSearchSpace, healthCheckConfig,
loadBalancerObserver, outlierDetectorFactory);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,6 @@
*/
abstract class DefaultRequestTracker implements RequestTracker, ScoreSupplier {
private static final long MAX_MS_TO_NS = NANOSECONDS.convert(MAX_VALUE, MILLISECONDS);
static final long DEFAULT_CANCEL_PENALTY = 5L;
static final long DEFAULT_ERROR_PENALTY = 10L;

private final StampedLock lock = new StampedLock();
/**
Expand All @@ -61,10 +59,6 @@ abstract class DefaultRequestTracker implements RequestTracker, ScoreSupplier {
private int pendingCount;
private long pendingStamp = Long.MIN_VALUE;

DefaultRequestTracker(final long halfLifeNanos) {
this(halfLifeNanos, DEFAULT_CANCEL_PENALTY, DEFAULT_ERROR_PENALTY);
}

DefaultRequestTracker(final long halfLifeNanos, final long cancelPenalty, final long errorPenalty) {
ensurePositive(halfLifeNanos, "halfLifeNanos");
this.invTau = Math.pow((halfLifeNanos / log(2)), -1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import io.servicetalk.client.api.LoadBalancerFactory;
import io.servicetalk.concurrent.api.Executor;

import java.time.Duration;
import javax.annotation.Nullable;

import static java.util.Objects.requireNonNull;
Expand Down Expand Up @@ -68,9 +67,8 @@ public LoadBalancerBuilder<ResolvedAddress, C> loadBalancerObserver(
}

@Override
public LoadBalancerBuilder<ResolvedAddress, C> outlierDetectorFactory(
OutlierDetectorFactory<ResolvedAddress, C> outlierDetectorFactory) {
delegate = delegate.outlierDetectorFactory(outlierDetectorFactory);
public LoadBalancerBuilder<ResolvedAddress, C> outlierDetectorConfig(OutlierDetectorConfig outlierDetectorConfig) {
delegate = delegate.outlierDetectorConfig(outlierDetectorConfig);
return this;
}

Expand All @@ -86,24 +84,6 @@ public LoadBalancerBuilder<ResolvedAddress, C> linearSearchSpace(int linearSearc
return this;
}

@Override
public LoadBalancerBuilder<ResolvedAddress, C> healthCheckInterval(Duration interval, Duration jitter) {
delegate = delegate.healthCheckInterval(interval, jitter);
return this;
}

@Override
public LoadBalancerBuilder<ResolvedAddress, C> healthCheckResubscribeInterval(Duration interval, Duration jitter) {
delegate = delegate.healthCheckResubscribeInterval(interval, jitter);
return this;
}

@Override
public LoadBalancerBuilder<ResolvedAddress, C> healthCheckFailedConnectionsThreshold(int threshold) {
delegate = delegate.healthCheckFailedConnectionsThreshold(threshold);
return this;
}

@Override
public LoadBalancerFactory<ResolvedAddress, C> build() {
return delegate.build();
Expand Down

0 comments on commit 1f31e61

Please sign in to comment.