From 305b8106e7091d0b8fa14c10267886419da5be85 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ahmet=20M=C4=B1r=C3=A7=C4=B1k?= Date: Tue, 21 Feb 2017 16:33:51 +0300 Subject: [PATCH] Refactored TopicTest --- .../java/com/hazelcast/topic/TopicTest.java | 131 +++++++----------- 1 file changed, 52 insertions(+), 79 deletions(-) diff --git a/hazelcast/src/test/java/com/hazelcast/topic/TopicTest.java b/hazelcast/src/test/java/com/hazelcast/topic/TopicTest.java index cbda1807e44c..aeb9d097252b 100644 --- a/hazelcast/src/test/java/com/hazelcast/topic/TopicTest.java +++ b/hazelcast/src/test/java/com/hazelcast/topic/TopicTest.java @@ -56,11 +56,11 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import static java.util.concurrent.TimeUnit.MILLISECONDS; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; @RunWith(HazelcastParallelClassRunner.class) @Category({QuickTest.class, ParallelTest.class}) @@ -99,38 +99,29 @@ public void testTopicPublishingMember() { TestHazelcastInstanceFactory factory = createHazelcastInstanceFactory(nodeCount); HazelcastInstance[] instances = factory.newInstances(); - final CountDownLatch mainLatch = new CountDownLatch(nodeCount); final AtomicInteger count1 = new AtomicInteger(0); final AtomicInteger count2 = new AtomicInteger(0); final AtomicInteger count3 = new AtomicInteger(0); for (int i = 0; i < nodeCount; i++) { final HazelcastInstance instance = instances[i]; - new Thread(new Runnable() { - public void run() { - ITopic topic = instance.getTopic(randomName); - topic.addMessageListener(new MessageListener() { - public void onMessage(Message message) { - Member publishingMember = message.getPublishingMember(); - if (publishingMember.equals(instance.getCluster().getLocalMember())) { - count1.incrementAndGet(); - } - if (publishingMember.equals(message.getMessageObject())) { - count2.incrementAndGet(); - } - if (publishingMember.localMember()) { - count3.incrementAndGet(); - } - } - }); - mainLatch.countDown(); + ITopic topic = instance.getTopic(randomName); + topic.addMessageListener(new MessageListener() { + public void onMessage(Message message) { + Member publishingMember = message.getPublishingMember(); + if (publishingMember.equals(instance.getCluster().getLocalMember())) { + count1.incrementAndGet(); + } + + Member messageObject = message.getMessageObject(); + if (publishingMember.equals(messageObject)) { + count2.incrementAndGet(); + } + if (publishingMember.localMember()) { + count3.incrementAndGet(); + } } - }).start(); - } - try { - mainLatch.await(1, TimeUnit.MINUTES); - } catch (InterruptedException e) { - fail(); + }); } for (int i = 0; i < nodeCount; i++) { @@ -238,64 +229,46 @@ public void testTopicGlobalOrder() throws Exception { config.getTopicConfig(randomTopicName).setGlobalOrderingEnabled(true); TestHazelcastInstanceFactory factory = createHazelcastInstanceFactory(nodeCount); - final HazelcastInstance[] instances = factory.newInstances(config); + final HazelcastInstance[] nodes = factory.newInstances(config); - final List[] messageLists = new List[nodeCount]; + final List[] messageListPerNode = new List[nodeCount]; for (int i = 0; i < nodeCount; i++) { - messageLists[i] = new CopyOnWriteArrayList(); + messageListPerNode[i] = new CopyOnWriteArrayList(); } - final CountDownLatch startLatch = new CountDownLatch(nodeCount); - final CountDownLatch messageLatch = new CountDownLatch(nodeCount * nodeCount * count); - final CountDownLatch publishLatch = new CountDownLatch(nodeCount * count); - - ExecutorService ex = Executors.newFixedThreadPool(nodeCount); - for (int i = 0; i < nodeCount; i++) { - final int finalI = i; - ex.execute(new Runnable() { - public void run() { - final List messages = messageLists[finalI]; - HazelcastInstance hz = instances[finalI]; - ITopic topic = hz.getTopic(randomTopicName); - topic.addMessageListener(new MessageListener() { - public void onMessage(Message message) { - messages.add(message.getMessageObject()); - messageLatch.countDown(); - } - }); - - startLatch.countDown(); - try { - startLatch.await(1, TimeUnit.MINUTES); - } catch (InterruptedException e) { - e.printStackTrace(); - return; - } - - Member localMember = hz.getCluster().getLocalMember(); - for (int j = 0; j < count; j++) { - topic.publish(new TestMessage(localMember, UUID.randomUUID().toString())); - publishLatch.countDown(); - } + final CountDownLatch messageLatch = new CountDownLatch(nodeCount * count); + // add message listeners + for (int i = 0; i < nodes.length; i++) { + final int nodeIndex = i; + ITopic topic = nodes[i].getTopic(randomTopicName); + topic.addMessageListener(new MessageListener() { + public void onMessage(Message message) { + messageListPerNode[nodeIndex].add(message.getMessageObject()); + messageLatch.countDown(); } }); } - try { - assertTrue(publishLatch.await(2, TimeUnit.MINUTES)); - assertTrue(messageLatch.await(5, TimeUnit.MINUTES)); - - TestMessage[] ref = new TestMessage[messageLists[0].size()]; - messageLists[0].toArray(ref); - for (int i = 1; i < nodeCount; i++) { - TestMessage[] messages = new TestMessage[messageLists[i].size()]; - messageLists[i].toArray(messages); - - assertArrayEquals(ref, messages); + // publish messages + for (HazelcastInstance node : nodes) { + Member localMember = node.getCluster().getLocalMember(); + for (int j = 0; j < count; j++) { + TestMessage message = new TestMessage(localMember, UUID.randomUUID().toString()); + ITopic topic = node.getTopic(randomTopicName); + topic.publish(message); } - } finally { - ex.shutdownNow(); } + + // all messages in nodes messageLists should be equal + assertTrueEventually(new AssertTask() { + @Override + public void run() throws Exception { + int i = 0; + do { + assertEquals(messageListPerNode[i], messageListPerNode[i++]); + } while (i < nodeCount); + } + }); } private static class TestMessage implements DataSerializable { @@ -385,7 +358,7 @@ public void onMessage(Message msg) { }); topic.publish(message); - assertTrue(latch.await(10000, TimeUnit.MILLISECONDS)); + assertTrue(latch.await(10000, MILLISECONDS)); } @Test @@ -433,7 +406,7 @@ public void onMessage(Message msg) { }); topic.publish(message); - assertTrue(latch.await(10000, TimeUnit.MILLISECONDS)); + assertTrue(latch.await(10000, MILLISECONDS)); } @Test @@ -670,9 +643,9 @@ public void onMessage(Message message) { boolean passed = false; for (int i = 0; i < nodeCount; i++) { - if (threads[i].size() > 1) { - passed = true; - } + if (threads[i].size() > 1) { + passed = true; + } } assertTrue("All listeners received messages in single thread. Expecting more threads involved", passed); } finally {