Skip to content

Commit

Permalink
Clean up managed local member with ComposableFuture.
Browse files Browse the repository at this point in the history
  • Loading branch information
kuujo committed Feb 15, 2015
1 parent 54ea5c6 commit 9304eca
Showing 1 changed file with 18 additions and 37 deletions.
Expand Up @@ -26,6 +26,7 @@
import net.kuujo.copycat.raft.RaftMember;
import net.kuujo.copycat.resource.ResourceContext;
import net.kuujo.copycat.util.ConfigurationException;
import net.kuujo.copycat.util.concurrent.ComposableFuture;
import net.kuujo.copycat.util.concurrent.Futures;
import net.kuujo.copycat.util.internal.Hash;

Expand Down Expand Up @@ -97,37 +98,33 @@ private CompletableFuture<ByteBuffer> handle(ByteBuffer request) {
* Handles an internal message.
*/
private CompletableFuture<ByteBuffer> handleInternalMessage(ByteBuffer request) {
int id = request.getInt();
MessageHandler<ByteBuffer, ByteBuffer> handler = internalHandlers.get(id);
if (handler != null) {
return handler.apply(request.slice());
} else {
return Futures.exceptionalFuture(new ClusterException("No handler registered"));
}
ComposableFuture<ByteBuffer> future = new ComposableFuture<>();
context.scheduler().execute(() -> {
int id = request.getInt();
MessageHandler<ByteBuffer, ByteBuffer> handler = internalHandlers.get(id);
if (handler != null) {
handler.apply(request.slice()).whenComplete(future);
}
});
return future;
}

/**
* Handles a message request.
*/
@SuppressWarnings({"unchecked", "rawtypes"})
private CompletableFuture<ByteBuffer> handleUserMessage(ByteBuffer request) {
CompletableFuture<ByteBuffer> future = new CompletableFuture<>();
ComposableFuture<Object> future = new ComposableFuture<>();
context.executor().execute(() -> {
int id = request.getInt();
MessageHandler<Object, Object> handler = handlers.get(id);
if (handler != null) {
handler.apply(context.serializer().readObject(request)).whenComplete((result, error) -> {
if (error == null) {
future.complete(context.serializer().writeObject(result));
} else {
future.completeExceptionally(error);
}
});
handler.apply(context.serializer().readObject(request)).whenComplete(future);
} else {
future.completeExceptionally(new ClusterException("No handler registered"));
}
});
return future;
return future.thenApply(r -> context.serializer().writeObject(r));
}

/**
Expand Down Expand Up @@ -177,41 +174,25 @@ ManagedLocalMember unregisterInternalHandler(String topic) {
@Override
@SuppressWarnings("unchecked")
public <T, U> CompletableFuture<U> send(String topic, T message) {
CompletableFuture<U> future = new CompletableFuture<>();
ComposableFuture<U> future = new ComposableFuture<>();
context.executor().execute(() -> {
MessageHandler<T, U> handler = handlers.get(hashMap.computeIfAbsent(topic, t -> Hash.hash32(t.getBytes())));
if (handler != null) {
handler.apply(context.serializer().readObject(context.serializer().writeObject(message))).whenComplete((result, error) -> {
context.executor().execute(() -> {
if (error == null) {
future.complete(context.serializer().readObject(context.serializer().writeObject(result)));
} else {
future.completeExceptionally(error);
}
});
});
handler.apply(context.serializer().readObject(context.serializer().writeObject(message))).whenComplete(future);
}
});
return future;
return future.thenApply(r -> context.serializer().readObject(context.serializer().writeObject(r)));
}

/**
* Sends an internal message.
*/
public CompletableFuture<ByteBuffer> sendInternal(String topic, ByteBuffer message) {
CompletableFuture<ByteBuffer> future = new CompletableFuture<>();
ComposableFuture<ByteBuffer> future = new ComposableFuture<>();
context.scheduler().execute(() -> {
MessageHandler<ByteBuffer, ByteBuffer> handler = internalHandlers.get(hashMap.computeIfAbsent(topic, t -> Hash.hash32(t.getBytes())));
if (handler != null) {
handler.apply(message).whenComplete((result, error) -> {
context.scheduler().execute(() -> {
if (error == null) {
future.complete(result);
} else {
future.completeExceptionally(error);
}
});
});
handler.apply(message).whenCompleteAsync(future, context.scheduler());
} else {
future.completeExceptionally(new ClusterException("No handler registered"));
}
Expand Down

0 comments on commit 9304eca

Please sign in to comment.