Skip to content

Commit

Permalink
Use ordered futures in SessionClient wrapper connect()/close() method…
Browse files Browse the repository at this point in the history
…s to ensure ordering of fluent client method calls. (#604)
  • Loading branch information
kuujo committed Jun 9, 2018
1 parent 98bdc82 commit 1316f00
Show file tree
Hide file tree
Showing 11 changed files with 187 additions and 135 deletions.
Expand Up @@ -28,6 +28,7 @@
import java.util.function.Consumer;

import static io.atomix.utils.concurrent.Futures.asyncFuture;
import static io.atomix.utils.concurrent.Futures.orderedFuture;

/**
* Raft proxy delegate that completes futures on a thread pool.
Expand All @@ -36,6 +37,8 @@ public class BlockingAwareSessionClient extends DelegatingSessionClient {
private final ThreadContext context;
private final Map<Consumer<PrimitiveState>, Consumer<PrimitiveState>> stateChangeListeners = Maps.newConcurrentMap();
private final Map<Consumer<PrimitiveEvent>, Consumer<PrimitiveEvent>> eventListeners = Maps.newConcurrentMap();
private volatile CompletableFuture<SessionClient> connectFuture;
private volatile CompletableFuture<Void> closeFuture;

public BlockingAwareSessionClient(SessionClient delegate, ThreadContext context) {
super(delegate);
Expand Down Expand Up @@ -79,11 +82,25 @@ public void removeEventListener(EventType eventType, Consumer<PrimitiveEvent> li

@Override
public CompletableFuture<SessionClient> connect() {
return asyncFuture(super.connect(), context);
if (connectFuture == null) {
synchronized (this) {
if (connectFuture == null) {
connectFuture = orderedFuture(asyncFuture(super.connect(), context));
}
}
}
return connectFuture;
}

@Override
public CompletableFuture<Void> close() {
return asyncFuture(super.close(), context);
if (closeFuture == null) {
synchronized (this) {
if (closeFuture == null) {
closeFuture = orderedFuture(asyncFuture(super.close(), context));
}
}
}
return closeFuture;
}
}
Expand Up @@ -26,7 +26,6 @@
import io.atomix.primitive.operation.PrimitiveOperation;
import io.atomix.primitive.partition.PartitionId;
import io.atomix.primitive.session.SessionId;
import io.atomix.utils.concurrent.Futures;
import io.atomix.utils.concurrent.OrderedFuture;
import io.atomix.utils.concurrent.Scheduled;
import io.atomix.utils.concurrent.ThreadContext;
Expand All @@ -37,6 +36,7 @@
import java.time.Duration;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Supplier;

Expand All @@ -54,13 +54,13 @@ public class RecoveringSessionClient implements SessionClient {
private final Supplier<SessionClient> proxyFactory;
private final ThreadContext context;
private Logger log;
private volatile OrderedFuture<SessionClient> clientFuture;
private volatile CompletableFuture<SessionClient> clientFuture;
private volatile SessionClient proxy;
private volatile PrimitiveState state = PrimitiveState.CLOSED;
private final Set<Consumer<PrimitiveState>> stateChangeListeners = Sets.newCopyOnWriteArraySet();
private final Multimap<EventType, Consumer<PrimitiveEvent>> eventListeners = HashMultimap.create();
private Scheduled recoverTask;
private volatile boolean connected = false;
private final AtomicBoolean connected = new AtomicBoolean();

public RecoveringSessionClient(
String clientId,
Expand Down Expand Up @@ -118,7 +118,7 @@ public PrimitiveState getState() {
private synchronized void onStateChange(PrimitiveState state) {
if (this.state != state) {
if (state == PrimitiveState.CLOSED) {
if (connected) {
if (connected.get()) {
onStateChange(PrimitiveState.SUSPENDED);
recover();
} else {
Expand Down Expand Up @@ -158,28 +158,25 @@ private void recover() {
* @return a future to be completed once the client has been opened
*/
private CompletableFuture<SessionClient> openProxy() {
if (connected) {
log.debug("Opening proxy session");
log.debug("Opening proxy session");

clientFuture = new OrderedFuture<>();
openProxy(clientFuture);
clientFuture = new OrderedFuture<>();
openProxy(clientFuture);

return clientFuture.thenApply(proxy -> {
synchronized (this) {
this.log = ContextualLoggerFactory.getLogger(getClass(), LoggerContext.builder(SessionClient.class)
.addValue(proxy.sessionId())
.add("type", proxy.type())
.add("name", proxy.name())
.build());
this.proxy = proxy;
proxy.addStateChangeListener(this::onStateChange);
eventListeners.forEach(proxy::addEventListener);
onStateChange(PrimitiveState.CONNECTED);
}
return proxy;
});
}
return Futures.exceptionalFuture(new IllegalStateException("Client not open"));
return clientFuture.thenApply(proxy -> {
synchronized (this) {
this.log = ContextualLoggerFactory.getLogger(getClass(), LoggerContext.builder(SessionClient.class)
.addValue(proxy.sessionId())
.add("type", proxy.type())
.add("name", proxy.name())
.build());
this.proxy = proxy;
proxy.addStateChangeListener(this::onStateChange);
eventListeners.forEach(proxy::addEventListener);
onStateChange(PrimitiveState.CONNECTED);
}
return this;
});
}

/**
Expand Down Expand Up @@ -226,18 +223,16 @@ public synchronized void removeEventListener(EventType eventType, Consumer<Primi
}

@Override
public synchronized CompletableFuture<SessionClient> connect() {
if (!connected) {
connected = true;
return openProxy().thenApply(c -> this);
public CompletableFuture<SessionClient> connect() {
if (connected.compareAndSet(false, true)) {
return openProxy();
}
return CompletableFuture.completedFuture(this);
return clientFuture;
}

@Override
public synchronized CompletableFuture<Void> close() {
if (connected) {
connected = false;
public CompletableFuture<Void> close() {
if (connected.compareAndSet(true, false)) {
if (recoverTask != null) {
recoverTask.cancel();
}
Expand Down
Expand Up @@ -27,7 +27,6 @@
import io.atomix.protocols.backup.partition.impl.PrimaryBackupPartitionClient;
import io.atomix.protocols.backup.partition.impl.PrimaryBackupPartitionServer;
import io.atomix.protocols.backup.proxy.PrimaryBackupSessionClient;
import io.atomix.utils.concurrent.AtomixFuture;
import io.atomix.utils.concurrent.ThreadContextFactory;

import java.util.Collection;
Expand Down Expand Up @@ -140,7 +139,7 @@ public CompletableFuture<Void> close() {
return CompletableFuture.completedFuture(null);
}

CompletableFuture<Void> future = new AtomixFuture<>();
CompletableFuture<Void> future = new CompletableFuture<>();
client.stop().whenComplete((clientResult, clientError) -> {
if (server != null) {
server.stop().whenComplete((serverResult, serverError) -> {
Expand Down
Expand Up @@ -41,7 +41,6 @@
import io.atomix.protocols.backup.protocol.PrimaryBackupClientProtocol;
import io.atomix.protocols.backup.protocol.PrimaryBackupResponse.Status;
import io.atomix.protocols.backup.protocol.PrimitiveDescriptor;
import io.atomix.utils.concurrent.AtomixFuture;
import io.atomix.utils.concurrent.ComposableFuture;
import io.atomix.utils.concurrent.ThreadContext;
import io.atomix.utils.logging.ContextualLoggerFactory;
Expand Down Expand Up @@ -270,7 +269,7 @@ private void handleEvent(PrimitiveEvent event) {

@Override
public CompletableFuture<SessionClient> connect() {
CompletableFuture<SessionClient> future = new AtomixFuture<>();
CompletableFuture<SessionClient> future = new CompletableFuture<>();
threadContext.execute(() -> {
connect(1, future);
});
Expand Down Expand Up @@ -308,7 +307,7 @@ private void connect(int attempt, CompletableFuture<SessionClient> future) {

@Override
public CompletableFuture<Void> close() {
CompletableFuture<Void> future = new AtomixFuture<>();
CompletableFuture<Void> future = new CompletableFuture<>();
PrimaryTerm term = this.term;
if (term.primary() != null) {
protocol.close(term.primary().memberId(), new CloseRequest(descriptor, sessionId.id()))
Expand Down
Expand Up @@ -31,7 +31,6 @@
import io.atomix.protocols.raft.proxy.impl.DefaultRaftSessionClient;
import io.atomix.protocols.raft.proxy.impl.MemberSelectorManager;
import io.atomix.protocols.raft.proxy.impl.RaftProxyManager;
import io.atomix.utils.concurrent.AtomixFuture;
import io.atomix.utils.concurrent.ThreadContext;
import io.atomix.utils.concurrent.ThreadContextFactory;
import io.atomix.utils.logging.ContextualLoggerFactory;
Expand Down Expand Up @@ -98,7 +97,7 @@ public RaftMetadataClient metadata() {

@Override
public synchronized CompletableFuture<RaftClient> connect(Collection<MemberId> cluster) {
CompletableFuture<RaftClient> future = new AtomixFuture<>();
CompletableFuture<RaftClient> future = new CompletableFuture<>();

// If the provided cluster list is null or empty, use the default list.
if (cluster == null || cluster.isEmpty()) {
Expand Down
Expand Up @@ -27,7 +27,6 @@
import io.atomix.protocols.raft.proxy.CommunicationStrategy;
import io.atomix.protocols.raft.proxy.impl.MemberSelectorManager;
import io.atomix.protocols.raft.proxy.impl.RaftProxyConnection;
import io.atomix.utils.concurrent.AtomixFuture;
import io.atomix.utils.concurrent.ThreadContext;
import io.atomix.utils.logging.LoggerContext;

Expand Down Expand Up @@ -72,7 +71,7 @@ public Collection<MemberId> getMembers() {
* @return A completable future to be completed with cluster metadata.
*/
private CompletableFuture<MetadataResponse> getMetadata() {
CompletableFuture<MetadataResponse> future = new AtomixFuture<>();
CompletableFuture<MetadataResponse> future = new CompletableFuture<>();
connection.metadata(MetadataRequest.builder().build()).whenComplete((response, error) -> {
if (error == null) {
if (response.status() == RaftResponse.Status.OK) {
Expand Down
Expand Up @@ -32,7 +32,6 @@
import io.atomix.protocols.raft.protocol.RaftClientProtocol;
import io.atomix.protocols.raft.protocol.RaftRequest;
import io.atomix.protocols.raft.protocol.RaftResponse;
import io.atomix.utils.concurrent.AtomixFuture;
import io.atomix.utils.concurrent.ThreadContext;
import io.atomix.utils.logging.ContextualLoggerFactory;
import io.atomix.utils.logging.LoggerContext;
Expand Down Expand Up @@ -119,7 +118,7 @@ public Collection<MemberId> members() {
* @return a future to be completed with the response
*/
public CompletableFuture<OpenSessionResponse> openSession(OpenSessionRequest request) {
CompletableFuture<OpenSessionResponse> future = new AtomixFuture<>();
CompletableFuture<OpenSessionResponse> future = new CompletableFuture<>();
if (context.isCurrentContext()) {
sendRequest(request, protocol::openSession, future);
} else {
Expand All @@ -135,7 +134,7 @@ public CompletableFuture<OpenSessionResponse> openSession(OpenSessionRequest req
* @return a future to be completed with the response
*/
public CompletableFuture<CloseSessionResponse> closeSession(CloseSessionRequest request) {
CompletableFuture<CloseSessionResponse> future = new AtomixFuture<>();
CompletableFuture<CloseSessionResponse> future = new CompletableFuture<>();
if (context.isCurrentContext()) {
sendRequest(request, protocol::closeSession, future);
} else {
Expand All @@ -151,7 +150,7 @@ public CompletableFuture<CloseSessionResponse> closeSession(CloseSessionRequest
* @return a future to be completed with the response
*/
public CompletableFuture<KeepAliveResponse> keepAlive(KeepAliveRequest request) {
CompletableFuture<KeepAliveResponse> future = new AtomixFuture<>();
CompletableFuture<KeepAliveResponse> future = new CompletableFuture<>();
if (context.isCurrentContext()) {
sendRequest(request, protocol::keepAlive, future);
} else {
Expand All @@ -167,7 +166,7 @@ public CompletableFuture<KeepAliveResponse> keepAlive(KeepAliveRequest request)
* @return a future to be completed with the response
*/
public CompletableFuture<QueryResponse> query(QueryRequest request) {
CompletableFuture<QueryResponse> future = new AtomixFuture<>();
CompletableFuture<QueryResponse> future = new CompletableFuture<>();
if (context.isCurrentContext()) {
sendRequest(request, protocol::query, future);
} else {
Expand All @@ -183,7 +182,7 @@ public CompletableFuture<QueryResponse> query(QueryRequest request) {
* @return a future to be completed with the response
*/
public CompletableFuture<CommandResponse> command(CommandRequest request) {
CompletableFuture<CommandResponse> future = new AtomixFuture<>();
CompletableFuture<CommandResponse> future = new CompletableFuture<>();
if (context.isCurrentContext()) {
sendRequest(request, protocol::command, future);
} else {
Expand All @@ -199,7 +198,7 @@ public CompletableFuture<CommandResponse> command(CommandRequest request) {
* @return a future to be completed with the response
*/
public CompletableFuture<MetadataResponse> metadata(MetadataRequest request) {
CompletableFuture<MetadataResponse> future = new AtomixFuture<>();
CompletableFuture<MetadataResponse> future = new CompletableFuture<>();
if (context.isCurrentContext()) {
sendRequest(request, protocol::metadata, future);
} else {
Expand Down
Expand Up @@ -27,7 +27,6 @@
import io.atomix.protocols.raft.protocol.QueryRequest;
import io.atomix.protocols.raft.protocol.QueryResponse;
import io.atomix.protocols.raft.protocol.RaftResponse;
import io.atomix.utils.concurrent.AtomixFuture;
import io.atomix.utils.concurrent.ThreadContext;

import java.net.ConnectException;
Expand Down Expand Up @@ -89,7 +88,7 @@ public RaftProxyInvoker(
* @return A completable future to be completed once the command has been submitted.
*/
public CompletableFuture<byte[]> invoke(PrimitiveOperation operation) {
CompletableFuture<byte[]> future = new AtomixFuture<>();
CompletableFuture<byte[]> future = new CompletableFuture<>();
switch (operation.id().type()) {
case COMMAND:
context.execute(() -> invokeCommand(operation, future));
Expand Down
Expand Up @@ -34,7 +34,6 @@
import io.atomix.protocols.raft.protocol.RaftClientProtocol;
import io.atomix.protocols.raft.protocol.RaftResponse;
import io.atomix.protocols.raft.proxy.CommunicationStrategy;
import io.atomix.utils.concurrent.AtomixFuture;
import io.atomix.utils.concurrent.Futures;
import io.atomix.utils.concurrent.Scheduled;
import io.atomix.utils.concurrent.ThreadContext;
Expand Down Expand Up @@ -147,7 +146,7 @@ public CompletableFuture<Void> open() {
* Opens a new session.
*
* @param serviceName The session name.
* @param primitiveType The session type.
* @param primitiveType The session type.
* @param communicationStrategy The strategy with which to communicate with servers.
* @param minTimeout The minimum session timeout.
* @param maxTimeout The maximum session timeout.
Expand Down Expand Up @@ -177,7 +176,7 @@ public CompletableFuture<RaftProxyState> openSession(
.withMaxTimeout(maxTimeout.toMillis())
.build();

CompletableFuture<RaftProxyState> future = new AtomixFuture<>();
CompletableFuture<RaftProxyState> future = new CompletableFuture<>();
ThreadContext proxyContext = threadContextFactory.createContext();
connection.openSession(request).whenCompleteAsync((response, error) -> {
if (error == null) {
Expand Down Expand Up @@ -228,7 +227,7 @@ public CompletableFuture<Void> closeSession(SessionId sessionId) {
.withSession(sessionId.id())
.build();

CompletableFuture<Void> future = new AtomixFuture<>();
CompletableFuture<Void> future = new CompletableFuture<>();
connection.closeSession(request).whenComplete((response, error) -> {
if (error == null) {
if (response.status() == RaftResponse.Status.OK) {
Expand Down Expand Up @@ -286,7 +285,7 @@ CompletableFuture<Void> resetIndexes(SessionId sessionId) {
return Futures.exceptionalFuture(new IllegalArgumentException("Unknown session: " + sessionId));
}

CompletableFuture<Void> future = new AtomixFuture<>();
CompletableFuture<Void> future = new CompletableFuture<>();

KeepAliveRequest request = KeepAliveRequest.builder()
.withSessionIds(new long[]{sessionId.id()})
Expand Down Expand Up @@ -404,7 +403,7 @@ private synchronized void scheduleKeepAlive(long lastKeepAliveTime, long timeout
}

// Schedule the keep alive for 3/4 the timeout minus the delta from the last keep-alive request.
keepAliveTimers.put(timeout, threadContext.schedule(Duration.ofMillis(Math.max(Math.max((long)(timeout * TIMEOUT_FACTOR) - delta, timeout - MIN_TIMEOUT_DELTA - delta), 0)), () -> {
keepAliveTimers.put(timeout, threadContext.schedule(Duration.ofMillis(Math.max(Math.max((long) (timeout * TIMEOUT_FACTOR) - delta, timeout - MIN_TIMEOUT_DELTA - delta), 0)), () -> {
if (open.get()) {
keepAliveSessions(lastKeepAliveTime, timeout);
}
Expand Down Expand Up @@ -435,7 +434,7 @@ private CompletableFuture<HeartbeatResponse> handleHeartbeat(HeartbeatRequest re
*/
public CompletableFuture<Void> close() {
if (open.compareAndSet(true, false)) {
CompletableFuture<Void> future = new AtomixFuture<>();
CompletableFuture<Void> future = new CompletableFuture<>();
threadContext.execute(() -> {
synchronized (this) {
for (Scheduled keepAliveFuture : keepAliveTimers.values()) {
Expand Down

0 comments on commit 1316f00

Please sign in to comment.