Skip to content

Commit

Permalink
Rename group messages() API to messaging()
Browse files Browse the repository at this point in the history
  • Loading branch information
kuujo committed Apr 10, 2016
1 parent 9c084b4 commit 329ef7b
Show file tree
Hide file tree
Showing 9 changed files with 33 additions and 28 deletions.
Expand Up @@ -66,7 +66,7 @@ public static void main(String[] args) throws Exception {
System.out.println("Joining membership group");
group.join().thenAccept(member -> {
System.out.println("Joined group with member ID: " + member.id());
MessageConsumer<String> consumer = member.messages().consumer("tasks");
MessageConsumer<String> consumer = member.messaging().consumer("tasks");
consumer.onMessage(task -> {
System.out.println("Received message");
try {
Expand All @@ -80,7 +80,7 @@ public static void main(String[] args) throws Exception {
group.onJoin(member -> {
System.out.println(member.id() + " joined the group!");

member.messages().producer("tasks").send("hello").thenRun(() -> {
member.messaging().producer("tasks").send("hello").thenRun(() -> {
System.out.println("Task complete!");
});
});
Expand Down
4 changes: 2 additions & 2 deletions groups/src/main/java/io/atomix/group/DistributedGroup.java
Expand Up @@ -60,7 +60,7 @@
* }
* </pre>
* <h2>Configuration</h2>
* {@code DistributedGroup} instances can be configured to control {@link GroupMember#messages() communication}
* {@code DistributedGroup} instances can be configured to control {@link GroupMember#messaging() communication}
* between members of the group. To configure groups, a {@link DistributedGroup.Options} instance must be provided
* when constructing the initial group instance.
* <p>
Expand Down Expand Up @@ -407,7 +407,7 @@ public Options withAddress(Address address) {
*
* @return The group message client.
*/
MessageClient messages();
MessageClient messaging();

/**
* Gets a group member by ID.
Expand Down
6 changes: 3 additions & 3 deletions groups/src/main/java/io/atomix/group/GroupMember.java
Expand Up @@ -36,10 +36,10 @@ public interface GroupMember {
String id();

/**
* Returns the member message service.
* Returns the member message client.
*
* @return The member message service.
* @return The member message client.
*/
MessageClient messages();
MessageClient messaging();

}
7 changes: 6 additions & 1 deletion groups/src/main/java/io/atomix/group/LocalMember.java
Expand Up @@ -41,8 +41,13 @@
*/
public interface LocalMember extends GroupMember {

/**
* Returns the local member message service.
*
* @return The local member message service.
*/
@Override
MessageService messages();
MessageService messaging();

/**
* Leaves the membership group.
Expand Down
Expand Up @@ -39,6 +39,6 @@ public String id() {
}

@Override
public abstract AbstractMessageClient messages();
public abstract AbstractMessageClient messaging();

}
Expand Up @@ -36,7 +36,7 @@ public LocalGroupMember(GroupMemberInfo info, MembershipGroup group, MessageProd
}

@Override
public MemberMessageService messages() {
public MemberMessageService messaging() {
return messages;
}

Expand Down
Expand Up @@ -86,7 +86,7 @@ public Election election() {
}

@Override
public MessageClient messages() {
public MessageClient messaging() {
return messages;
}

Expand Down
Expand Up @@ -33,7 +33,7 @@ public RemoteGroupMember(GroupMemberInfo info, MembershipGroup group, MessagePro
}

@Override
public MemberMessageClient messages() {
public MemberMessageClient messaging() {
return messages;
}

Expand Down
32 changes: 16 additions & 16 deletions groups/src/test/java/io/atomix/group/DistributedGroupTest.java
Expand Up @@ -227,12 +227,12 @@ public void testDirectMessage() throws Throwable {

await(5000, 2);

member.messages().consumer("test").onMessage(message -> {
member.messaging().consumer("test").onMessage(message -> {
threadAssertEquals(message.message(), "Hello world!");
message.ack();
resume();
});
group1.member(member.id()).messages().producer("test").send("Hello world!").thenRun(this::resume);
group1.member(member.id()).messaging().producer("test").send("Hello world!").thenRun(this::resume);
await(10000, 2);
}

Expand All @@ -258,12 +258,12 @@ public void testDirectMessageFail() throws Throwable {

await(5000, 2);

member.messages().consumer("test").onMessage(message -> {
member.messaging().consumer("test").onMessage(message -> {
threadAssertEquals(message.message(), "Hello world!");
message.fail();
resume();
});
group1.member(member.id()).messages().producer("test").send("Hello world!").whenComplete((result, error) -> {
group1.member(member.id()).messaging().producer("test").send("Hello world!").whenComplete((result, error) -> {
threadAssertTrue(error instanceof MessageFailedException);
resume();
});
Expand Down Expand Up @@ -292,18 +292,18 @@ public void testDirectMessageRedeliverToPersistentMember() throws Throwable {

await(5000, 2);

member.messages().consumer("test").onMessage(message -> {
member.messaging().consumer("test").onMessage(message -> {
threadAssertEquals(message.message(), "Hello world!");
group1.join("test").thenAccept(localMember -> {
localMember.messages().consumer("test").onMessage(m -> {
localMember.messaging().consumer("test").onMessage(m -> {
threadAssertEquals(message.message(), "Hello world!");
m.ack();
resume();
});
});
resume();
});
group1.member(member.id()).messages().producer("test").send("Hello world!").thenRun(this::resume);
group1.member(member.id()).messaging().producer("test").send("Hello world!").thenRun(this::resume);
await(10000, 3);
}

Expand All @@ -329,12 +329,12 @@ public void testDirectMessageFailOnLeave() throws Throwable {

await(5000, 2);

member.messages().consumer("test").onMessage(message -> {
member.messaging().consumer("test").onMessage(message -> {
member.leave().thenRun(this::resume);
resume();
});

group1.member(member.id()).messages().producer("test").send("Hello world!").whenComplete((result, error) -> {
group1.member(member.id()).messaging().producer("test").send("Hello world!").whenComplete((result, error) -> {
threadAssertTrue(error instanceof MessageFailedException);
resume();
});
Expand Down Expand Up @@ -365,22 +365,22 @@ public void testGroupMessage() throws Throwable {
LocalMember member2 = group2.join().get(10, TimeUnit.SECONDS);
LocalMember member3 = group2.join().get(10, TimeUnit.SECONDS);

member1.messages().consumer("test").onMessage(message -> {
member1.messaging().consumer("test").onMessage(message -> {
threadAssertEquals(message.message(), "Hello world!");
message.ack();
resume();
});
member2.messages().consumer("test").onMessage(message -> {
member2.messaging().consumer("test").onMessage(message -> {
threadAssertEquals(message.message(), "Hello world!");
message.ack();
resume();
});
member3.messages().consumer("test").onMessage(message -> {
member3.messaging().consumer("test").onMessage(message -> {
threadAssertEquals(message.message(), "Hello world!");
message.ack();
resume();
});
group1.messages().producer("test").send("Hello world!").thenRun(this::resume);
group1.messaging().producer("test").send("Hello world!").thenRun(this::resume);
await(10000, 4);
}

Expand Down Expand Up @@ -410,20 +410,20 @@ public void testGroupMessageFailOnLeave() throws Throwable {

await(5000, 2);

member1.messages().consumer("test").onMessage(message -> {
member1.messaging().consumer("test").onMessage(message -> {
threadAssertEquals(message.message(), "Hello world!");
member1.leave();
resume();
});
member2.messages().consumer("test").onMessage(message -> {
member2.messaging().consumer("test").onMessage(message -> {
threadAssertEquals(message.message(), "Hello world!");
member2.leave();
resume();
});

MessageProducer.Options options = new MessageProducer.Options()
.withDelivery(MessageProducer.Delivery.RANDOM);
group1.messages().producer("test", options).send("Hello world!").whenComplete((result, error) -> {
group1.messaging().producer("test", options).send("Hello world!").whenComplete((result, error) -> {
threadAssertNotNull(error);
resume();
});
Expand Down

0 comments on commit 329ef7b

Please sign in to comment.