From b5c9318d41982a4d38e666d7d09f8cdaab648532 Mon Sep 17 00:00:00 2001 From: shtykh_roman Date: Thu, 26 Jan 2017 16:01:38 +0900 Subject: [PATCH 1/6] IGNITE-4539: RocketMQ data streamer. --- modules/rocketmq/README.txt | 25 +++ modules/rocketmq/pom.xml | 89 +++++++++ .../stream/rocketmq/RocketMQStreamer.java | 151 +++++++++++++++ .../ignite/stream/rocketmq/package-info.java | 21 ++ .../stream/rocketmq/RocketMQStreamerTest.java | 180 ++++++++++++++++++ .../rocketmq/RocketMQStreamerTestSuite.java | 37 ++++ .../ignite/stream/rocketmq/package-info.java | 21 ++ parent/pom.xml | 5 + pom.xml | 1 + 9 files changed, 530 insertions(+) create mode 100644 modules/rocketmq/README.txt create mode 100644 modules/rocketmq/pom.xml create mode 100644 modules/rocketmq/src/main/java/org/apache/ignite/stream/rocketmq/RocketMQStreamer.java create mode 100644 modules/rocketmq/src/main/java/org/apache/ignite/stream/rocketmq/package-info.java create mode 100644 modules/rocketmq/src/test/java/org/apache/ignite/stream/rocketmq/RocketMQStreamerTest.java create mode 100644 modules/rocketmq/src/test/java/org/apache/ignite/stream/rocketmq/RocketMQStreamerTestSuite.java create mode 100644 modules/rocketmq/src/test/java/org/apache/ignite/stream/rocketmq/package-info.java diff --git a/modules/rocketmq/README.txt b/modules/rocketmq/README.txt new file mode 100644 index 0000000000000..55a117bec4af5 --- /dev/null +++ b/modules/rocketmq/README.txt @@ -0,0 +1,25 @@ +Apache Ignite RocketMQ Streamer Module +-------------------------------------- + +Apache Ignite RocketMQ Streamer module provides streaming from RocketMQ to Ignite cache. + +To use Ignite RocketMQ Streamer module, first import it to your Maven project. + + + ... + + ... + + org.apache.ignite + ignite-rocketmq + ${ignite.version} + + ... + + ... + + +Then, initialize and start it as, for instance, done in RocketMQStreamerTest.java. diff --git a/modules/rocketmq/pom.xml b/modules/rocketmq/pom.xml new file mode 100644 index 0000000000000..6400881d279f6 --- /dev/null +++ b/modules/rocketmq/pom.xml @@ -0,0 +1,89 @@ + + + + + + + 4.0.0 + + + org.apache.ignite + ignite-parent + 1 + ../../parent + + + ignite-rocketmq + 2.0.0-SNAPSHOT + http://ignite.apache.org + + + + org.apache.ignite + ignite-core + ${project.version} + + + + org.apache.rocketmq + rocketmq-client + ${rocketmq.version} + + + + org.apache.rocketmq + rocketmq-test + ${rocketmq.version} + test + + + + org.apache.rocketmq + rocketmq-test + ${rocketmq.version} + test-jar + test + + + + org.apache.ignite + ignite-spring + ${project.version} + test + + + + org.apache.ignite + ignite-core + ${project.version} + test-jar + test + + + + org.apache.ignite + ignite-log4j + ${project.version} + test + + + + diff --git a/modules/rocketmq/src/main/java/org/apache/ignite/stream/rocketmq/RocketMQStreamer.java b/modules/rocketmq/src/main/java/org/apache/ignite/stream/rocketmq/RocketMQStreamer.java new file mode 100644 index 0000000000000..67f1ce56695e7 --- /dev/null +++ b/modules/rocketmq/src/main/java/org/apache/ignite/stream/rocketmq/RocketMQStreamer.java @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.stream.rocketmq; + +import java.util.List; +import org.apache.ignite.IgniteDataStreamer; +import org.apache.ignite.IgniteException; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.internal.util.typedef.internal.A; +import org.apache.ignite.stream.StreamAdapter; +import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; +import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.common.consumer.ConsumeFromWhere; +import org.apache.rocketmq.common.message.MessageExt; + +/** + * Streamer that subscribes to a RocketMQ topic amd feeds messages into {@link IgniteDataStreamer} instance. + */ +public class RocketMQStreamer extends StreamAdapter, K, V> implements MessageListenerConcurrently { + /** Logger. */ + private IgniteLogger log; + + /** RocketMQ consumer. */ + private DefaultMQPushConsumer consumer; + + /** State. */ + private volatile boolean stopped = true; + + /** Topic to subscribe to. */ + private String topic; + + /** Consumer group. */ + private String consumerGrp; + + /** Name server address. */ + private String nameSrvAddr; + + /** + * Starts streamer. + * + * @throws IgniteException If failed. + */ + public void start() { + if (!stopped) + throw new IgniteException("Attempted to start an already started RocketMQ streamer"); + + // validate parameters. + A.notNull(getStreamer(), "streamer"); + A.notNull(getIgnite(), "ignite"); + A.notNull(topic, "topic"); + A.notNull(consumerGrp, "consumer group"); + A.notNullOrEmpty(nameSrvAddr, "nameserver address"); + A.ensure(null != getMultipleTupleExtractor(), "Multiple tuple extractor must be configured"); + + log = getIgnite().log(); + + consumer = new DefaultMQPushConsumer(consumerGrp); + + consumer.setNamesrvAddr(nameSrvAddr); + consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); + + try { + consumer.subscribe(topic, "*"); + } + catch (MQClientException e) { + throw new IgniteException("Failed to subscribe to " + topic, e); + } + + consumer.registerMessageListener(this); + + try { + consumer.start(); + } + catch (MQClientException e) { + throw new IgniteException("Failed to start the streamer", e); + } + + stopped = false; + } + + /** + * Stops streamer. + */ + public void stop() { + if (consumer != null) + consumer.shutdown(); + + stopped = true; + } + + /** + * Implements {@link MessageListenerConcurrently#consumeMessage(List, ConsumeConcurrentlyContext)} to receive + * messages. + * + * {@inheritDoc} + */ + @Override public ConsumeConcurrentlyStatus consumeMessage(List msgs, + ConsumeConcurrentlyContext context) { + if (log.isDebugEnabled()) + log.debug("Received " + msgs.size() + " messages"); + + addMessage(msgs); + + return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; + } + + /** + * Sets the topic to subscribe to. + * + * @param topic The topic to subscribe to. + */ + public void setTopic(String topic) { + this.topic = topic; + } + + /** + * Sets the name of the consumer group. + * + * @param consumerGrp Consumer group name. + */ + public void setConsumerGrp(String consumerGrp) { + this.consumerGrp = consumerGrp; + } + + /** + * Sets the name server address. + * + * @param nameSrvAddr Name server address + */ + public void setNameSrvAddr(String nameSrvAddr) { + this.nameSrvAddr = nameSrvAddr; + } +} diff --git a/modules/rocketmq/src/main/java/org/apache/ignite/stream/rocketmq/package-info.java b/modules/rocketmq/src/main/java/org/apache/ignite/stream/rocketmq/package-info.java new file mode 100644 index 0000000000000..37e24a7de1e74 --- /dev/null +++ b/modules/rocketmq/src/main/java/org/apache/ignite/stream/rocketmq/package-info.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Contains implementation of RocketMQStreamer. + */ +package org.apache.ignite.stream.rocketmq; \ No newline at end of file diff --git a/modules/rocketmq/src/test/java/org/apache/ignite/stream/rocketmq/RocketMQStreamerTest.java b/modules/rocketmq/src/test/java/org/apache/ignite/stream/rocketmq/RocketMQStreamerTest.java new file mode 100644 index 0000000000000..4717bde883eab --- /dev/null +++ b/modules/rocketmq/src/test/java/org/apache/ignite/stream/rocketmq/RocketMQStreamerTest.java @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.stream.rocketmq; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteDataStreamer; +import org.apache.ignite.cache.CachePeekMode; +import org.apache.ignite.events.CacheEvent; +import org.apache.ignite.lang.IgniteBiPredicate; +import org.apache.ignite.stream.StreamMultipleTupleExtractor; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.apache.rocketmq.broker.BrokerController; +import org.apache.rocketmq.client.producer.DefaultMQProducer; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.namesrv.NamesrvController; +import org.apache.rocketmq.test.base.IntegrationTestBase; + +import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_PUT; + +/** + * Test for {@link RocketMQStreamer}. + */ +public class RocketMQStreamerTest extends GridCommonAbstractTest { + /** Test topic. */ + private static final String TOPIC_NAME = "testTopic"; + + /** Test consumer group. */ + private static final String CONSUMER_GRP = "testConsumerGrp"; + + /** Test name server address. */ + private static final String NAMESRV_ADDR = "127.0.0.1:"; + + private static NamesrvController nameSrv; + + private static BrokerController broker; + + /** Number of events to handle. */ + private static final int EVT_NUM = 1000; + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override protected void beforeTest() throws Exception { + grid().getOrCreateCache(defaultCacheConfiguration()); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + grid().cache(null).clear(); + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override protected void beforeTestsStarted() throws Exception { + nameSrv = IntegrationTestBase.createAndStartNamesrv(); + + broker = IntegrationTestBase.createAndStartBroker( + NAMESRV_ADDR + nameSrv.getNettyServerConfig().getListenPort()); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + broker.shutdown(); + + nameSrv.shutdown(); + } + + /** Constructor. */ + public RocketMQStreamerTest() { + super(true); + } + + public void testStreamer() throws Exception { + RocketMQStreamer streamer = null; + + Ignite ignite = grid(); + + try (IgniteDataStreamer dataStreamer = ignite.dataStreamer(null)) { + dataStreamer.allowOverwrite(true); + dataStreamer.autoFlushFrequency(10); + + streamer = new RocketMQStreamer<>(); + + //configure. + streamer.setIgnite(ignite); + streamer.setStreamer(dataStreamer); + streamer.setNameSrvAddr(NAMESRV_ADDR + nameSrv.getNettyServerConfig().getListenPort()); + streamer.setConsumerGrp(CONSUMER_GRP); + streamer.setTopic(TOPIC_NAME); + streamer.setMultipleTupleExtractor(new TestTupleExtractor()); + + streamer.start(); + + IgniteCache cache = ignite.cache(null); + + assertEquals(0, cache.size(CachePeekMode.PRIMARY)); + + final CountDownLatch latch = new CountDownLatch(EVT_NUM); + + IgniteBiPredicate putLsnr = new IgniteBiPredicate() { + @Override public boolean apply(UUID uuid, CacheEvent evt) { + assert evt != null; + + latch.countDown(); + + return true; + } + }; + + ignite.events(ignite.cluster().forCacheNodes(null)).remoteListen(putLsnr, null, EVT_CACHE_OBJECT_PUT); + + produceData(); + + assertTrue(latch.await(30, TimeUnit.SECONDS)); + + assertEquals(EVT_NUM, cache.size(CachePeekMode.PRIMARY)); + } + finally { + if (streamer != null) + streamer.stop(); + } + } + + public static class TestTupleExtractor implements StreamMultipleTupleExtractor, String, byte[]> { + + @Override public Map extract(List msgs) { + final Map map = new HashMap<>(); + + for (MessageExt msg : msgs) { + map.put(msg.getMsgId(), msg.getBody()); + } + + return map; + } + } + + private void produceData() throws Exception { + IntegrationTestBase.initTopic( + TOPIC_NAME, NAMESRV_ADDR + nameSrv.getNettyServerConfig().getListenPort(), + broker.getBrokerConfig().getBrokerClusterName()); + + DefaultMQProducer producer = new DefaultMQProducer("testProducerGrp"); + producer.setNamesrvAddr(NAMESRV_ADDR + nameSrv.getNettyServerConfig().getListenPort()); + + try { + producer.start(); + + for (int i = 0; i < EVT_NUM; i++) + producer.send(new Message(TOPIC_NAME, "", String.valueOf(i).getBytes("UTF-8"))); + } + catch (Exception e) { + throw new Exception(e); + } + finally { + producer.shutdown(); + } + } +} diff --git a/modules/rocketmq/src/test/java/org/apache/ignite/stream/rocketmq/RocketMQStreamerTestSuite.java b/modules/rocketmq/src/test/java/org/apache/ignite/stream/rocketmq/RocketMQStreamerTestSuite.java new file mode 100644 index 0000000000000..e761f1b879756 --- /dev/null +++ b/modules/rocketmq/src/test/java/org/apache/ignite/stream/rocketmq/RocketMQStreamerTestSuite.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.stream.rocketmq; + +import junit.framework.TestSuite; + +/** + * Apache RocketMQ streamers tests. + */ +public class RocketMQStreamerTestSuite extends TestSuite { + /** + * @return Test suite. + * @throws Exception Thrown in case of the failure. + */ + public static TestSuite suite() throws Exception { + TestSuite suite = new TestSuite("Apache Kafka streamer Test Suite"); + + suite.addTest(new TestSuite(RocketMQStreamerTest.class)); + + return suite; + } +} diff --git a/modules/rocketmq/src/test/java/org/apache/ignite/stream/rocketmq/package-info.java b/modules/rocketmq/src/test/java/org/apache/ignite/stream/rocketmq/package-info.java new file mode 100644 index 0000000000000..37e24a7de1e74 --- /dev/null +++ b/modules/rocketmq/src/test/java/org/apache/ignite/stream/rocketmq/package-info.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Contains implementation of RocketMQStreamer. + */ +package org.apache.ignite.stream.rocketmq; \ No newline at end of file diff --git a/parent/pom.xml b/parent/pom.xml index 2cb88b001fb69..dd1ab8d7cec15 100644 --- a/parent/pom.xml +++ b/parent/pom.xml @@ -94,6 +94,7 @@ 5.0.0 5.0.0 1.0.2 + 4.0.0-SNAPSHOT 2.10.4 2.10.4 2.11.7 @@ -447,6 +448,10 @@ Flink integration org.apache.ignite.sink.flink* + + RocketMQ integration + org.apache.ignite.rocketmq* +
modules/web/ignite-websphere-test modules/cassandra modules/flink + modules/rocketmq From dc2eed604d6a90cd60930f058d90a8bf97017f7a Mon Sep 17 00:00:00 2001 From: shtykh_roman Date: Tue, 21 Feb 2017 19:10:24 +0900 Subject: [PATCH 2/6] Changed RocketMQ version to the latest release. --- .../java/org/apache/ignite/stream/rocketmq/package-info.java | 2 +- .../java/org/apache/ignite/stream/rocketmq/package-info.java | 2 +- parent/pom.xml | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/modules/rocketmq/src/main/java/org/apache/ignite/stream/rocketmq/package-info.java b/modules/rocketmq/src/main/java/org/apache/ignite/stream/rocketmq/package-info.java index 37e24a7de1e74..eebf084857408 100644 --- a/modules/rocketmq/src/main/java/org/apache/ignite/stream/rocketmq/package-info.java +++ b/modules/rocketmq/src/main/java/org/apache/ignite/stream/rocketmq/package-info.java @@ -18,4 +18,4 @@ /** * Contains implementation of RocketMQStreamer. */ -package org.apache.ignite.stream.rocketmq; \ No newline at end of file +package org.apache.ignite.stream.rocketmq; diff --git a/modules/rocketmq/src/test/java/org/apache/ignite/stream/rocketmq/package-info.java b/modules/rocketmq/src/test/java/org/apache/ignite/stream/rocketmq/package-info.java index 37e24a7de1e74..eebf084857408 100644 --- a/modules/rocketmq/src/test/java/org/apache/ignite/stream/rocketmq/package-info.java +++ b/modules/rocketmq/src/test/java/org/apache/ignite/stream/rocketmq/package-info.java @@ -18,4 +18,4 @@ /** * Contains implementation of RocketMQStreamer. */ -package org.apache.ignite.stream.rocketmq; \ No newline at end of file +package org.apache.ignite.stream.rocketmq; diff --git a/parent/pom.xml b/parent/pom.xml index dd1ab8d7cec15..9a6656caf47b5 100644 --- a/parent/pom.xml +++ b/parent/pom.xml @@ -94,7 +94,7 @@ 5.0.0 5.0.0 1.0.2 - 4.0.0-SNAPSHOT + 4.0.0-incubating 2.10.4 2.10.4 2.11.7 From b8a69abb156312f79110e5a2504dfa1489b64579 Mon Sep 17 00:00:00 2001 From: shtykh_roman Date: Thu, 23 Feb 2017 18:55:41 +0900 Subject: [PATCH 3/6] IGNITE-4539: reworked RocketMQ startup. --- .../ignite/stream/rocketmq/package-info.java | 2 +- .../stream/rocketmq/RocketMQStreamerTest.java | 70 ++++++--- .../stream/rocketmq/TestRocketMQServer.java | 135 ++++++++++++++++++ 3 files changed, 184 insertions(+), 23 deletions(-) create mode 100644 modules/rocketmq/src/test/java/org/apache/ignite/stream/rocketmq/TestRocketMQServer.java diff --git a/modules/rocketmq/src/main/java/org/apache/ignite/stream/rocketmq/package-info.java b/modules/rocketmq/src/main/java/org/apache/ignite/stream/rocketmq/package-info.java index eebf084857408..f743696af09cb 100644 --- a/modules/rocketmq/src/main/java/org/apache/ignite/stream/rocketmq/package-info.java +++ b/modules/rocketmq/src/main/java/org/apache/ignite/stream/rocketmq/package-info.java @@ -16,6 +16,6 @@ */ /** - * Contains implementation of RocketMQStreamer. + * Contains implementation of RocketMQStreamer tests. */ package org.apache.ignite.stream.rocketmq; diff --git a/modules/rocketmq/src/test/java/org/apache/ignite/stream/rocketmq/RocketMQStreamerTest.java b/modules/rocketmq/src/test/java/org/apache/ignite/stream/rocketmq/RocketMQStreamerTest.java index 4717bde883eab..337dc2806d7c9 100644 --- a/modules/rocketmq/src/test/java/org/apache/ignite/stream/rocketmq/RocketMQStreamerTest.java +++ b/modules/rocketmq/src/test/java/org/apache/ignite/stream/rocketmq/RocketMQStreamerTest.java @@ -28,17 +28,20 @@ import org.apache.ignite.IgniteDataStreamer; import org.apache.ignite.cache.CachePeekMode; import org.apache.ignite.events.CacheEvent; +import org.apache.ignite.internal.IgniteInterruptedCheckedException; +import org.apache.ignite.internal.util.lang.GridAbsPredicate; import org.apache.ignite.lang.IgniteBiPredicate; import org.apache.ignite.stream.StreamMultipleTupleExtractor; +import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; -import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageExt; -import org.apache.rocketmq.namesrv.NamesrvController; -import org.apache.rocketmq.test.base.IntegrationTestBase; +import org.apache.rocketmq.test.util.MQAdmin; import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_PUT; +import static org.apache.ignite.stream.rocketmq.TestRocketMQServer.NAME_SERVER_PORT; +import static org.apache.ignite.stream.rocketmq.TestRocketMQServer.TEST_IP; /** * Test for {@link RocketMQStreamer}. @@ -50,12 +53,11 @@ public class RocketMQStreamerTest extends GridCommonAbstractTest { /** Test consumer group. */ private static final String CONSUMER_GRP = "testConsumerGrp"; - /** Test name server address. */ - private static final String NAMESRV_ADDR = "127.0.0.1:"; + /** Test cluster name. */ + private static final String TEST_CLUSTER = "testCluster"; - private static NamesrvController nameSrv; - - private static BrokerController broker; + /** Test server. */ + private static TestRocketMQServer testRocketMQServer; /** Number of events to handle. */ private static final int EVT_NUM = 1000; @@ -74,17 +76,13 @@ public class RocketMQStreamerTest extends GridCommonAbstractTest { /** {@inheritDoc} */ @SuppressWarnings("unchecked") @Override protected void beforeTestsStarted() throws Exception { - nameSrv = IntegrationTestBase.createAndStartNamesrv(); - - broker = IntegrationTestBase.createAndStartBroker( - NAMESRV_ADDR + nameSrv.getNettyServerConfig().getListenPort()); + testRocketMQServer = new TestRocketMQServer(log); } /** {@inheritDoc} */ @Override protected void afterTestsStopped() throws Exception { - broker.shutdown(); - - nameSrv.shutdown(); + if (testRocketMQServer != null) + testRocketMQServer.shutdown(); } /** Constructor. */ @@ -92,6 +90,11 @@ public RocketMQStreamerTest() { super(true); } + /** + * Tests data is properly injected into the grid. + * + * @throws Exception If fails. + */ public void testStreamer() throws Exception { RocketMQStreamer streamer = null; @@ -106,7 +109,7 @@ public void testStreamer() throws Exception { //configure. streamer.setIgnite(ignite); streamer.setStreamer(dataStreamer); - streamer.setNameSrvAddr(NAMESRV_ADDR + nameSrv.getNettyServerConfig().getListenPort()); + streamer.setNameSrvAddr(TEST_IP + ":" + NAME_SERVER_PORT); streamer.setConsumerGrp(CONSUMER_GRP); streamer.setTopic(TOPIC_NAME); streamer.setMultipleTupleExtractor(new TestTupleExtractor()); @@ -143,26 +146,33 @@ public void testStreamer() throws Exception { } } + /** + * Test tuple extractor. + */ public static class TestTupleExtractor implements StreamMultipleTupleExtractor, String, byte[]> { + /** {@inheritDoc} */ @Override public Map extract(List msgs) { final Map map = new HashMap<>(); - for (MessageExt msg : msgs) { + for (MessageExt msg : msgs) map.put(msg.getMsgId(), msg.getBody()); - } return map; } } + /** + * Adds data to RocketMQ. + * + * @throws Exception If fails. + */ private void produceData() throws Exception { - IntegrationTestBase.initTopic( - TOPIC_NAME, NAMESRV_ADDR + nameSrv.getNettyServerConfig().getListenPort(), - broker.getBrokerConfig().getBrokerClusterName()); + initTopic(TOPIC_NAME, TEST_IP + ":" + NAME_SERVER_PORT, TEST_CLUSTER); DefaultMQProducer producer = new DefaultMQProducer("testProducerGrp"); - producer.setNamesrvAddr(NAMESRV_ADDR + nameSrv.getNettyServerConfig().getListenPort()); + + producer.setNamesrvAddr(TEST_IP + ":" + NAME_SERVER_PORT); try { producer.start(); @@ -177,4 +187,20 @@ private void produceData() throws Exception { producer.shutdown(); } } + + /** + * Initializes RocketMQ topic. + * + * @param topic Topic. + * @param nsAddr Nameserver address. + * @param clusterName Cluster name. + * @throws IgniteInterruptedCheckedException If fails. + */ + private void initTopic(String topic, String nsAddr, String clusterName) throws IgniteInterruptedCheckedException { + assertFalse(GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + return MQAdmin.createTopic(nsAddr, clusterName, topic, 8, 10); + } + }, 10_000)); + } } diff --git a/modules/rocketmq/src/test/java/org/apache/ignite/stream/rocketmq/TestRocketMQServer.java b/modules/rocketmq/src/test/java/org/apache/ignite/stream/rocketmq/TestRocketMQServer.java new file mode 100644 index 0000000000000..43575a809b77c --- /dev/null +++ b/modules/rocketmq/src/test/java/org/apache/ignite/stream/rocketmq/TestRocketMQServer.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.stream.rocketmq; + +import java.util.UUID; +import org.apache.ignite.IgniteLogger; +import org.apache.rocketmq.broker.BrokerController; +import org.apache.rocketmq.common.BrokerConfig; +import org.apache.rocketmq.common.namesrv.NamesrvConfig; +import org.apache.rocketmq.namesrv.NamesrvController; +import org.apache.rocketmq.remoting.netty.NettyClientConfig; +import org.apache.rocketmq.remoting.netty.NettyServerConfig; +import org.apache.rocketmq.store.config.MessageStoreConfig; + +import static java.io.File.separator; + +/** + * Test RocketMQ server handling a broker and a nameserver. + */ +class TestRocketMQServer { + /** Nameserver port. */ + protected static final int NAME_SERVER_PORT = 9000; + + /** Broker port. */ + private static final int BROKER_PORT = 8000; + + /** Broker HA port. */ + private static final int HA_PORT = 8001; + + /** Test ip address. */ + protected static final String TEST_IP = "127.0.0.1"; + + /** Test broker name. */ + private static final String TEST_BROKER = "testBroker"; + + /** Nameserver. */ + private static NamesrvController nameSrv; + + /** Broker. */ + private static BrokerController broker; + + /** Logger. */ + private final IgniteLogger log; + + /** + * Test server constructor. + * + * @param log Logger. + */ + TestRocketMQServer(IgniteLogger log) { + this.log = log; + + try { + startNameServer(); + startBroker(); + } + catch (Exception e) { + throw new RuntimeException("Failed to start RocketMQ: " + e); + } + } + + /** + * Starts a test nameserver. + * + * @throws Exception If fails. + */ + private void startNameServer() throws Exception { + NamesrvConfig namesrvConfig = new NamesrvConfig(); + NettyServerConfig nameServerNettyServerConfig = new NettyServerConfig(); + + namesrvConfig.setKvConfigPath(System.getProperty("java.io.tmpdir") + separator + "namesrv" + separator + "kvConfig.json"); + nameServerNettyServerConfig.setListenPort(NAME_SERVER_PORT); + + nameSrv = new NamesrvController(namesrvConfig, nameServerNettyServerConfig); + + nameSrv.initialize(); + nameSrv.start(); + + log.info("Started nameserver at " + NAME_SERVER_PORT); + } + + /** + * Starts a test broker. + * + * @throws Exception If fails. + */ + private void startBroker() throws Exception { + BrokerConfig brokerCfg = new BrokerConfig(); + NettyServerConfig nettySrvCfg = new NettyServerConfig(); + MessageStoreConfig storeCfg = new MessageStoreConfig(); + + brokerCfg.setBrokerName(TEST_BROKER); + brokerCfg.setBrokerIP1(TEST_IP); + brokerCfg.setNamesrvAddr(TEST_IP + ":" + NAME_SERVER_PORT); + + storeCfg.setStorePathRootDir(System.getProperty("java.io.tmpdir") + separator + "store-" + UUID.randomUUID()); + storeCfg.setStorePathCommitLog(System.getProperty("java.io.tmpdir") + separator + "commitlog"); + storeCfg.setHaListenPort(HA_PORT); + + nettySrvCfg.setListenPort(BROKER_PORT); + + broker = new BrokerController(brokerCfg, nettySrvCfg, new NettyClientConfig(), storeCfg); + + broker.initialize(); + broker.start(); + + log.info("Started broker [" + TEST_BROKER + "] at " + BROKER_PORT); + } + + /** + * Shuts test server down. + */ + void shutdown() { + if (broker != null) + broker.shutdown(); + + if (nameSrv != null) + nameSrv.shutdown(); + } +} From 0adfd27e5d4dcf77814128180cb40a645855078f Mon Sep 17 00:00:00 2001 From: shtykh_roman Date: Wed, 19 Apr 2017 14:32:11 +0900 Subject: [PATCH 4/6] IGNITE-4539: Reworked topic creation. --- .../stream/rocketmq/RocketMQStreamerTest.java | 36 +++++++++++-------- .../stream/rocketmq/TestRocketMQServer.java | 13 +++++++ 2 files changed, 35 insertions(+), 14 deletions(-) diff --git a/modules/rocketmq/src/test/java/org/apache/ignite/stream/rocketmq/RocketMQStreamerTest.java b/modules/rocketmq/src/test/java/org/apache/ignite/stream/rocketmq/RocketMQStreamerTest.java index 337dc2806d7c9..59451e9fe3a21 100644 --- a/modules/rocketmq/src/test/java/org/apache/ignite/stream/rocketmq/RocketMQStreamerTest.java +++ b/modules/rocketmq/src/test/java/org/apache/ignite/stream/rocketmq/RocketMQStreamerTest.java @@ -29,15 +29,15 @@ import org.apache.ignite.cache.CachePeekMode; import org.apache.ignite.events.CacheEvent; import org.apache.ignite.internal.IgniteInterruptedCheckedException; -import org.apache.ignite.internal.util.lang.GridAbsPredicate; +import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteBiPredicate; import org.apache.ignite.stream.StreamMultipleTupleExtractor; -import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.apache.rocketmq.client.producer.DefaultMQProducer; +import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageExt; -import org.apache.rocketmq.test.util.MQAdmin; +import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_PUT; import static org.apache.ignite.stream.rocketmq.TestRocketMQServer.NAME_SERVER_PORT; @@ -53,9 +53,6 @@ public class RocketMQStreamerTest extends GridCommonAbstractTest { /** Test consumer group. */ private static final String CONSUMER_GRP = "testConsumerGrp"; - /** Test cluster name. */ - private static final String TEST_CLUSTER = "testCluster"; - /** Test server. */ private static TestRocketMQServer testRocketMQServer; @@ -168,7 +165,7 @@ public static class TestTupleExtractor implements StreamMultipleTupleExtractor Date: Wed, 19 Apr 2017 23:00:12 +0900 Subject: [PATCH 5/6] Updated RocketMQ dependencies. --- modules/rocketmq/pom.xml | 13 ++----------- 1 file changed, 2 insertions(+), 11 deletions(-) diff --git a/modules/rocketmq/pom.xml b/modules/rocketmq/pom.xml index 6400881d279f6..7906a273abcd9 100644 --- a/modules/rocketmq/pom.xml +++ b/modules/rocketmq/pom.xml @@ -44,23 +44,14 @@ org.apache.rocketmq - rocketmq-client + rocketmq-broker ${rocketmq.version} org.apache.rocketmq - rocketmq-test + rocketmq-namesrv ${rocketmq.version} - test - - - - org.apache.rocketmq - rocketmq-test - ${rocketmq.version} - test-jar - test From bdf182021d1409cbc017d8f2d6a1d69b9fccbba2 Mon Sep 17 00:00:00 2001 From: shtykh_roman Date: Tue, 25 Apr 2017 18:06:02 +0900 Subject: [PATCH 6/6] Broker scope to test. --- modules/rocketmq/pom.xml | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/modules/rocketmq/pom.xml b/modules/rocketmq/pom.xml index 7906a273abcd9..3b317fa7be87f 100644 --- a/modules/rocketmq/pom.xml +++ b/modules/rocketmq/pom.xml @@ -44,14 +44,15 @@ org.apache.rocketmq - rocketmq-broker + rocketmq-namesrv ${rocketmq.version} org.apache.rocketmq - rocketmq-namesrv + rocketmq-broker ${rocketmq.version} + test