Skip to content

Commit

Permalink
Allow direct messages to be failed when member leaves the group.
Browse files Browse the repository at this point in the history
  • Loading branch information
kuujo committed Apr 3, 2016
1 parent dab453d commit f688d3f
Show file tree
Hide file tree
Showing 7 changed files with 61 additions and 34 deletions.
29 changes: 22 additions & 7 deletions groups/src/main/java/io/atomix/group/internal/GroupCommands.java
Expand Up @@ -303,20 +303,20 @@ public Object message() {
}

/**
* Returns the message dispatch policy.
* Returns the message delivery policy.
*
* @return The message dispatch policy.
* @return The message delivery policy.
*/
public MessageProducer.Delivery delivery() {
return delivery;
}

/**
* Returns the message delivery policy.
* Returns the message execution policy.
*
* @return The message delivery policy.
* @return The message execution policy.
*/
public MessageProducer.Execution deliveryPolicy() {
public MessageProducer.Execution execution() {
return execution;
}

Expand Down Expand Up @@ -411,16 +411,18 @@ public static class Ack extends MemberCommand<Void> {
private int producer;
private String queue;
private long id;
private boolean succeeded;
private Object message;

public Ack() {
}

public Ack(String member, int producer, String queue, long id, Object message) {
public Ack(String member, int producer, String queue, long id, boolean succeeded, Object message) {
super(member);
this.producer = producer;
this.queue = queue;
this.id = id;
this.succeeded = succeeded;
this.message = message;
}

Expand Down Expand Up @@ -451,6 +453,15 @@ public long id() {
return id;
}

/**
* Returns whether the message succeeded.
*
* @return Whether the message succeeded.
*/
public boolean succeeded() {
return succeeded;
}

/**
* Returns the reply message.
*
Expand All @@ -463,7 +474,10 @@ public Object message() {
@Override
public void writeObject(BufferOutput buffer, Serializer serializer) {
super.writeObject(buffer, serializer);
buffer.writeUnsignedShort(producer).writeString(queue).writeLong(id);
buffer.writeUnsignedShort(producer);
buffer.writeString(queue);
buffer.writeLong(id);
buffer.writeBoolean(succeeded);
serializer.writeObject(message, buffer);
}

Expand All @@ -473,6 +487,7 @@ public void readObject(BufferInput buffer, Serializer serializer) {
producer = buffer.readUnsignedShort();
queue = buffer.readString();
id = buffer.readLong();
succeeded = buffer.readBoolean();
message = serializer.readObject(buffer);
}
}
Expand Down
Expand Up @@ -260,7 +260,7 @@ public Set<GroupMemberInfo> listen(Commit<GroupCommands.Listen> commit) {
public void send(Commit<GroupCommands.Message> commit) {
try {
QueueState queue = queues.computeIfAbsent(commit.operation().queue(), t -> new QueueState(members));
switch (commit.operation().deliveryPolicy()) {
switch (commit.operation().execution()) {
case SYNC:
queue.submit(new SyncMessageState(commit, queue));
break;
Expand Down
Expand Up @@ -66,7 +66,7 @@ public Object message() {
* Returns the message delivery policy.
*/
public MessageProducer.Execution delivery() {
return commit.operation().deliveryPolicy();
return commit.operation().execution();
}

/**
Expand All @@ -87,9 +87,9 @@ public MessageProducer.Execution delivery() {
/**
* Sends a response back to the message submitter.
*/
protected boolean sendReply(Object message) {
protected boolean sendReply(boolean succeeded, Object message) {
if (!complete && session().state().active()) {
session().publish("ack", new GroupCommands.Ack(commit.operation().member(), commit.operation().producer(), commit.operation().queue(), commit.operation().id(), message));
session().publish("ack", new GroupCommands.Ack(commit.operation().member(), commit.operation().producer(), commit.operation().queue(), commit.operation().id(), succeeded, message));
complete = true;
return true;
}
Expand Down
Expand Up @@ -39,18 +39,18 @@ public RequestReplyMessageState(Commit<GroupCommands.Message> commit, QueueState

@Override
public boolean send(MembersState members) {
if (commit.operation().member() != null) {
if (commit.operation().delivery() == MessageProducer.Delivery.DIRECT) {
MemberState member = members.get(commit.operation().member());
if (member != null) {
member.submit(this);
return true;
} else {
sendReply(false);
sendReply(false, null);
return false;
}
} else if (commit.operation().delivery() == MessageProducer.Delivery.RANDOM) {
if (members.isEmpty()) {
sendReply(false);
sendReply(false, null);
return false;
} else {
members.get(new Random(commit.operation().id()).nextInt(members.size())).submit(this);
Expand All @@ -61,27 +61,35 @@ public boolean send(MembersState members) {
members.forEach(m -> m.submit(this));
return true;
} else {
sendReply(false);
sendReply(false, null);
return false;
}
}

@Override
public void reply(Object message) {
ack++;
replies.set(ack + fail, message);
if (ack + fail == replies.size()) {
sendReply(replies);
queue.close(this);
if (commit.operation().delivery() == MessageProducer.Delivery.DIRECT || commit.operation().delivery() == MessageProducer.Delivery.RANDOM) {
sendReply(true, message);
} else if (commit.operation().delivery() == MessageProducer.Delivery.BROADCAST) {
ack++;
replies.set(ack + fail, message);
if (ack + fail == replies.size()) {
sendReply(fail == 0, replies);
queue.close(this);
}
}
}

@Override
public void expire() {
fail++;
if (ack + fail == replies.size()) {
sendReply(replies);
queue.close(this);
if (commit.operation().delivery() == MessageProducer.Delivery.DIRECT || commit.operation().delivery() == MessageProducer.Delivery.RANDOM) {
sendReply(false, null);
} else if (commit.operation().delivery() == MessageProducer.Delivery.BROADCAST) {
fail++;
if (ack + fail == replies.size()) {
sendReply(false, replies);
queue.close(this);
}
}
}

Expand Down
Expand Up @@ -42,28 +42,28 @@ public boolean send(MembersState members) {
member.submit(this);
return true;
} else {
sendReply(false);
sendReply(false, null);
return false;
}
} else if (commit.operation().delivery() == MessageProducer.Delivery.RANDOM) {
if (members.isEmpty()) {
sendReply(false);
sendReply(false, null);
return false;
} else {
members.get(new Random(commit.operation().id()).nextInt(members.size())).submit(this);
return true;
}
} else if (commit.operation().delivery() == MessageProducer.Delivery.BROADCAST) {
if (members.isEmpty()) {
sendReply(false);
sendReply(false, null);
return false;
} else {
this.members = members.size();
members.forEach(m -> m.submit(this));
return true;
}
} else {
sendReply(false);
sendReply(false, null);
return false;
}
}
Expand All @@ -78,9 +78,9 @@ public void reply(Object message) {

if (ack + fail == members) {
if (fail == 0) {
sendReply(true);
sendReply(true, null);
} else {
sendReply(false);
sendReply(false, null);
}
queue.close(this);
}
Expand All @@ -90,7 +90,7 @@ public void reply(Object message) {
public void expire() {
fail++;
if (ack + fail == members) {
sendReply(false);
sendReply(false, null);
queue.close(this);
}
}
Expand Down
Expand Up @@ -100,7 +100,7 @@ public Delivery getDelivery() {
* @return The producer options.
*/
public Options withExecution(Execution execution) {
this.execution = Assert.notNull(execution, "deliveryPolicy");
this.execution = Assert.notNull(execution, "execution");
return this;
}

Expand Down
Expand Up @@ -64,13 +64,17 @@ void onAck(GroupCommands.Ack ack) {
CompletableFuture messageFuture = messageFutures.remove(messageId);
if (messageFuture != null) {
if (execution == Execution.SYNC) {
if ((Boolean) ack.message()) {
if (ack.succeeded()) {
messageFuture.complete(null);
} else {
messageFuture.completeExceptionally(new MessageFailedException("message failed"));
}
} else if (execution == Execution.REQUEST_REPLY) {
messageFuture.complete(ack.message());
if (ack.succeeded()) {
messageFuture.complete(ack.message());
} else {
messageFuture.completeExceptionally(new MessageFailedException("message failed"));
}
}
}
}
Expand Down

0 comments on commit f688d3f

Please sign in to comment.