Skip to content

Commit

Permalink
Support request-reply in group messaging.
Browse files Browse the repository at this point in the history
  • Loading branch information
kuujo committed Apr 3, 2016
1 parent d7fe1bf commit 9e173ea
Show file tree
Hide file tree
Showing 13 changed files with 380 additions and 185 deletions.
@@ -0,0 +1,84 @@
/*
* Copyright 2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License
*/
package io.atomix.group.internal;

import io.atomix.copycat.server.Commit;
import io.atomix.group.messaging.MessageProducer;

import java.util.Random;

/**
* Asynchronous message state.
*
* @author <a href="http://github.com/kuujo>Jordan Halterman</a>
*/
class AsyncMessageState extends MessageState {
private int members;
private int ack;
private int fail;

public AsyncMessageState(Commit<GroupCommands.Send> commit, QueueState queue) {
super(commit, queue);
}

@Override
public boolean send(MembersState members) {
if (commit.operation().member() != null) {
MemberState member = members.get(commit.operation().member());
if (member != null) {
member.submit(this);
return true;
} else {
return false;
}
} else if (commit.operation().dispatchPolicy() == MessageProducer.DispatchPolicy.RANDOM) {
if (members.isEmpty()) {
return false;
} else {
members.get(new Random(commit.operation().id()).nextInt(members.size())).submit(this);
return true;
}
} else if (commit.operation().dispatchPolicy() == MessageProducer.DispatchPolicy.BROADCAST) {
this.members = members.size();
members.forEach(m -> m.submit(this));
return true;
} else {
return false;
}
}

@Override
public void reply(Object message) {
if ((Boolean) message) {
ack++;
} else {
fail++;
}

if (ack + fail == members) {
queue.close(this);
}
}

@Override
public void expire() {
fail++;
if (ack + fail == members) {
queue.close(this);
}
}

}
39 changes: 21 additions & 18 deletions groups/src/main/java/io/atomix/group/internal/GroupCommands.java
Expand Up @@ -243,23 +243,25 @@ public void readObject(BufferInput buffer, Serializer serializer) {
} }


/** /**
* Submit command. * Send command.
*/ */
public static class Submit extends MemberCommand<Void> { public static class Send extends MemberCommand<Void> {
private long id; private long id;
private String queue; private String queue;
private Object message; private Object message;
private MessageProducer.DispatchPolicy dispatchPolicy; private MessageProducer.DispatchPolicy dispatchPolicy;
private MessageProducer.DeliveryPolicy deliveryPolicy; private MessageProducer.DeliveryPolicy deliveryPolicy;


public Submit() { public Send() {
} }


public Submit(String member, String queue, long id, Object message, MessageProducer.DispatchPolicy dispatchPolicy, MessageProducer.DeliveryPolicy deliveryPolicy) { public Send(String member, String queue, long id, Object message, MessageProducer.DispatchPolicy dispatchPolicy, MessageProducer.DeliveryPolicy deliveryPolicy) {
super(member); super(member);
this.queue = queue; this.queue = queue;
this.id = id; this.id = id;
this.message = message; this.message = message;
this.dispatchPolicy = dispatchPolicy;
this.deliveryPolicy = deliveryPolicy;
} }


/** /**
Expand Down Expand Up @@ -329,21 +331,21 @@ public void readObject(BufferInput buffer, Serializer serializer) {
} }


/** /**
* Ack command. * Reply command.
*/ */
public static class Ack extends MemberCommand<Void> { public static class Reply extends MemberCommand<Void> {
private String queue; private String queue;
private long id; private long id;
private boolean succeeded; private Object message;


public Ack() { public Reply() {
} }


public Ack(String member, String queue, long id, boolean succeeded) { public Reply(String member, String queue, long id, Object message) {
super(member); super(member);
this.queue = queue; this.queue = queue;
this.id = id; this.id = id;
this.succeeded = succeeded; this.message = message;
} }


/** /**
Expand All @@ -365,26 +367,27 @@ public long id() {
} }


/** /**
* Returns a boolean value indicating whether the message succeeded. * Returns the reply message.
* *
* @return Indicates whether the message was successfully processed. * @return The reply message.
*/ */
public boolean succeeded() { public Object message() {
return succeeded; return message;
} }


@Override @Override
public void writeObject(BufferOutput buffer, Serializer serializer) { public void writeObject(BufferOutput buffer, Serializer serializer) {
super.writeObject(buffer, serializer); super.writeObject(buffer, serializer);
buffer.writeString(queue).writeLong(id).writeBoolean(succeeded); buffer.writeString(queue).writeLong(id);
serializer.writeObject(message, buffer);
} }


@Override @Override
public void readObject(BufferInput buffer, Serializer serializer) { public void readObject(BufferInput buffer, Serializer serializer) {
super.readObject(buffer, serializer); super.readObject(buffer, serializer);
queue = buffer.readString(); queue = buffer.readString();
id = buffer.readLong(); id = buffer.readLong();
succeeded = buffer.readBoolean(); message = serializer.readObject(buffer);
} }
} }


Expand All @@ -397,10 +400,10 @@ public void resolve(SerializerRegistry registry) {
registry.register(Join.class, -130); registry.register(Join.class, -130);
registry.register(Leave.class, -131); registry.register(Leave.class, -131);
registry.register(Listen.class, -132); registry.register(Listen.class, -132);
registry.register(Submit.class, -137); registry.register(Send.class, -137);
registry.register(GroupMessage.class, -138); registry.register(GroupMessage.class, -138);
registry.register(GroupMessage.class, -139); registry.register(GroupMessage.class, -139);
registry.register(Ack.class, -140); registry.register(Reply.class, -140);
registry.register(GroupMemberInfo.class, -158); registry.register(GroupMemberInfo.class, -158);
} }
} }
Expand Down
27 changes: 18 additions & 9 deletions groups/src/main/java/io/atomix/group/internal/GroupState.java
Expand Up @@ -257,28 +257,37 @@ public Set<GroupMemberInfo> listen(Commit<GroupCommands.Listen> commit) {
/** /**
* Handles a submit commit. * Handles a submit commit.
*/ */
public void submit(Commit<GroupCommands.Submit> commit) { public void send(Commit<GroupCommands.Send> commit) {
try { try {
QueueState queue = queues.computeIfAbsent(commit.operation().queue(), t -> new QueueState(members)); QueueState queue = queues.computeIfAbsent(commit.operation().queue(), t -> new QueueState(members));
queue.submit(new MessageState(commit, queue)); switch (commit.operation().deliveryPolicy()) {
case SYNC:
queue.submit(new SyncMessageState(commit, queue));
break;
case ASYNC:
queue.submit(new AsyncMessageState(commit, queue));
break;
case REQUEST_REPLY:
queue.submit(new RequestReplyMessageState(commit, queue));
break;
default:
commit.close();
throw new IllegalArgumentException("unknown delivery policy");
}
} catch (Exception e) { } catch (Exception e) {
commit.close(); commit.close();
throw e; throw e;
} }
} }


/** /**
* Handles an ack commit. * Handles a reply commit.
*/ */
public void ack(Commit<GroupCommands.Ack> commit) { public void reply(Commit<GroupCommands.Reply> commit) {
try { try {
QueueState queue = queues.get(commit.operation().queue()); QueueState queue = queues.get(commit.operation().queue());
if (queue != null) { if (queue != null) {
if (commit.operation().succeeded()) { queue.reply(commit.operation().id(), commit.operation().member(), commit.operation().message());
queue.ack(commit.operation().id(), commit.operation().member());
} else {
queue.fail(commit.operation().id(), commit.operation().member());
}
} }
} finally { } finally {
commit.close(); commit.close();
Expand Down
20 changes: 6 additions & 14 deletions groups/src/main/java/io/atomix/group/internal/MemberState.java
Expand Up @@ -78,7 +78,7 @@ public void setSession(ServerSession session) {
this.session = session; this.session = session;
if (session != null && session.state().active()) { if (session != null && session.state().active()) {
for (MessageState message : messages.values()) { for (MessageState message : messages.values()) {
session.publish("message", new GroupMessage<>(message.index(), memberId, message.queue(), message.message())); session.publish("message", new GroupMessage<>(message.index(), memberId, message.queue(), message.message(), message.delivery()));
} }
} }
} }
Expand All @@ -96,29 +96,21 @@ public boolean persistent() {
public void submit(MessageState message) { public void submit(MessageState message) {
messages.put(message.index(), message); messages.put(message.index(), message);
if (session != null && session.state().active()) { if (session != null && session.state().active()) {
session.publish("message", new GroupMessage<>(message.index(), memberId, message.queue(), message.message())); session.publish("message", new GroupMessage<>(message.index(), memberId, message.queue(), message.message(), message.delivery()));
} }
} }


/** /**
* Acknowledges processing of a message. * Replies to the message.
*/ */
public void ack(MessageState message) { public void reply(MessageState message, Object reply) {
messages.remove(message.index()); messages.remove(message.index());
message.ack(); message.reply(reply);
}

/**
* Fails processing of a message.
*/
public void fail(MessageState message) {
messages.remove(message.index());
message.fail();
} }


@Override @Override
public void close() { public void close() {
messages.values().forEach(MessageState::fail); messages.values().forEach(MessageState::expire);
commit.close(); commit.close();
} }


Expand Down
24 changes: 12 additions & 12 deletions groups/src/main/java/io/atomix/group/internal/MembershipGroup.java
Expand Up @@ -159,7 +159,7 @@ public CompletableFuture<DistributedGroup> open() {
client.onEvent("join", this::onJoinEvent); client.onEvent("join", this::onJoinEvent);
client.onEvent("leave", this::onLeaveEvent); client.onEvent("leave", this::onLeaveEvent);
client.onEvent("message", this::onMessageEvent); client.onEvent("message", this::onMessageEvent);
client.onEvent("ack", this::onAckEvent); client.onEvent("ack", this::onReplyEvent);
client.onEvent("fail", this::onFailEvent); client.onEvent("fail", this::onFailEvent);
client.onEvent("term", this::onTermEvent); client.onEvent("term", this::onTermEvent);
client.onEvent("elect", this::onElectEvent); client.onEvent("elect", this::onElectEvent);
Expand Down Expand Up @@ -229,30 +229,30 @@ private void onMessageEvent(GroupMessage message) {
} }


/** /**
* Handles an ack event received from the cluster. * Handles a reply event received from the cluster.
*/ */
private void onAckEvent(GroupCommands.Submit submit) { private void onReplyEvent(GroupCommands.Reply reply) {
if (submit.member() != null) { if (reply.member() != null) {
AbstractGroupMember member = members.get(submit.member()); AbstractGroupMember member = members.get(reply.member());
if (member != null) { if (member != null) {
member.messages().producer(submit.queue()).onAck(submit.id()); member.messages().producer(reply.queue()).onReply(reply.id(), reply.message());
} }
} else { } else {
messages.producer(submit.queue()).onAck(submit.id()); messages.producer(reply.queue()).onReply(reply.id(), reply.message());
} }
} }


/** /**
* Handles a fail event received from the cluster. * Handles a fail event received from the cluster.
*/ */
private void onFailEvent(GroupCommands.Submit submit) { private void onFailEvent(GroupCommands.Send send) {
if (submit.member() != null) { if (send.member() != null) {
AbstractGroupMember member = members.get(submit.member()); AbstractGroupMember member = members.get(send.member());
if (member != null) { if (member != null) {
member.messages().producer(submit.queue()).onFail(submit.id()); member.messages().producer(send.queue()).onFail(send.id());
} }
} else { } else {
messages.producer(submit.queue()).onFail(submit.id()); messages.producer(send.queue()).onFail(send.id());
} }
} }


Expand Down

0 comments on commit 9e173ea

Please sign in to comment.