Skip to content

Commit

Permalink
Support futures for task ack/fail.
Browse files Browse the repository at this point in the history
  • Loading branch information
kuujo committed Mar 31, 2016
1 parent 0967a17 commit b8e6285
Show file tree
Hide file tree
Showing 5 changed files with 25 additions and 24 deletions.
Expand Up @@ -349,7 +349,7 @@ public void readObject(BufferInput buffer, Serializer serializer) {
/**
* Ack command.
*/
public static class Ack extends MemberCommand<Object> {
public static class Ack extends MemberCommand<Void> {
private long id;
private boolean succeeded;

Expand Down
Expand Up @@ -291,13 +291,7 @@ private void onLeaveEvent(String memberId) {
private void onTaskEvent(GroupTask task) {
AbstractGroupMember localMember = members.get(task.member());
if (localMember != null && localMember instanceof LocalMember) {
((LocalGroupMember) localMember).tasks().consumer(task.type()).onTask(task).whenComplete((succeeded, error) -> {
if (error == null && (boolean) succeeded) {
submit(new GroupCommands.Ack(task.member(), task.id(), true));
} else {
submit(new GroupCommands.Ack(task.member(), task.id(), false));
}
});
((LocalGroupMember) localMember).tasks().consumer(task.type()).onTask(task);
}
}

Expand Down
12 changes: 9 additions & 3 deletions groups/src/main/java/io/atomix/group/task/Task.java
Expand Up @@ -17,10 +17,12 @@

import io.atomix.group.DistributedGroup;

import java.util.concurrent.CompletableFuture;

/**
* Represents a reliable task received by a member to be processed and acknowledged.
* <p>
* Tasks are {@link TaskService#submit(Object) submitted} by {@link DistributedGroup} users to any member of a group.
* Tasks are {@link TaskProducer#submit(Object) submitted} by {@link DistributedGroup} users to any member of a group.
* Tasks are replicated and persisted within the Atomix cluster before being pushed to clients on a queue. Once a task
* is received by a task listener, the task may be processed asynchronously and either {@link #ack() acknowledged} or
* {@link #fail() failed} once processing is complete.
Expand Down Expand Up @@ -67,16 +69,20 @@ public interface Task<T> {
* completion of a task does not guarantee that the sender will learn of the acknowledgement. The acknowledgement
* itself may fail to reach the cluster or the sender may crash before the acknowledgement can be received.
* Acks serve only as positive acknowledgement, but the lack of an ack does not indicate failure.
*
* @return A completable future to be completed once the task has been acknowledged.
*/
void ack();
CompletableFuture<Void> ack();

/**
* Fails processing of the task.
* <p>
* Once a task is failed, a failure message will be sent back to the process that submitted the task for processing.
* Failing a task does not guarantee that the sender will learn of the failure. The process that submitted the task
* may itself fail.
*
* @return A completable future to be completed once the task has been failed.
*/
void fail();
CompletableFuture<Void> fail();

}
Expand Up @@ -19,7 +19,6 @@
import io.atomix.group.task.Task;
import io.atomix.group.task.TaskConsumer;

import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

/**
Expand Down Expand Up @@ -65,12 +64,9 @@ public Listener<Task<T>> onTask(Consumer<Task<T>> callback) {
* Called when a task is received.
*
* @param task The received task.
* @return A completable future to be completed once the task has been processed.
*/
public CompletableFuture<Boolean> onTask(GroupTask<T> task) {
CompletableFuture<Boolean> future = new CompletableFuture<>();
listener.accept(task.setFuture(future));
return future;
public void onTask(GroupTask<T> task) {
listener.accept(task.setSubmitter(service.submitter()));
}

@Override
Expand Down
19 changes: 12 additions & 7 deletions groups/src/main/java/io/atomix/group/task/internal/GroupTask.java
Expand Up @@ -19,7 +19,9 @@
import io.atomix.catalyst.buffer.BufferOutput;
import io.atomix.catalyst.serializer.CatalystSerializable;
import io.atomix.catalyst.serializer.Serializer;
import io.atomix.group.internal.GroupCommands;
import io.atomix.group.task.Task;
import io.atomix.group.util.Submitter;

import java.util.concurrent.CompletableFuture;

Expand All @@ -33,7 +35,7 @@ public class GroupTask<T> implements Task<T>, CatalystSerializable {
private String member;
private String type;
private T value;
private transient CompletableFuture<Boolean> future;
private transient Submitter submitter;

public GroupTask() {
}
Expand All @@ -45,8 +47,11 @@ public GroupTask(long id, String member, String type, T value) {
this.value = value;
}

GroupTask<T> setFuture(CompletableFuture<Boolean> future) {
this.future = future;
/**
* Sets the task submitter.
*/
GroupTask<T> setSubmitter(Submitter submitter) {
this.submitter = submitter;
return this;
}

Expand Down Expand Up @@ -79,13 +84,13 @@ public T task() {
}

@Override
public void ack() {
future.complete(true);
public CompletableFuture<Void> ack() {
return submitter.submit(new GroupCommands.Ack(member, id, true));
}

@Override
public void fail() {
future.complete(false);
public CompletableFuture<Void> fail() {
return submitter.submit(new GroupCommands.Ack(member, id, false));
}

@Override
Expand Down

0 comments on commit b8e6285

Please sign in to comment.