Skip to content

Commit

Permalink
Refactored TopicTest
Browse files Browse the repository at this point in the history
  • Loading branch information
ahmetmircik committed Feb 21, 2017
1 parent 7cb643f commit 305b810
Showing 1 changed file with 52 additions and 79 deletions.
131 changes: 52 additions & 79 deletions hazelcast/src/test/java/com/hazelcast/topic/TopicTest.java
Expand Up @@ -56,11 +56,11 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

@RunWith(HazelcastParallelClassRunner.class)
@Category({QuickTest.class, ParallelTest.class})
Expand Down Expand Up @@ -99,38 +99,29 @@ public void testTopicPublishingMember() {
TestHazelcastInstanceFactory factory = createHazelcastInstanceFactory(nodeCount);
HazelcastInstance[] instances = factory.newInstances();

final CountDownLatch mainLatch = new CountDownLatch(nodeCount);
final AtomicInteger count1 = new AtomicInteger(0);
final AtomicInteger count2 = new AtomicInteger(0);
final AtomicInteger count3 = new AtomicInteger(0);

for (int i = 0; i < nodeCount; i++) {
final HazelcastInstance instance = instances[i];
new Thread(new Runnable() {
public void run() {
ITopic<Long> topic = instance.getTopic(randomName);
topic.addMessageListener(new MessageListener<Long>() {
public void onMessage(Message<Long> message) {
Member publishingMember = message.getPublishingMember();
if (publishingMember.equals(instance.getCluster().getLocalMember())) {
count1.incrementAndGet();
}
if (publishingMember.equals(message.getMessageObject())) {
count2.incrementAndGet();
}
if (publishingMember.localMember()) {
count3.incrementAndGet();
}
}
});
mainLatch.countDown();
ITopic<Member> topic = instance.getTopic(randomName);
topic.addMessageListener(new MessageListener<Member>() {
public void onMessage(Message<Member> message) {
Member publishingMember = message.getPublishingMember();
if (publishingMember.equals(instance.getCluster().getLocalMember())) {
count1.incrementAndGet();
}

Member messageObject = message.getMessageObject();
if (publishingMember.equals(messageObject)) {
count2.incrementAndGet();
}
if (publishingMember.localMember()) {
count3.incrementAndGet();
}
}
}).start();
}
try {
mainLatch.await(1, TimeUnit.MINUTES);
} catch (InterruptedException e) {
fail();
});
}

for (int i = 0; i < nodeCount; i++) {
Expand Down Expand Up @@ -238,64 +229,46 @@ public void testTopicGlobalOrder() throws Exception {
config.getTopicConfig(randomTopicName).setGlobalOrderingEnabled(true);

TestHazelcastInstanceFactory factory = createHazelcastInstanceFactory(nodeCount);
final HazelcastInstance[] instances = factory.newInstances(config);
final HazelcastInstance[] nodes = factory.newInstances(config);

final List<TestMessage>[] messageLists = new List[nodeCount];
final List<TestMessage>[] messageListPerNode = new List[nodeCount];
for (int i = 0; i < nodeCount; i++) {
messageLists[i] = new CopyOnWriteArrayList<TestMessage>();
messageListPerNode[i] = new CopyOnWriteArrayList<TestMessage>();
}

final CountDownLatch startLatch = new CountDownLatch(nodeCount);
final CountDownLatch messageLatch = new CountDownLatch(nodeCount * nodeCount * count);
final CountDownLatch publishLatch = new CountDownLatch(nodeCount * count);

ExecutorService ex = Executors.newFixedThreadPool(nodeCount);
for (int i = 0; i < nodeCount; i++) {
final int finalI = i;
ex.execute(new Runnable() {
public void run() {
final List<TestMessage> messages = messageLists[finalI];
HazelcastInstance hz = instances[finalI];
ITopic<TestMessage> topic = hz.getTopic(randomTopicName);
topic.addMessageListener(new MessageListener<TestMessage>() {
public void onMessage(Message<TestMessage> message) {
messages.add(message.getMessageObject());
messageLatch.countDown();
}
});

startLatch.countDown();
try {
startLatch.await(1, TimeUnit.MINUTES);
} catch (InterruptedException e) {
e.printStackTrace();
return;
}

Member localMember = hz.getCluster().getLocalMember();
for (int j = 0; j < count; j++) {
topic.publish(new TestMessage(localMember, UUID.randomUUID().toString()));
publishLatch.countDown();
}
final CountDownLatch messageLatch = new CountDownLatch(nodeCount * count);
// add message listeners
for (int i = 0; i < nodes.length; i++) {
final int nodeIndex = i;
ITopic<TestMessage> topic = nodes[i].getTopic(randomTopicName);
topic.addMessageListener(new MessageListener<TestMessage>() {
public void onMessage(Message<TestMessage> message) {
messageListPerNode[nodeIndex].add(message.getMessageObject());
messageLatch.countDown();
}
});
}

try {
assertTrue(publishLatch.await(2, TimeUnit.MINUTES));
assertTrue(messageLatch.await(5, TimeUnit.MINUTES));

TestMessage[] ref = new TestMessage[messageLists[0].size()];
messageLists[0].toArray(ref);
for (int i = 1; i < nodeCount; i++) {
TestMessage[] messages = new TestMessage[messageLists[i].size()];
messageLists[i].toArray(messages);

assertArrayEquals(ref, messages);
// publish messages
for (HazelcastInstance node : nodes) {
Member localMember = node.getCluster().getLocalMember();
for (int j = 0; j < count; j++) {
TestMessage message = new TestMessage(localMember, UUID.randomUUID().toString());
ITopic<Object> topic = node.getTopic(randomTopicName);
topic.publish(message);
}
} finally {
ex.shutdownNow();
}

// all messages in nodes messageLists should be equal
assertTrueEventually(new AssertTask() {
@Override
public void run() throws Exception {
int i = 0;
do {
assertEquals(messageListPerNode[i], messageListPerNode[i++]);
} while (i < nodeCount);
}
});
}

private static class TestMessage implements DataSerializable {
Expand Down Expand Up @@ -385,7 +358,7 @@ public void onMessage(Message<String> msg) {
});
topic.publish(message);

assertTrue(latch.await(10000, TimeUnit.MILLISECONDS));
assertTrue(latch.await(10000, MILLISECONDS));
}

@Test
Expand Down Expand Up @@ -433,7 +406,7 @@ public void onMessage(Message<String> msg) {
});
topic.publish(message);

assertTrue(latch.await(10000, TimeUnit.MILLISECONDS));
assertTrue(latch.await(10000, MILLISECONDS));
}

@Test
Expand Down Expand Up @@ -670,9 +643,9 @@ public void onMessage(Message<TestMessage> message) {

boolean passed = false;
for (int i = 0; i < nodeCount; i++) {
if (threads[i].size() > 1) {
passed = true;
}
if (threads[i].size() > 1) {
passed = true;
}
}
assertTrue("All listeners received messages in single thread. Expecting more threads involved", passed);
} finally {
Expand Down

0 comments on commit 305b810

Please sign in to comment.