Skip to content

Commit

Permalink
Replace primary-backup client/server interfaces with implementations.
Browse files Browse the repository at this point in the history
  • Loading branch information
kuujo committed Nov 24, 2017
1 parent 1311ce3 commit 1609451
Show file tree
Hide file tree
Showing 5 changed files with 323 additions and 394 deletions.
Expand Up @@ -16,43 +16,175 @@
package io.atomix.protocols.backup;

import io.atomix.cluster.ClusterService;
import io.atomix.cluster.NodeId;
import io.atomix.cluster.messaging.ClusterCommunicationService;
import io.atomix.cluster.messaging.MessageSubject;
import io.atomix.primitive.PrimitiveClient;
import io.atomix.primitive.PrimitiveException;
import io.atomix.primitive.PrimitiveException.Unavailable;
import io.atomix.primitive.PrimitiveType;
import io.atomix.primitive.partition.PrimaryElection;
import io.atomix.protocols.backup.impl.DefaultPrimaryBackupClient;
import io.atomix.primitive.proxy.PrimitiveProxy;
import io.atomix.primitive.proxy.impl.BlockingAwarePrimitiveProxy;
import io.atomix.primitive.proxy.impl.RecoveringPrimitiveProxy;
import io.atomix.primitive.proxy.impl.RetryingPrimitiveProxy;
import io.atomix.protocols.backup.protocol.MetadataRequest;
import io.atomix.protocols.backup.protocol.MetadataResponse;
import io.atomix.protocols.backup.protocol.PrimaryBackupResponse.Status;
import io.atomix.protocols.backup.proxy.PrimaryBackupProxy;
import io.atomix.protocols.backup.serializer.impl.PrimaryBackupSerializers;
import io.atomix.utils.concurrent.ThreadContext;
import io.atomix.utils.concurrent.ThreadContextFactory;
import io.atomix.utils.concurrent.ThreadModel;
import io.atomix.utils.logging.ContextualLoggerFactory;
import io.atomix.utils.logging.LoggerContext;
import io.atomix.utils.serializer.Serializer;
import org.slf4j.Logger;

import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;

/**
* Primary-backup client.
*/
public interface PrimaryBackupClient extends PrimitiveClient<MultiPrimaryProtocol> {
public class PrimaryBackupClient implements PrimitiveClient<MultiPrimaryProtocol> {

/**
* Returns a new primary-backup client builder.
*
* @return a new primary-backup client builder
*/
static Builder builder() {
return new DefaultPrimaryBackupClient.Builder();
public static Builder builder() {
return new Builder();
}

private static final Serializer SERIALIZER = PrimaryBackupSerializers.PROTOCOL;

private final String clientName;
private final ClusterService clusterService;
private final ClusterCommunicationService communicationService;
private final PrimaryElection primaryElection;
private final ThreadContextFactory threadContextFactory;
private final ThreadContext threadContext;
private final MessageSubject metadataSubject;

public PrimaryBackupClient(
String clientName,
ClusterService clusterService,
ClusterCommunicationService communicationService,
PrimaryElection primaryElection,
ThreadContextFactory threadContextFactory) {
this.clientName = clientName;
this.clusterService = clusterService;
this.communicationService = communicationService;
this.primaryElection = primaryElection;
this.threadContextFactory = threadContextFactory;
this.threadContext = threadContextFactory.createContext();
this.metadataSubject = new MessageSubject(String.format("%s-metadata", clientName));
}

@Override
@SuppressWarnings("unchecked")
public PrimitiveProxy.Builder<MultiPrimaryProtocol> proxyBuilder(String primitiveName, PrimitiveType primitiveType, MultiPrimaryProtocol primitiveProtocol) {
return new ProxyBuilder(primitiveName, primitiveType, primitiveProtocol);
}

@Override
public CompletableFuture<Set<String>> getPrimitives(PrimitiveType primitiveType) {
CompletableFuture<Set<String>> future = new CompletableFuture<>();
MetadataRequest request = new MetadataRequest(primitiveType.id());
threadContext.execute(() -> {
NodeId primary = primaryElection.getTerm().primary();
if (primary == null) {
future.completeExceptionally(new Unavailable());
return;
}

communicationService.<MetadataRequest, MetadataResponse>sendAndReceive(
metadataSubject,
request,
SERIALIZER::encode,
SERIALIZER::decode,
primary)
.whenCompleteAsync((response, error) -> {
if (error == null) {
if (response.status() == Status.OK) {
future.complete(response.primitiveNames());
} else {
future.completeExceptionally(new PrimitiveException.Unavailable());
}
} else {
future.completeExceptionally(new PrimitiveException.Unavailable());
}
}, threadContext);
});
return future;
}

/**
* Closes the client.
* Closes the primary-backup client.
*
* @return a future to be completed once the client is closed
* @return future to be completed once the client is closed
*/
public CompletableFuture<Void> close() {
// TODO: Close client proxies
threadContext.close();
threadContextFactory.close();
return CompletableFuture.completedFuture(null);
}

/**
* Primary-backup proxy builder.
*/
CompletableFuture<Void> close();
private class ProxyBuilder extends PrimaryBackupProxy.Builder {
ProxyBuilder(String name, PrimitiveType primitiveType, MultiPrimaryProtocol primitiveProtocol) {
super(name, primitiveType, primitiveProtocol);
}

@Override
@SuppressWarnings("unchecked")
public PrimitiveProxy build() {
PrimitiveProxy.Builder<MultiPrimaryProtocol> proxyBuilder = new PrimitiveProxy.Builder(name, primitiveType, protocol) {
@Override
public PrimitiveProxy build() {
return new PrimaryBackupProxy(
clientName,
name,
primitiveType,
clusterService,
communicationService,
primaryElection,
threadContextFactory.createContext());
}
}.withMaxRetries(maxRetries)
.withRetryDelay(retryDelay);

PrimitiveProxy proxy = new RecoveringPrimitiveProxy(
clientName,
name,
primitiveType,
proxyBuilder,
threadContextFactory.createContext());

// If max retries is set, wrap the client in a retrying proxy client.
if (maxRetries > 0) {
proxy = new RetryingPrimitiveProxy(proxy, threadContextFactory.createContext(), maxRetries, retryDelay);
}

// Default the executor to use the configured thread pool executor and create a blocking aware proxy client.
Executor executor = this.executor != null ? this.executor : threadContextFactory.createContext();
return new BlockingAwarePrimitiveProxy(proxy, executor);
}
}

/**
* Primary-backup client builder.
*/
abstract class Builder implements io.atomix.utils.Builder<PrimaryBackupClient> {
public static class Builder implements io.atomix.utils.Builder<PrimaryBackupClient> {
protected String clientName = "atomix";
protected ClusterService clusterService;
protected ClusterCommunicationService communicationService;
Expand Down Expand Up @@ -142,5 +274,21 @@ public Builder withThreadContextFactory(ThreadContextFactory threadContextFactor
this.threadContextFactory = checkNotNull(threadContextFactory, "threadContextFactory cannot be null");
return this;
}

@Override
public PrimaryBackupClient build() {
Logger log = ContextualLoggerFactory.getLogger(PrimaryBackupClient.class, LoggerContext.builder(PrimaryBackupClient.class)
.addValue(clientName)
.build());
ThreadContextFactory threadContextFactory = this.threadContextFactory != null
? this.threadContextFactory
: threadModel.factory("backup-client-" + clientName + "-%d", threadPoolSize, log);
return new PrimaryBackupClient(
clientName,
clusterService,
communicationService,
primaryElection,
threadContextFactory);
}
}
}

0 comments on commit 1609451

Please sign in to comment.