Skip to content

Commit

Permalink
LoadBalancer wire the tracker on the host layers with the request flow (
Browse files Browse the repository at this point in the history
#2816)

Motivation

In order for the new load balancer features to work, the health indicators need to be wired as request trackers on the request flow.

Modification

- Added additional configuration options in the DefaultHttpLoadBalancerFactory to support error classifications. Feature only useful to load-balancers that support it.
- When the Context is available and a request-tracker is present in it, the request flow is now enriched with additional logic to track the state and feed it to the request-tracker.

Co-authored-by: Bryce Anderson <bryce_anderson@apple.com>
Co-authored-by: Idel Pivnitskiy <idel.pivnitskiy@apple.com>
  • Loading branch information
3 people committed Feb 1, 2024
1 parent 954c147 commit 11c5a3d
Show file tree
Hide file tree
Showing 6 changed files with 325 additions and 111 deletions.
1 change: 1 addition & 0 deletions servicetalk-http-netty/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ dependencies {
implementation project(":servicetalk-dns-discovery-netty")
implementation project(":servicetalk-http-utils")
implementation project(":servicetalk-loadbalancer")
implementation project(":servicetalk-loadbalancer-experimental")
implementation project(":servicetalk-logging-slf4j-internal")
implementation project(":servicetalk-tcp-netty-internal")
implementation project(":servicetalk-transport-netty")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,14 @@
import io.servicetalk.client.api.ConnectionFactory;
import io.servicetalk.client.api.LoadBalancer;
import io.servicetalk.client.api.LoadBalancerFactory;
import io.servicetalk.client.api.ReservableRequestConcurrencyController;
import io.servicetalk.client.api.ScoreSupplier;
import io.servicetalk.client.api.ServiceDiscovererEvent;
import io.servicetalk.concurrent.api.Completable;
import io.servicetalk.concurrent.api.Publisher;
import io.servicetalk.concurrent.api.Single;
import io.servicetalk.concurrent.api.TerminalSignalConsumer;
import io.servicetalk.context.api.ContextMap;
import io.servicetalk.http.api.FilterableStreamingHttpConnection;
import io.servicetalk.http.api.FilterableStreamingHttpLoadBalancedConnection;
import io.servicetalk.http.api.HttpConnectionContext;
Expand All @@ -31,14 +34,37 @@
import io.servicetalk.http.api.HttpExecutionStrategy;
import io.servicetalk.http.api.HttpLoadBalancerFactory;
import io.servicetalk.http.api.HttpRequestMethod;
import io.servicetalk.http.api.HttpResponseMetaData;
import io.servicetalk.http.api.ReservedBlockingHttpConnection;
import io.servicetalk.http.api.ReservedBlockingStreamingHttpConnection;
import io.servicetalk.http.api.ReservedHttpConnection;
import io.servicetalk.http.api.StreamingHttpRequest;
import io.servicetalk.http.api.StreamingHttpResponse;
import io.servicetalk.http.api.StreamingHttpResponseFactory;
import io.servicetalk.http.utils.BeforeFinallyHttpOperator;
import io.servicetalk.loadbalancer.ErrorClass;
import io.servicetalk.loadbalancer.RequestTracker;
import io.servicetalk.loadbalancer.RoundRobinLoadBalancers;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.ConnectException;
import java.util.Collection;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.function.Function;
import javax.annotation.Nullable;

import static io.servicetalk.http.api.HttpApiConversions.toReservedBlockingConnection;
import static io.servicetalk.http.api.HttpApiConversions.toReservedBlockingStreamingConnection;
import static io.servicetalk.http.api.HttpApiConversions.toReservedConnection;
import static io.servicetalk.http.api.HttpResponseStatus.StatusClass.SERVER_ERROR_5XX;
import static io.servicetalk.http.api.HttpResponseStatus.TOO_MANY_REQUESTS;
import static io.servicetalk.loadbalancer.ErrorClass.LOCAL_ORIGIN_CONNECT_FAILED;
import static io.servicetalk.loadbalancer.ErrorClass.LOCAL_ORIGIN_REQUEST_FAILED;
import static io.servicetalk.loadbalancer.RequestTracker.REQUEST_TRACKER_KEY;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.atomic.AtomicIntegerFieldUpdater.newUpdater;

/**
* Default implementation of {@link HttpLoadBalancerFactory}.
Expand All @@ -47,13 +73,20 @@
*/
public final class DefaultHttpLoadBalancerFactory<ResolvedAddress>
implements HttpLoadBalancerFactory<ResolvedAddress> {
private static final Logger LOGGER = LoggerFactory.getLogger(DefaultHttpLoadBalancerFactory.class);
private final LoadBalancerFactory<ResolvedAddress, FilterableStreamingHttpLoadBalancedConnection> rawFactory;
private final Function<Throwable, ErrorClass> errorClassFunction;
private final Function<HttpResponseMetaData, ErrorClass> peerResponseErrorClassifier;
private final HttpExecutionStrategy strategy;

DefaultHttpLoadBalancerFactory(
final LoadBalancerFactory<ResolvedAddress, FilterableStreamingHttpLoadBalancedConnection> rawFactory,
final Function<Throwable, ErrorClass> errorClassFunction,
final Function<HttpResponseMetaData, ErrorClass> peerResponseErrorClassifier,
final HttpExecutionStrategy strategy) {
this.rawFactory = rawFactory;
this.errorClassFunction = errorClassFunction;
this.peerResponseErrorClassifier = peerResponseErrorClassifier;
this.strategy = strategy;
}

Expand All @@ -80,6 +113,36 @@ public FilterableStreamingHttpLoadBalancedConnection toLoadBalancedConnection(
return new DefaultFilterableStreamingHttpLoadBalancedConnection(connection);
}

@Override
public FilterableStreamingHttpLoadBalancedConnection toLoadBalancedConnection(
final FilterableStreamingHttpConnection connection,
final ReservableRequestConcurrencyController concurrencyController,
@Nullable final ContextMap context) {

RequestTracker hostHealthIndicator = null;
if (context == null) {
LOGGER.debug("Context is null. In order for " + DefaultHttpLoadBalancerFactory.class.getSimpleName() +
":toLoadBalancedConnection to get access to the " + RequestTracker.class.getSimpleName() +
", health-monitor of this connection, the context must not be null.");
} else {
hostHealthIndicator = context.get(REQUEST_TRACKER_KEY);
if (hostHealthIndicator == null) {
LOGGER.debug(REQUEST_TRACKER_KEY.name() + " is not set in context. " +
"In order for " + DefaultHttpLoadBalancerFactory.class.getSimpleName() +
":toLoadBalancedConnection to get access to the " + RequestTracker.class.getSimpleName() +
", health-monitor of this connection, the context must be properly wired.");
}
}

if (hostHealthIndicator == null) {
return new HttpLoadBalancerFactory.DefaultFilterableStreamingHttpLoadBalancedConnection(connection,
concurrencyController);
}

return new DefaultHttpLoadBalancedConnection(connection, concurrencyController,
errorClassFunction, peerResponseErrorClassifier, hostHealthIndicator);
}

@Override
public HttpExecutionStrategy requiredOffloads() {
return strategy;
Expand All @@ -94,6 +157,11 @@ public HttpExecutionStrategy requiredOffloads() {
public static final class Builder<ResolvedAddress> {
private final LoadBalancerFactory<ResolvedAddress, FilterableStreamingHttpLoadBalancedConnection> rawFactory;
private final HttpExecutionStrategy strategy;
private final Function<Throwable, ErrorClass> errorClassifier = t -> t instanceof ConnectException ?
LOCAL_ORIGIN_CONNECT_FAILED : LOCAL_ORIGIN_REQUEST_FAILED;
private final Function<HttpResponseMetaData, ErrorClass> peerResponseErrorClassifier = resp ->
(resp.status().statusClass() == SERVER_ERROR_5XX || TOO_MANY_REQUESTS.equals(resp.status())) ?
ErrorClass.EXT_ORIGIN_REQUEST_FAILED : null;

private Builder(
final LoadBalancerFactory<ResolvedAddress, FilterableStreamingHttpLoadBalancedConnection> rawFactory,
Expand All @@ -108,7 +176,8 @@ private Builder(
* @return A {@link DefaultHttpLoadBalancerFactory}.
*/
public DefaultHttpLoadBalancerFactory<ResolvedAddress> build() {
return new DefaultHttpLoadBalancerFactory<>(rawFactory, strategy);
return new DefaultHttpLoadBalancerFactory<>(rawFactory, errorClassifier, peerResponseErrorClassifier,
strategy);
}

/**
Expand Down Expand Up @@ -153,10 +222,10 @@ private static final class DefaultFilterableStreamingHttpLoadBalancedConnection
@Override
public int score() {
throw new UnsupportedOperationException(
DefaultFilterableStreamingHttpLoadBalancedConnection.class.getName() +
" doesn't support scoring. " + ScoreSupplier.class.getName() +
" is only available through " + HttpLoadBalancerFactory.class.getSimpleName() +
" implementations that support scoring.");
DefaultFilterableStreamingHttpLoadBalancedConnection.class.getName() +
" doesn't support scoring. " + ScoreSupplier.class.getName() +
" is only available through " + HttpLoadBalancerFactory.class.getSimpleName() +
" implementations that support scoring.");
}

@Override
Expand Down Expand Up @@ -214,4 +283,193 @@ public String toString() {
return delegate.toString();
}
}

private static final class DefaultHttpLoadBalancedConnection
implements FilterableStreamingHttpLoadBalancedConnection {
private final FilterableStreamingHttpConnection delegate;
private final ReservableRequestConcurrencyController concurrencyController;
private final Function<Throwable, ErrorClass> errorClassFunction;
private final Function<HttpResponseMetaData, ErrorClass> peerResponseErrorClassifier;
@Nullable
private final RequestTracker tracker;

DefaultHttpLoadBalancedConnection(final FilterableStreamingHttpConnection delegate,
final ReservableRequestConcurrencyController concurrencyController,
final Function<Throwable, ErrorClass> errorClassFunction,
final Function<HttpResponseMetaData, ErrorClass> peerResponseErrorClassifier,
@Nullable final RequestTracker tracker) {
this.delegate = delegate;
this.concurrencyController = concurrencyController;
this.errorClassFunction = errorClassFunction;
this.peerResponseErrorClassifier = peerResponseErrorClassifier;
this.tracker = tracker;
}

@Override
public int score() {
return 1;
}

@Override
public ReservedHttpConnection asConnection() {
return toReservedConnection(this, executionContext().executionStrategy());
}

@Override
public ReservedBlockingStreamingHttpConnection asBlockingStreamingConnection() {
return toReservedBlockingStreamingConnection(this, executionContext().executionStrategy());
}

@Override
public ReservedBlockingHttpConnection asBlockingConnection() {
return toReservedBlockingConnection(this, executionContext().executionStrategy());
}

@Override
public Completable releaseAsync() {
return concurrencyController.releaseAsync();
}

@Override
public Completable closeAsyncGracefully() {
return delegate.closeAsyncGracefully();
}

@Override
public Result tryRequest() {
return concurrencyController.tryRequest();
}

@Override
public boolean tryReserve() {
return concurrencyController.tryReserve();
}

@Override
public void requestFinished() {
concurrencyController.requestFinished();
}

@Override
public HttpConnectionContext connectionContext() {
return delegate.connectionContext();
}

@Override
public <T> Publisher<? extends T> transportEventStream(final HttpEventKey<T> eventKey) {
return delegate.transportEventStream(eventKey);
}

@Override
public Single<StreamingHttpResponse> request(final StreamingHttpRequest request) {
if (tracker == null) {
return delegate.request(request).shareContextOnSubscribe();
}

return Single.defer(() -> {
final RequestTracker theTracker = new AtMostOnceDeliveryRequestTracker(tracker);
final long startTime = theTracker.beforeStart();

return delegate.request(request)
.liftSync(new BeforeFinallyHttpOperator(new TerminalSignalConsumer() {
@Override
public void onComplete() {
theTracker.onSuccess(startTime);
}

@Override
public void onError(final Throwable throwable) {
theTracker.onError(startTime, errorClassFunction.apply(throwable));
}

@Override
public void cancel() {
theTracker.onError(startTime, ErrorClass.CANCELLED);
}
}, /*discardEventsAfterCancel*/ true))

// BeforeFinallyHttpOperator conditionally outputs a Single<Meta> with a failed
// Publisher<Data> instead of the real Publisher<Data> in case a cancel signal is observed
// before completion of Meta. It also transforms the original Publisher<Data> to discard
// signals after cancel. So in order for downstream operators to get a consistent view of the
// data path map() needs to be applied last.
.map(response -> {
final ErrorClass eClass = peerResponseErrorClassifier.apply(response);
if (eClass != null) {
// The onError is triggered before the body is actually consumed.
theTracker.onError(startTime, eClass);
}
return response;
})
.shareContextOnSubscribe();
});
}

@Override
public HttpExecutionContext executionContext() {
return delegate.executionContext();
}

@Override
public StreamingHttpResponseFactory httpResponseFactory() {
return delegate.httpResponseFactory();
}

@Override
public Completable onClose() {
return delegate.onClose();
}

@Override
public Completable onClosing() {
return delegate.onClosing();
}

@Override
public Completable closeAsync() {
return delegate.closeAsync();
}

@Override
public StreamingHttpRequest newRequest(final HttpRequestMethod method, final String requestTarget) {
return delegate.newRequest(method, requestTarget);
}

@Override
public String toString() {
return delegate.toString();
}

private static final class AtMostOnceDeliveryRequestTracker implements RequestTracker {

private static final AtomicIntegerFieldUpdater<AtMostOnceDeliveryRequestTracker> doneUpdater =
newUpdater(AtMostOnceDeliveryRequestTracker.class, "done");

private final RequestTracker original;
private volatile int done;

private AtMostOnceDeliveryRequestTracker(final RequestTracker original) {
this.original = original;
}

@Override
public long beforeStart() {
return original.beforeStart();
}

@Override
public void onSuccess(final long beforeStartTimeNs) {
if (doneUpdater.compareAndSet(this, 0, 1)) {
original.onSuccess(beforeStartTimeNs);
}
}

@Override
public void onError(final long beforeStartTimeNs, final ErrorClass errorClass) {
if (doneUpdater.compareAndSet(this, 0, 1)) {
original.onError(beforeStartTimeNs, errorClass);
}
}
}
}
}

0 comments on commit 11c5a3d

Please sign in to comment.