Skip to content

Commit

Permalink
Add recovering/retrying proxy clients.
Browse files Browse the repository at this point in the history
  • Loading branch information
kuujo committed Jun 30, 2017
1 parent 66aca1d commit ec312e2
Show file tree
Hide file tree
Showing 7 changed files with 502 additions and 77 deletions.
Expand Up @@ -21,14 +21,19 @@
import io.atomix.protocols.raft.protocol.RaftClientProtocol;
import io.atomix.protocols.raft.proxy.RaftProxy;
import io.atomix.protocols.raft.proxy.RaftProxyClient;
import io.atomix.protocols.raft.proxy.RecoveryStrategy;
import io.atomix.protocols.raft.proxy.impl.BlockingAwareRaftProxyClient;
import io.atomix.protocols.raft.proxy.impl.DefaultRaftProxy;
import io.atomix.protocols.raft.proxy.impl.NodeSelectorManager;
import io.atomix.protocols.raft.proxy.impl.RaftProxyManager;
import io.atomix.protocols.raft.proxy.impl.RecoveringRaftProxyClient;
import io.atomix.protocols.raft.proxy.impl.RetryingRaftProxyClient;
import io.atomix.utils.concurrent.ThreadPoolContext;
import org.slf4j.LoggerFactory;

import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
Expand Down Expand Up @@ -122,7 +127,32 @@ public String toString() {
private class SessionBuilder extends RaftProxy.Builder {
@Override
public RaftProxy build() {
RaftProxyClient client = sessionManager.openSession(name, serviceType, readConsistency, communicationStrategy, executor, timeout).join();
RaftProxyClient.Builder clientBuilder = new RaftProxyClient.Builder() {
@Override
public RaftProxyClient build() {
return sessionManager.openSession(name, serviceType, readConsistency, communicationStrategy, timeout).join();
}
};

RaftProxyClient client;

// If the recovery strategy is set to RECOVER, wrap the builder in a recovering proxy client.
if (recoveryStrategy == RecoveryStrategy.RECOVER) {
client = new RecoveringRaftProxyClient(clientBuilder);
} else {
client = clientBuilder.build();
}

// If max retries is set, wrap the client in a retrying proxy client.
if (maxRetries > 0) {
client = new RetryingRaftProxyClient(client, new ThreadPoolContext(threadPoolExecutor), maxRetries, retryDelay);
}

// Default the executor to use the configured thread pool executor and create a blocking aware proxy client.
Executor executor = this.executor != null ? this.executor : new ThreadPoolContext(threadPoolExecutor);
client = new BlockingAwareRaftProxyClient(client, executor);

// Create the proxy.
return new DefaultRaftProxy(client);
}
}
Expand Down
Expand Up @@ -15,7 +15,6 @@
*/
package io.atomix.protocols.raft.proxy;

import io.atomix.protocols.raft.CommunicationStrategies;
import io.atomix.protocols.raft.CommunicationStrategy;
import io.atomix.protocols.raft.EventType;
import io.atomix.protocols.raft.OperationId;
Expand All @@ -25,10 +24,10 @@
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;

/**
Expand Down Expand Up @@ -138,67 +137,57 @@ default <T, R> CompletableFuture<R> submit(OperationId operationId, Function<T,
/**
* Raft session builder.
*/
abstract class Builder implements io.atomix.utils.Builder<RaftProxy> {
protected String name;
protected ServiceType serviceType;
protected ReadConsistency readConsistency = ReadConsistency.LINEARIZABLE;
abstract class Builder extends RaftProxyClient.Builder {
protected Executor executor;
protected CommunicationStrategy communicationStrategy = CommunicationStrategies.LEADER;
protected Duration timeout = Duration.ofMillis(0);

/**
* Sets the session name.
*
* @param name The service name.
* @return The session builder.
*/
@Override
public Builder withName(String name) {
this.name = checkNotNull(name, "name cannot be null");
return this;
return (Builder) super.withName(name);
}

/**
* Sets the service type.
*
* @param serviceType The service type.
* @return The session builder.
*/
@Override
public Builder withServiceType(String serviceType) {
return withServiceType(ServiceType.from(serviceType));
}

/**
* Sets the service type.
*
* @param serviceType The service type.
* @return The session builder.
*/
@Override
public Builder withServiceType(ServiceType serviceType) {
this.serviceType = checkNotNull(serviceType, "serviceType cannot be null");
return this;
return (Builder) super.withServiceType(serviceType);
}

/**
* Sets the session's read consistency level.
*
* @param consistency the session's read consistency level
* @return the proxy builder
*/
@Override
public Builder withReadConsistency(ReadConsistency consistency) {
this.readConsistency = checkNotNull(consistency, "consistency cannot be null");
return this;
return (Builder) super.withReadConsistency(consistency);
}

/**
* Sets the session's communication strategy.
*
* @param communicationStrategy The session's communication strategy.
* @return The session builder.
* @throws NullPointerException if the communication strategy is null
*/
@Override
public Builder withMaxRetries(int maxRetries) {
return (Builder) super.withMaxRetries(maxRetries);
}

@Override
public Builder withRetryDelayMillis(long retryDelayMillis) {
return (Builder) super.withRetryDelayMillis(retryDelayMillis);
}

@Override
public Builder withRetryDelay(long retryDelay, TimeUnit timeUnit) {
return (Builder) super.withRetryDelay(retryDelay, timeUnit);
}

@Override
public Builder withRetryDelay(Duration retryDelay) {
return (Builder) super.withRetryDelay(retryDelay);
}

@Override
public Builder withCommunicationStrategy(CommunicationStrategy communicationStrategy) {
this.communicationStrategy = checkNotNull(communicationStrategy, "communicationStrategy");
return this;
return (Builder) super.withCommunicationStrategy(communicationStrategy);
}

@Override
public Builder withRecoveryStrategy(RecoveryStrategy recoveryStrategy) {
return (Builder) super.withRecoveryStrategy(recoveryStrategy);
}

/**
Expand All @@ -213,29 +202,17 @@ public Builder withExecutor(Executor executor) {
return this;
}

/**
* Sets the session timeout.
*
* @param timeout The session timeout.
* @return The session builder.
* @throws IllegalArgumentException if the session timeout is not positive
*/
public Builder withTimeout(long timeout) {
return withTimeout(Duration.ofMillis(timeout));
@Override
public Builder withTimeout(long timeoutMillis) {
return (Builder) super.withTimeout(timeoutMillis);
}

/**
* Sets the session timeout.
*
* @param timeout The session timeout.
* @return The session builder.
* @throws IllegalArgumentException if the session timeout is not positive
* @throws NullPointerException if the timeout is null
*/
@Override
public Builder withTimeout(Duration timeout) {
checkArgument(!checkNotNull(timeout).isNegative(), "timeout must be positive");
this.timeout = timeout;
return this;
return (Builder) super.withTimeout(timeout);
}

@Override
public abstract RaftProxy build();
}
}
Expand Up @@ -15,17 +15,25 @@
*/
package io.atomix.protocols.raft.proxy;

import io.atomix.protocols.raft.EventType;
import io.atomix.protocols.raft.CommunicationStrategies;
import io.atomix.protocols.raft.CommunicationStrategy;
import io.atomix.protocols.raft.OperationId;
import io.atomix.protocols.raft.RaftEvent;
import io.atomix.protocols.raft.RaftOperation;
import io.atomix.protocols.raft.ReadConsistency;
import io.atomix.protocols.raft.ServiceType;
import io.atomix.protocols.raft.session.SessionId;
import io.atomix.storage.buffer.HeapBytes;

import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;

/**
* Raft proxy client.
*/
Expand Down Expand Up @@ -181,4 +189,156 @@ default CompletableFuture<byte[]> execute(OperationId operationId, byte[] operat
*/
CompletableFuture<Void> close();

/**
* Raft session builder.
*/
abstract class Builder implements io.atomix.utils.Builder<RaftProxyClient> {
protected String name;
protected ServiceType serviceType;
protected ReadConsistency readConsistency = ReadConsistency.LINEARIZABLE;
protected int maxRetries = 0;
protected Duration retryDelay = Duration.ofMillis(100);
protected Executor executor;
protected CommunicationStrategy communicationStrategy = CommunicationStrategies.LEADER;
protected RecoveryStrategy recoveryStrategy = RecoveryStrategy.RECOVER;
protected Duration timeout = Duration.ofMillis(0);

/**
* Sets the session name.
*
* @param name The service name.
* @return The session builder.
*/
public Builder withName(String name) {
this.name = checkNotNull(name, "name cannot be null");
return this;
}

/**
* Sets the service type.
*
* @param serviceType The service type.
* @return The session builder.
*/
public Builder withServiceType(String serviceType) {
return withServiceType(ServiceType.from(serviceType));
}

/**
* Sets the service type.
*
* @param serviceType The service type.
* @return The session builder.
*/
public Builder withServiceType(ServiceType serviceType) {
this.serviceType = checkNotNull(serviceType, "serviceType cannot be null");
return this;
}

/**
* Sets the session's read consistency level.
*
* @param consistency the session's read consistency level
* @return the proxy builder
*/
public Builder withReadConsistency(ReadConsistency consistency) {
this.readConsistency = checkNotNull(consistency, "consistency cannot be null");
return this;
}

/**
* Sets the session's communication strategy.
*
* @param communicationStrategy The session's communication strategy.
* @return The session builder.
* @throws NullPointerException if the communication strategy is null
*/
public Builder withCommunicationStrategy(CommunicationStrategy communicationStrategy) {
this.communicationStrategy = checkNotNull(communicationStrategy, "communicationStrategy");
return this;
}

/**
* Sets the maximum number of retries before an operation can be failed.
*
* @param maxRetries the maximum number of retries before an operation can be failed
* @return the proxy builder
*/
public Builder withMaxRetries(int maxRetries) {
checkArgument(maxRetries >= 0, "maxRetries must be positive");
this.maxRetries = maxRetries;
return this;
}

/**
* Sets the operation retry delay.
*
* @param retryDelayMillis the delay between operation retries in milliseconds
* @return the proxy builder
*/
public Builder withRetryDelayMillis(long retryDelayMillis) {
return withRetryDelay(Duration.ofMillis(retryDelayMillis));
}

/**
* Sets the operation retry delay.
*
* @param retryDelay the delay between operation retries
* @param timeUnit the delay time unit
* @return the proxy builder
* @throws NullPointerException if the time unit is null
*/
public Builder withRetryDelay(long retryDelay, TimeUnit timeUnit) {
return withRetryDelay(Duration.ofMillis(timeUnit.toMillis(retryDelay)));
}

/**
* Sets the operation retry delay.
*
* @param retryDelay the delay between operation retries
* @return the proxy builder
* @throws NullPointerException if the delay is null
*/
public Builder withRetryDelay(Duration retryDelay) {
this.retryDelay = checkNotNull(retryDelay, "retryDelay cannot be null");
return this;
}

/**
* Sets the session recovery strategy.
*
* @param recoveryStrategy the session recovery strategy
* @return the proxy builder
* @throws NullPointerException if the strategy is null
*/
public Builder withRecoveryStrategy(RecoveryStrategy recoveryStrategy) {
this.recoveryStrategy = checkNotNull(recoveryStrategy, "recoveryStrategy cannot be null");
return this;
}

/**
* Sets the session timeout.
*
* @param timeoutMillis The session timeout.
* @return The session builder.
* @throws IllegalArgumentException if the session timeout is not positive
*/
public Builder withTimeout(long timeoutMillis) {
return withTimeout(Duration.ofMillis(timeoutMillis));
}

/**
* Sets the session timeout.
*
* @param timeout The session timeout.
* @return The session builder.
* @throws IllegalArgumentException if the session timeout is not positive
* @throws NullPointerException if the timeout is null
*/
public Builder withTimeout(Duration timeout) {
checkArgument(!checkNotNull(timeout).isNegative(), "timeout must be positive");
this.timeout = timeout;
return this;
}
}
}

0 comments on commit ec312e2

Please sign in to comment.