Skip to content

Commit

Permalink
Refactor group task queue to push tasks through individual GroupMembers.
Browse files Browse the repository at this point in the history
  • Loading branch information
kuujo committed Mar 9, 2016
1 parent 474a909 commit ff8c787
Show file tree
Hide file tree
Showing 10 changed files with 77 additions and 98 deletions.
8 changes: 4 additions & 4 deletions group/src/main/java/io/atomix/group/DistributedGroup.java
Expand Up @@ -207,7 +207,7 @@
* increasing term for coordination and managing optimistic access to external resources.
* <h2>Direct messaging</h2>
* Members of a group and group instances can communicate with one another through the direct messaging API,
* {@link GroupConnection}. Direct messaging between group members is considered <em>unreliable</em> and is
* {@link MemberConnection}. Direct messaging between group members is considered <em>unreliable</em> and is
* done over the local node's configured {@link io.atomix.catalyst.transport.Transport}. Messages between members
* of a group are ordered according only to the transport and are not guaranteed to be delivered. While request-reply
* can be used to achieve some level of assurance that messages are delivered to specific members of the group,
Expand All @@ -223,7 +223,7 @@
* }
* </pre>
* Once a group instance has been configured with an address for direct messaging, messages can be sent between
* group members using the {@link GroupConnection} for any member of the group. Messages sent between members must
* group members using the {@link MemberConnection} for any member of the group. Messages sent between members must
* be associated with a {@link String} topic, and messages can be any value that is serializable by the group instance's
* {@link io.atomix.catalyst.serializer.Serializer}.
* <pre>
Expand All @@ -235,7 +235,7 @@
* </pre>
* Direct messages can only be <em>received</em> by a {@link LocalGroupMember} which must be created by
* {@link #join() joining} the group. Local members register a listener for a link topic on the joined member's
* {@link LocalGroupConnection}. Message listeners are asynchronous. When a {@link GroupMessage} is received
* {@link LocalMemberConnection}. Message listeners are asynchronous. When a {@link GroupMessage} is received
* by a local member, the member can perform any processing it wishes and {@link GroupMessage#reply(Object) reply}
* to the message or {@link GroupMessage#ack() acknowledge} completion of handling the message to send a response
* back to the sender.
Expand All @@ -255,7 +255,7 @@
* }
* </pre>
* It's critical that message listeners reply to messages, otherwise futures will be held in memory on the
* sending side of the {@link GroupConnection connection} until the sender or receiver is removed from the
* sending side of the {@link MemberConnection connection} until the sender or receiver is removed from the
* group.
* <h2>Task queues</h2>
* In addition to supporting direct messaging between members of the group, {@code DistributedGroup} provides
Expand Down
10 changes: 5 additions & 5 deletions group/src/main/java/io/atomix/group/GroupMember.java
Expand Up @@ -30,8 +30,8 @@ public class GroupMember {
protected final Address address;
protected final MembershipGroup group;
private final GroupProperties properties;
private final GroupTaskQueue tasks;
private final GroupConnection connection;
private final MemberTaskQueue tasks;
private final MemberConnection connection;

GroupMember(GroupMemberInfo info, MembershipGroup group) {
this.index = info.index();
Expand All @@ -40,7 +40,7 @@ public class GroupMember {
this.group = Assert.notNull(group, "group");
this.properties = new GroupProperties(memberId, group);
this.tasks = new MemberTaskQueue(memberId, group);
this.connection = new GroupConnection(memberId, address, group.connections);
this.connection = new MemberConnection(memberId, address, group.connections);
}

/**
Expand Down Expand Up @@ -98,7 +98,7 @@ public GroupProperties properties() {
*
* @return A direct connection to the member.
*/
public GroupConnection connection() {
public MemberConnection connection() {
return connection;
}

Expand All @@ -107,7 +107,7 @@ public GroupConnection connection() {
*
* @return The member's task queue.
*/
public GroupTaskQueue tasks() {
public MemberTaskQueue tasks() {
return tasks;
}

Expand Down
54 changes: 9 additions & 45 deletions group/src/main/java/io/atomix/group/GroupTaskQueue.java
Expand Up @@ -16,12 +16,9 @@
package io.atomix.group;

import io.atomix.catalyst.util.Assert;
import io.atomix.group.state.GroupCommands;

import java.util.Collection;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

/**
* Group task queue.
Expand All @@ -30,8 +27,6 @@
*/
public class GroupTaskQueue {
protected final MembershipGroup group;
protected long taskId;
protected final Map<Long, CompletableFuture<Void>> taskFutures = new ConcurrentHashMap<>();

protected GroupTaskQueue(MembershipGroup group) {
this.group = Assert.notNull(group, "group");
Expand All @@ -44,51 +39,20 @@ protected GroupTaskQueue(MembershipGroup group) {
* @return A completable future to be completed once the task has been acknowledged.
*/
public CompletableFuture<Void> submit(Object task) {
CompletableFuture<Void> future = new CompletableFuture<>();
final long taskId = ++this.taskId;
taskFutures.put(taskId, future);
submit(taskId, task).whenComplete((result, error) -> {
if (error != null) {
taskFutures.remove(taskId);
future.completeExceptionally(error);
}
});
return future;
}

/**
* Submits the task to the cluster.
*/
protected CompletableFuture<Void> submit(long taskId, Object task) {
synchronized (group) {
Collection<GroupMember> members = group.members();
CompletableFuture[] futures = new CompletableFuture[members.size()];
int i = 0;
for (GroupMember member : members) {
futures[i++] = group.submit(new GroupCommands.Submit(member.id(), taskId, task));
}
return CompletableFuture.allOf(futures);
}
}

/**
* Handles a task acknowledgement.
*/
void onAck(long taskId) {
CompletableFuture<Void> future = taskFutures.remove(taskId);
if (future != null) {
future.complete(null);
Collection<GroupMember> members = members();
CompletableFuture[] futures = new CompletableFuture[members.size()];
int i = 0;
for (GroupMember member : members) {
futures[i++] = member.tasks().submit(task);
}
return CompletableFuture.allOf(futures);
}

/**
* Handles a task failure.
* Returns the collection of group members to which to send tasks.
*/
void onFail(long taskId) {
CompletableFuture<Void> future = taskFutures.remove(taskId);
if (future != null) {
future.completeExceptionally(new TaskFailedException());
}
protected Collection<GroupMember> members() {
return group.members();
}

@Override
Expand Down
6 changes: 3 additions & 3 deletions group/src/main/java/io/atomix/group/LocalGroupMember.java
Expand Up @@ -41,12 +41,12 @@
*/
public class LocalGroupMember extends GroupMember {
private final LocalMemberTaskQueue tasks;
private final LocalGroupConnection connection;
private final LocalMemberConnection connection;

LocalGroupMember(GroupMemberInfo info, MembershipGroup group) {
super(info, group);
this.tasks = new LocalMemberTaskQueue(info.memberId(), group);
this.connection = new LocalGroupConnection(info.memberId(), info.address(), group.connections);
this.connection = new LocalMemberConnection(info.memberId(), info.address(), group.connections);
}

@Override
Expand All @@ -55,7 +55,7 @@ public LocalMemberTaskQueue tasks() {
}

@Override
public LocalGroupConnection connection() {
public LocalMemberConnection connection() {
return connection;
}

Expand Down
Expand Up @@ -27,10 +27,10 @@
*
* @author <a href="http://github.com/kuujo>Jordan Halterman</a>
*/
public class LocalGroupConnection extends GroupConnection {
public class LocalMemberConnection extends MemberConnection {
private final Map<String, MessageListenerHolder> messageListeners = new ConcurrentHashMap<>();

public LocalGroupConnection(String memberId, Address address, GroupConnectionManager connections) {
public LocalMemberConnection(String memberId, Address address, GroupConnectionManager connections) {
super(memberId, address, connections);
}

Expand Down
Expand Up @@ -28,7 +28,7 @@
public class LocalMemberTaskQueue extends MemberTaskQueue {
private final Listeners<GroupTask<Object>> taskListeners = new Listeners<>();

public LocalMemberTaskQueue(String memberId, MembershipGroup group) {
LocalMemberTaskQueue(String memberId, MembershipGroup group) {
super(memberId, group);
}

Expand Down
Expand Up @@ -26,7 +26,7 @@
* Facilitates direct communication between group members.
* <p>
* Members of a group and group instances can communicate with one another through the direct messaging API,
* {@link GroupConnection}. Direct messaging between group members is considered <em>unreliable</em> and is
* {@link MemberConnection}. Direct messaging between group members is considered <em>unreliable</em> and is
* done over the local node's configured {@link io.atomix.catalyst.transport.Transport}. Messages between members
* of a group are ordered according only to the transport and are not guaranteed to be delivered. While request-reply
* can be used to achieve some level of assurance that messages are delivered to specific members of the group,
Expand All @@ -42,7 +42,7 @@
* }
* </pre>
* Once a group instance has been configured with an address for direct messaging, messages can be sent between
* group members using the {@link GroupConnection} for any member of the group. Messages sent between members must
* group members using the {@link MemberConnection} for any member of the group. Messages sent between members must
* be associated with a {@link String} topic, and messages can be any value that is serializable by the group instance's
* {@link io.atomix.catalyst.serializer.Serializer}.
* <pre>
Expand All @@ -54,7 +54,7 @@
* </pre>
* Direct messages can only be <em>received</em> by a {@link LocalGroupMember} which must be created by
* joining the group. Local members register a listener for a link topic on the joined member's
* {@link LocalGroupConnection}. Message listeners are asynchronous. When a {@link GroupMessage} is received
* {@link LocalMemberConnection}. Message listeners are asynchronous. When a {@link GroupMessage} is received
* by a local member, the member can perform any processing it wishes and {@link GroupMessage#reply(Object) reply}
* to the message or {@link GroupMessage#ack() acknowledge} completion of handling the message to send a response
* back to the sender.
Expand All @@ -74,17 +74,17 @@
* }
* </pre>
* It's critical that message listeners reply to messages, otherwise futures will be held in memory on the
* sending side of the {@link GroupConnection connection} until the sender or receiver is removed from the
* sending side of the {@link MemberConnection connection} until the sender or receiver is removed from the
* group.
*
* @author <a href="http://github.com/kuujo>Jordan Halterman</a>
*/
public class GroupConnection {
public class MemberConnection {
private final String memberId;
private final Address address;
private final GroupConnectionManager connections;

GroupConnection(String memberId, Address address, GroupConnectionManager connections) {
MemberConnection(String memberId, Address address, GroupConnectionManager connections) {
this.memberId = Assert.notNull(memberId, "memberId");
this.address = address;
this.connections = Assert.notNull(connections, "connections");
Expand Down
41 changes: 37 additions & 4 deletions group/src/main/java/io/atomix/group/MemberTaskQueue.java
Expand Up @@ -3,24 +3,57 @@
import io.atomix.catalyst.util.Assert;
import io.atomix.group.state.GroupCommands;

import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

/**
* Member task queue.
*
* @author <a href="http://github.com/kuujo>Jordan Halterman</a>
*/
class MemberTaskQueue extends GroupTaskQueue {
public class MemberTaskQueue extends GroupTaskQueue {
private final String memberId;
protected long taskId;
protected final Map<Long, CompletableFuture<Void>> taskFutures = new ConcurrentHashMap<>();

public MemberTaskQueue(String memberId, MembershipGroup group) {
MemberTaskQueue(String memberId, MembershipGroup group) {
super(group);
this.memberId = Assert.notNull(memberId, "memberId");
}

@Override
protected CompletableFuture<Void> submit(long taskId, Object task) {
return group.submit(new GroupCommands.Submit(memberId, taskId, task));
public CompletableFuture<Void> submit(Object task) {
CompletableFuture<Void> future = new CompletableFuture<>();
final long taskId = ++this.taskId;
taskFutures.put(taskId, future);
group.submit(new GroupCommands.Submit(memberId, taskId, task)).whenComplete((result, error) -> {
if (error != null) {
taskFutures.remove(taskId);
future.completeExceptionally(error);
}
});
return future;
}

/**
* Handles a task acknowledgement.
*/
void onAck(long taskId) {
CompletableFuture<Void> future = taskFutures.remove(taskId);
if (future != null) {
future.complete(null);
}
}

/**
* Handles a task failure.
*/
void onFail(long taskId) {
CompletableFuture<Void> future = taskFutures.remove(taskId);
if (future != null) {
future.completeExceptionally(new TaskFailedException());
}
}

@Override
Expand Down
20 changes: 6 additions & 14 deletions group/src/main/java/io/atomix/group/MembershipGroup.java
Expand Up @@ -334,27 +334,19 @@ private void onTaskEvent(GroupTask task) {
* Handles an ack event received from the cluster.
*/
private void onAckEvent(GroupCommands.Submit submit) {
if (submit.member() != null) {
GroupMember member = members.get(submit.member());
if (member != null) {
member.tasks().onAck(submit.id());
}
} else {
tasks.onAck(submit.id());
GroupMember member = members.get(submit.member());
if (member != null) {
member.tasks().onAck(submit.id());
}
}

/**
* Handles a fail event received from the cluster.
*/
private void onFailEvent(GroupCommands.Submit submit) {
if (submit.member() != null) {
GroupMember member = members.get(submit.member());
if (member != null) {
member.tasks().onFail(submit.id());
}
} else {
tasks.onFail(submit.id());
GroupMember member = members.get(submit.member());
if (member != null) {
member.tasks().onFail(submit.id());
}
}

Expand Down
18 changes: 4 additions & 14 deletions group/src/main/java/io/atomix/group/SubGroupTaskQueue.java
@@ -1,35 +1,25 @@
package io.atomix.group;

import io.atomix.catalyst.util.Assert;
import io.atomix.group.state.GroupCommands;

import java.util.Collection;
import java.util.concurrent.CompletableFuture;

/**
* Membership group task queue.
*
* @author <a href="http://github.com/kuujo>Jordan Halterman</a>
*/
final class SubGroupTaskQueue extends GroupTaskQueue {
public class SubGroupTaskQueue extends GroupTaskQueue {
private final SubGroup subGroup;

public SubGroupTaskQueue(SubGroup subGroup, MembershipGroup group) {
SubGroupTaskQueue(SubGroup subGroup, MembershipGroup group) {
super(group);
this.subGroup = Assert.notNull(subGroup, "subGroup");
}

@Override
protected CompletableFuture<Void> submit(long taskId, Object task) {
synchronized (group) {
Collection<GroupMember> members = subGroup.members();
CompletableFuture[] futures = new CompletableFuture[members.size()];
int i = 0;
for (GroupMember member : members) {
futures[i++] = group.submit(new GroupCommands.Submit(member.id(), taskId, task));
}
return CompletableFuture.allOf(futures);
}
protected Collection<GroupMember> members() {
return subGroup.members();
}

@Override
Expand Down

0 comments on commit ff8c787

Please sign in to comment.