Skip to content

Commit

Permalink
Ensure NettyRemoteMember executes callbacks in the caller's context.
Browse files Browse the repository at this point in the history
  • Loading branch information
kuujo committed Apr 26, 2015
1 parent 8022eda commit e750185
Showing 1 changed file with 30 additions and 9 deletions.
Expand Up @@ -66,7 +66,7 @@ protected ByteBufBuffer initialValue() {
private Channel channel;
private ChannelHandlerContext context;
private final Map<String, Integer> hashMap = new HashMap<>();
private final Map<Object, CompletableFuture> responseFutures = new HashMap<>(1024);
private final Map<Object, ContextualFuture> responseFutures = new HashMap<>(1024);
private boolean connected;
private long requestId;
private CompletableFuture<RemoteMember> connectFuture;
Expand Down Expand Up @@ -94,7 +94,7 @@ public InetSocketAddress address() {

@Override
public <T, U> CompletableFuture<U> send(String topic, T message) {
final CompletableFuture<U> future = new CompletableFuture<>();
final ContextualFuture<U> future = new ContextualFuture<>(getContext());
if (channel != null) {
long requestId = ++this.requestId;
ByteBufBuffer buffer = BUFFER.get();
Expand All @@ -109,11 +109,15 @@ public <T, U> CompletableFuture<U> send(String topic, T message) {
if (channelFuture.isSuccess()) {
responseFutures.put(requestId, future);
} else {
future.completeExceptionally(new ClusterException(channelFuture.cause()));
future.context.execute(() -> {
future.completeExceptionally(new ClusterException(channelFuture.cause()));
});
}
});
} else {
future.completeExceptionally(new ClusterException("Client not connected"));
future.context.execute(() -> {
future.completeExceptionally(new ClusterException("Client not connected"));
});
}
return future;
}
Expand All @@ -125,7 +129,7 @@ public CompletableFuture<Void> execute(Task<Void> task) {

@Override
public <T> CompletableFuture<T> submit(Task<T> task) {
final CompletableFuture<T> future = new CompletableFuture<>();
final ContextualFuture<T> future = new ContextualFuture<>(getContext());
if (channel != null) {
long requestId = ++this.requestId;
ByteBufBuffer buffer = BUFFER.get();
Expand All @@ -139,11 +143,15 @@ public <T> CompletableFuture<T> submit(Task<T> task) {
if (channelFuture.isSuccess()) {
responseFutures.put(requestId, future);
} else {
future.completeExceptionally(new ClusterException(channelFuture.cause()));
future.context.execute(() -> {
future.completeExceptionally(new ClusterException(channelFuture.cause()));
});
}
});
} else {
future.completeExceptionally(new ClusterException("Client not connected"));
future.context.execute(() -> {
future.completeExceptionally(new ClusterException("Client not connected"));
});
}
return future;
}
Expand Down Expand Up @@ -241,6 +249,17 @@ public CompletableFuture<Void> close() {
return closeFuture;
}

/**
* Contextual future.
*/
private static class ContextualFuture<T> extends CompletableFuture<T> {
private final ExecutionContext context;

private ContextualFuture(ExecutionContext context) {
this.context = context;
}
}

/**
* Client channel handler.
*/
Expand All @@ -255,12 +274,14 @@ public void channelActive(ChannelHandlerContext context) {
public void channelRead(ChannelHandlerContext context, Object message) {
ByteBuf response = (ByteBuf) message;
long responseId = response.readLong();
CompletableFuture responseFuture = responseFutures.remove(responseId);
ContextualFuture responseFuture = responseFutures.remove(responseId);
if (responseFuture != null) {
ByteBufBuffer buffer = BUFFER.get();
buffer.setByteBuf(response.slice());
Object result = serializer.readObject(buffer);
responseFuture.complete(result);
responseFuture.context.execute(() -> {
responseFuture.complete(result);
});
}
response.release();
}
Expand Down

0 comments on commit e750185

Please sign in to comment.