Skip to content

Commit

Permalink
Move local member task queue/connection to separate classes.
Browse files Browse the repository at this point in the history
  • Loading branch information
kuujo committed Mar 1, 2016
1 parent da974f3 commit badc942
Show file tree
Hide file tree
Showing 5 changed files with 157 additions and 91 deletions.
Expand Up @@ -291,7 +291,7 @@ private CompletableFuture<Object> onMessage(GroupMessage message) {
GroupMember member = members.get(message.member());
if (member != null) {
if (member instanceof LocalGroupMember) {
((LocalGroupMember) member).handleMessage(message.setFuture(future));
((LocalGroupMember) member).connection().handleMessage(message.setFuture(future));
} else {
future.completeExceptionally(new IllegalStateException("not a local member"));
}
Expand Down Expand Up @@ -355,7 +355,7 @@ private void onTaskEvent(GroupTask task) {
submit(new GroupCommands.Ack(task.id(), task.member(), false));
}
});
((LocalGroupMember) localMember).handleTask(task.setFuture(future));
((LocalGroupMember) localMember).tasks().handleTask(task.setFuture(future));
}
}

Expand Down
@@ -0,0 +1,87 @@
/*
* Copyright 2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License
*/
package io.atomix.coordination;

import io.atomix.catalyst.transport.Address;
import io.atomix.catalyst.util.Listener;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

/**
* Local group connection.
*
* @author <a href="http://github.com/kuujo>Jordan Halterman</a>
*/
public class LocalGroupConnection extends GroupConnection {
private final Map<String, MessageListenerHolder> messageListeners = new ConcurrentHashMap<>();

public LocalGroupConnection(String memberId, Address address, GroupConnectionManager connections) {
super(memberId, address, connections);
}

/**
* Registers a consumer for messages sent to the local member.
* <p>
* The provided message consumer will be called when a message sent to the local member
* is received for the given {@code topic}.
*
* @param topic The message topic.
* @param consumer The message consumer.
* @param <T> The message type.
* @return The message listener.
*/
@SuppressWarnings("unchecked")
public <T> Listener<GroupMessage<T>> onMessage(String topic, Consumer<GroupMessage<T>> consumer) {
MessageListenerHolder listener = new MessageListenerHolder(consumer);
messageListeners.put(topic, listener);
return listener;
}

/**
* Handles a message to the member.
*/
void handleMessage(GroupMessage message) {
MessageListenerHolder listener = messageListeners.get(message.topic());
if (listener != null) {
listener.accept(message);
}
}

/**
* Listener holder.
*/
@SuppressWarnings("unchecked")
private class MessageListenerHolder implements Listener {
private final Consumer consumer;

private MessageListenerHolder(Consumer consumer) {
this.consumer = consumer;
}

@Override
public void accept(Object message) {
consumer.accept(message);
}

@Override
public void close() {
messageListeners.remove(this);
}
}

}
Expand Up @@ -16,12 +16,9 @@
package io.atomix.coordination;

import io.atomix.catalyst.util.Listener;
import io.atomix.catalyst.util.Listeners;
import io.atomix.coordination.state.GroupCommands;

import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

/**
Expand Down Expand Up @@ -60,73 +57,22 @@
* @author <a href="http://github.com/kuujo>Jordan Halterman</a>
*/
public class LocalGroupMember extends GroupMember {
private final Map<String, MessageListenerHolder> messageListeners = new ConcurrentHashMap<>();
private final Listeners<GroupTask<Object>> taskListeners = new Listeners<>();
private final GroupConnection connection;
private final LocalGroupTaskQueue tasks;
private final LocalGroupConnection connection;

LocalGroupMember(GroupMemberInfo info, DistributedGroup group) {
super(info, group);
connection = new GroupConnection(info.memberId(), info.address(), group.connections) {
@Override
@SuppressWarnings("unchecked")
public <T, U> CompletableFuture<U> send(String topic, T message) {
CompletableFuture<U> future = new CompletableFuture<>();
handleMessage(new GroupMessage(memberId, topic, message).setFuture(future));
return future;
}
};
this.tasks = new LocalGroupTaskQueue(info.memberId(), group);
this.connection = new LocalGroupConnection(info.memberId(), info.address(), group.connections);
}

/**
* Registers a consumer for messages sent to the local member.
* <p>
* The provided message consumer will be called when a message sent to the local member
* is received for the given {@code topic}.
*
* @param topic The message topic.
* @param consumer The message consumer.
* @param <T> The message type.
* @return The message listener.
*/
@SuppressWarnings("unchecked")
public <T> Listener<GroupMessage<T>> onMessage(String topic, Consumer<GroupMessage<T>> consumer) {
MessageListenerHolder listener = new MessageListenerHolder(consumer);
messageListeners.put(topic, listener);
return listener;
}

/**
* Handles a message to the member.
*/
void handleMessage(GroupMessage message) {
MessageListenerHolder listener = messageListeners.get(message.topic());
if (listener != null) {
listener.accept(message);
}
}

/**
* Registers a consumer for tasks send to the local member.
*
* @param consumer The task consumer.
* @param <T> The task type.
* @return The task listener.
*/
@SuppressWarnings("unchecked")
public <T> Listener<GroupTask<T>> onTask(Consumer<GroupTask<T>> consumer) {
return (Listener) taskListeners.add((Consumer) consumer);
}

/**
* Handles a task.
*/
@SuppressWarnings("unchecked")
void handleTask(GroupTask task) {
taskListeners.accept(task);
@Override
public LocalGroupTaskQueue tasks() {
return tasks;
}

@Override
public GroupConnection connection() {
public LocalGroupConnection connection() {
return connection;
}

Expand Down Expand Up @@ -212,26 +158,4 @@ public CompletableFuture<Void> leave() {
});
}

/**
* Listener holder.
*/
@SuppressWarnings("unchecked")
private class MessageListenerHolder implements Listener {
private final Consumer consumer;

private MessageListenerHolder(Consumer consumer) {
this.consumer = consumer;
}

@Override
public void accept(Object message) {
consumer.accept(message);
}

@Override
public void close() {
messageListeners.remove(this);
}
}

}
@@ -0,0 +1,55 @@
/*
* Copyright 2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License
*/
package io.atomix.coordination;

import io.atomix.catalyst.util.Listener;
import io.atomix.catalyst.util.Listeners;

import java.util.function.Consumer;

/**
* Local group task queue.
*
* @author <a href="http://github.com/kuujo>Jordan Halterman</a>
*/
public class LocalGroupTaskQueue extends GroupTaskQueue {
private final Listeners<GroupTask<Object>> taskListeners = new Listeners<>();

public LocalGroupTaskQueue(String memberId, DistributedGroup group) {
super(memberId, group);
}

/**
* Registers a consumer for tasks send to the local member.
*
* @param consumer The task consumer.
* @param <T> The task type.
* @return The task listener.
*/
@SuppressWarnings("unchecked")
public <T> Listener<GroupTask<T>> onTask(Consumer<GroupTask<T>> consumer) {
return (Listener) taskListeners.add((Consumer) consumer);
}

/**
* Handles a task.
*/
@SuppressWarnings("unchecked")
void handleTask(GroupTask task) {
taskListeners.accept(task);
}

}
Expand Up @@ -250,7 +250,7 @@ public void testSend() throws Throwable {
DistributedGroup group2 = createResource(new DistributedGroup.Options().withAddress(new Address("localhost", 6001)));

group1.join().thenAccept(member -> {
member.onMessage("foo", message -> {
member.connection().onMessage("foo", message -> {
threadAssertEquals(message.body(), "Hello world!");
message.reply("bar");
resume();
Expand Down Expand Up @@ -283,7 +283,7 @@ public void testDirectTask() throws Throwable {
assertEquals(group1.members().size(), 1);
assertEquals(group2.members().size(), 1);

member.onTask(task -> {
member.tasks().onTask(task -> {
threadAssertEquals(task.value(), "Hello world!");
task.ack();
resume();
Expand All @@ -308,19 +308,19 @@ public void testAllTask() throws Throwable {
assertEquals(group1.members().size(), 3);
assertEquals(group2.members().size(), 3);

member1.onTask(task -> {
member1.tasks().onTask(task -> {
threadAssertEquals(task.value(), "Hello world!");
System.out.println("RECTASK 1");
task.ack();
resume();
});
member2.onTask(task -> {
member2.tasks().onTask(task -> {
threadAssertEquals(task.value(), "Hello world!");
System.out.println("RECTASK 2");
task.ack();
resume();
});
member3.onTask(task -> {
member3.tasks().onTask(task -> {
threadAssertEquals(task.value(), "Hello world!");
System.out.println("RECTASK 3");
task.ack();
Expand Down

0 comments on commit badc942

Please sign in to comment.