Skip to content
Permalink
Browse files

Introduce automatic retries (#861)

__Motivation__

There are a few expected scenarios where ServiceTalk may use a connection for making request when the connection is already closed. This happens because there is an inherent race between a peer closing a connection and the load balancer choosing a connection. Due to this race, load balancer may end up picking a connection which is already closed but the close notification has not reached the load balancer. In such scenarios, when load balancer chooses an unusable connection writing a request will fail and the users are expected to retry these exceptions. Since in these cases we will never write the request on the wire, these requests are always safe to retry automatically.

__Modification__

We already do a form of automatic retry which is when the load balancer does not have any usable hosts. Since, this new addition is just another form of automatic retry, instead of adding a top-level builder config, now introduced an abstraction called `AutomaticRetryStrategyProvider` which provides a retry strategy for automatic retries. The client builders then can disable or modify this strategy if required.

__Result__

ServiceTalk now automatically retry expected exceptions which are safe to retry.
  • Loading branch information
NiteshKant committed Nov 13, 2019
1 parent 183c5ef commit 3c27052544a59fad21c35aa94dc0096081391df0
Showing with 646 additions and 69 deletions.
  1. +1 −0 servicetalk-client-api/build.gradle
  2. +60 −0 servicetalk-client-api/src/main/java/io/servicetalk/client/api/AutoRetryStrategyProvider.java
  3. +159 −0 servicetalk-client-api/src/main/java/io/servicetalk/client/api/DefaultAutoRetryStrategyProvider.java
  4. +1 −3 ...nal → servicetalk-client-api/src/main/java/io/servicetalk/client/api}/LoadBalancerReadyEvent.java
  5. +3 −3 ... servicetalk-client-api/src/main/java/io/servicetalk/client/api}/LoadBalancerReadySubscriber.java
  6. +146 −0 ...talk-client-api/src/test/java/io/servicetalk/client/api/DefaultAutoRetryStrategyProviderTest.java
  7. +5 −0 servicetalk-grpc-api/src/main/java/io/servicetalk/grpc/api/GrpcClientBuilder.java
  8. +15 −0 servicetalk-grpc-api/src/main/java/io/servicetalk/grpc/api/SingleAddressGrpcClientBuilder.java
  9. +8 −0 servicetalk-grpc-netty/src/main/java/io/servicetalk/grpc/netty/DefaultGrpcClientBuilder.java
  10. +3 −1 servicetalk-http-api/src/main/java/io/servicetalk/http/api/BaseSingleAddressHttpClientBuilder.java
  11. +12 −2 servicetalk-http-api/src/main/java/io/servicetalk/http/api/HttpClientBuilder.java
  12. +3 −1 servicetalk-http-api/src/main/java/io/servicetalk/http/api/MultiAddressHttpClientBuilder.java
  13. +3 −1 servicetalk-http-api/src/main/java/io/servicetalk/http/api/PartitionedHttpClientBuilder.java
  14. +3 −1 servicetalk-http-api/src/main/java/io/servicetalk/http/api/SingleAddressHttpClientBuilder.java
  15. +7 −35 ...o/servicetalk/http/netty/{LoadBalancerReadyStreamingHttpClientFilter.java → AutoRetryFilter.java}
  16. +4 −2 ...k-http-netty/src/main/java/io/servicetalk/http/netty/DefaultMultiAddressUrlHttpClientBuilder.java
  17. +4 −2 ...etalk-http-netty/src/main/java/io/servicetalk/http/netty/DefaultPartitionedHttpClientBuilder.java
  18. +12 −6 ...alk-http-netty/src/main/java/io/servicetalk/http/netty/DefaultSingleAddressHttpClientBuilder.java
  19. +178 −0 servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/AutoRetryTest.java
  20. +16 −9 servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/LoadBalancerReadyHttpClientTest.java
  21. +2 −2 servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancer.java
  22. +1 −1 servicetalk-loadbalancer/src/test/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancerTest.java
@@ -22,6 +22,7 @@ dependencies {

implementation project(":servicetalk-annotations")
implementation project(":servicetalk-concurrent-api-internal")
implementation project(":servicetalk-concurrent-internal")
implementation "com.google.code.findbugs:jsr305:$jsr305Version"
implementation "org.slf4j:slf4j-api:$slf4jVersion"

@@ -0,0 +1,60 @@
/*
* Copyright © 2019 Apple Inc. and the ServiceTalk project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.servicetalk.client.api;

import io.servicetalk.concurrent.api.AsyncCloseable;
import io.servicetalk.concurrent.api.BiIntFunction;
import io.servicetalk.concurrent.api.BiIntPredicate;
import io.servicetalk.concurrent.api.Completable;
import io.servicetalk.concurrent.api.Single;

import static io.servicetalk.concurrent.api.Completable.completed;
import static io.servicetalk.concurrent.api.Completable.failed;

/**
* A provider for {@link AutoRetryStrategy}.
*/
public interface AutoRetryStrategyProvider {

/**
* An {@link AutoRetryStrategyProvider} that disables automatic retries;
*/
AutoRetryStrategyProvider DISABLE_AUTO_RETRIES = __ -> (___, cause) -> failed(cause);

/**
* Create a new {@link AutoRetryStrategy} instance using the passed {@link LoadBalancer}.
*
* @param loadBalancer {@link LoadBalancer} to use.
* @return New {@link AutoRetryStrategy} instance.
*/
AutoRetryStrategy forLoadbalancer(LoadBalancer<?> loadBalancer);

/**
* A strategy to use for automatic retries. Automatic retries are done by
* the clients automatically when allowed by the passed {@link AutoRetryStrategyProvider}. These retries are not a
* substitute for user level retries which are designed to infer retry decisions based on request/error information.
* Typically such user level retries are done using protocol level filter but can also be done differently per
* request (eg: by using {@link Single#retry(BiIntPredicate)}).
*/
@FunctionalInterface
interface AutoRetryStrategy extends AsyncCloseable, BiIntFunction<Throwable, Completable> {

@Override
default Completable closeAsync() {
return completed();
}
}
}
@@ -0,0 +1,159 @@
/*
* Copyright © 2019 Apple Inc. and the ServiceTalk project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.servicetalk.client.api;

import io.servicetalk.concurrent.api.AsyncCloseable;
import io.servicetalk.concurrent.api.Completable;
import io.servicetalk.transport.api.RetryableException;

import javax.annotation.Nullable;

import static io.servicetalk.concurrent.api.AsyncCloseables.emptyAsyncCloseable;
import static io.servicetalk.concurrent.api.AsyncCloseables.toAsyncCloseable;
import static io.servicetalk.concurrent.api.Completable.completed;
import static io.servicetalk.concurrent.api.Completable.failed;
import static io.servicetalk.concurrent.api.SourceAdapters.toSource;

/**
* Default implementation for {@link AutoRetryStrategyProvider}.
*/
public final class DefaultAutoRetryStrategyProvider implements AutoRetryStrategyProvider {
private final int maxRetryCount;
private final boolean waitForLb;
private final boolean retryAllRetryableExceptions;

private DefaultAutoRetryStrategyProvider(final int maxRetryCount, final boolean waitForLb,
final boolean retryAllRetryableExceptions) {
this.maxRetryCount = maxRetryCount;
this.waitForLb = waitForLb;
this.retryAllRetryableExceptions = retryAllRetryableExceptions;
}

@Override
public AutoRetryStrategy forLoadbalancer(LoadBalancer<?> loadBalancer) {
if (!waitForLb && !retryAllRetryableExceptions) {
return (count, cause) -> failed(cause);
}
return new DefaultAutoRetryStrategy(maxRetryCount, waitForLb, retryAllRetryableExceptions, loadBalancer);
}

/**
* A builder for {@link DefaultAutoRetryStrategyProvider}.
*/
public static final class Builder {
private boolean waitForLb = true;
private boolean retryAllRetryableExceptions = true;
private int maxRetries = 4;

/**
* By default, automatic retries waits for the associated {@link LoadBalancer} to be ready before triggering a
* retry for requests. This behavior may add latency to requests till the time the load balancer is ready
* instead of failing fast. This method disables the default behavior.
*
* @return {@code this}.
*/
public Builder disableWaitForLoadBalancer() {
waitForLb = false;
return this;
}

/**
* Connection closures (by the peer or locally) and new requests may happen concurrently. This means that it is
* possible for a {@link LoadBalancer} to select a connection which is already closed (concurrently) but the
* close signal has not yet been seen by the {@link LoadBalancer}. In such cases, requests fail with a
* {@link RetryableException}. By default, automatic retries always retries these {@link RetryableException}s.
* This method disables the default behavior.
*
* @return {@code this}.
*/
public Builder disableRetryAllRetryableExceptions() {
retryAllRetryableExceptions = false;
return this;
}

/**
* Updates maximum number of automatic retries done for any request.
*
* @param maxRetries Maximum number of automatic retries done for any request.
* @return {@code this}.
*/
public Builder maxRetries(int maxRetries) {
if (maxRetries <= 0) {
throw new IllegalArgumentException("maxRetries " + maxRetries + " (expected >0)");
}
this.maxRetries = maxRetries;
return this;
}

/**
* Builds a new {@link AutoRetryStrategyProvider}.
*
* @return A new {@link AutoRetryStrategyProvider}.
*/
public AutoRetryStrategyProvider build() {
return new DefaultAutoRetryStrategyProvider(maxRetries, waitForLb, retryAllRetryableExceptions);
}
}

private static final class DefaultAutoRetryStrategy implements AutoRetryStrategy {
@Nullable
private final LoadBalancerReadySubscriber loadBalancerReadySubscriber;
private final AsyncCloseable closeAsync;
private final int maxRetryCount;
private final boolean retryAllRetryableExceptions;

DefaultAutoRetryStrategy(final int maxRetryCount, final boolean waitForLb,
final boolean retryAllRetryableExceptions, final LoadBalancer<?> loadBalancer) {
this.maxRetryCount = maxRetryCount;
this.retryAllRetryableExceptions = retryAllRetryableExceptions;
if (waitForLb) {
loadBalancerReadySubscriber = new LoadBalancerReadySubscriber();
closeAsync = toAsyncCloseable(__ -> {
loadBalancerReadySubscriber.cancel();
return completed();
});
toSource(loadBalancer.eventStream()).subscribe(loadBalancerReadySubscriber);
} else {
loadBalancerReadySubscriber = null;
closeAsync = emptyAsyncCloseable();
}
}

@Override
public Completable apply(final int count, final Throwable cause) {
if (count > maxRetryCount) {
return failed(cause);
}
if (loadBalancerReadySubscriber != null && cause instanceof NoAvailableHostException) {
return loadBalancerReadySubscriber.onHostsAvailable();
}
if (retryAllRetryableExceptions && cause instanceof RetryableException) {
return completed();
}
return failed(cause);
}

@Override
public Completable closeAsync() {
return closeAsync.closeAsync();
}

@Override
public Completable closeAsyncGracefully() {
return closeAsync.closeAsyncGracefully();
}
}
}
@@ -13,9 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.servicetalk.client.api.internal;

import io.servicetalk.client.api.LoadBalancer;
package io.servicetalk.client.api;

import java.util.function.Predicate;

@@ -13,13 +13,13 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.servicetalk.http.netty;
package io.servicetalk.client.api;

import io.servicetalk.client.api.internal.LoadBalancerReadyEvent;
import io.servicetalk.concurrent.CompletableSource.Processor;
import io.servicetalk.concurrent.PublisherSource.Subscriber;
import io.servicetalk.concurrent.PublisherSource.Subscription;
import io.servicetalk.concurrent.api.Completable;
import io.servicetalk.concurrent.internal.DelayedCancellable;

import javax.annotation.Nullable;

@@ -31,7 +31,7 @@
* Designed to listen for {@link LoadBalancerReadyEvent}s and provide notification when a {@link LoadBalancerReadyEvent}
* returns {@code true} from {@link LoadBalancerReadyEvent#isReady()}.
*/
final class LoadBalancerReadySubscriber implements Subscriber<Object> {
final class LoadBalancerReadySubscriber extends DelayedCancellable implements Subscriber<Object> {
@Nullable
private volatile Processor onHostsAvailable = newCompletableProcessor();

0 comments on commit 3c27052

Please sign in to comment.
You can’t perform that action at this time.