diff --git a/modules/rocketmq/README.txt b/modules/rocketmq/README.txt new file mode 100644 index 0000000000000..55a117bec4af5 --- /dev/null +++ b/modules/rocketmq/README.txt @@ -0,0 +1,25 @@ +Apache Ignite RocketMQ Streamer Module +-------------------------------------- + +Apache Ignite RocketMQ Streamer module provides streaming from RocketMQ to Ignite cache. + +To use Ignite RocketMQ Streamer module, first import it to your Maven project. + + + ... + + ... + + org.apache.ignite + ignite-rocketmq + ${ignite.version} + + ... + + ... + + +Then, initialize and start it as, for instance, done in RocketMQStreamerTest.java. diff --git a/modules/rocketmq/pom.xml b/modules/rocketmq/pom.xml new file mode 100644 index 0000000000000..3b317fa7be87f --- /dev/null +++ b/modules/rocketmq/pom.xml @@ -0,0 +1,81 @@ + + + + + + + 4.0.0 + + + org.apache.ignite + ignite-parent + 1 + ../../parent + + + ignite-rocketmq + 2.0.0-SNAPSHOT + http://ignite.apache.org + + + + org.apache.ignite + ignite-core + ${project.version} + + + + org.apache.rocketmq + rocketmq-namesrv + ${rocketmq.version} + + + + org.apache.rocketmq + rocketmq-broker + ${rocketmq.version} + test + + + + org.apache.ignite + ignite-spring + ${project.version} + test + + + + org.apache.ignite + ignite-core + ${project.version} + test-jar + test + + + + org.apache.ignite + ignite-log4j + ${project.version} + test + + + + diff --git a/modules/rocketmq/src/main/java/org/apache/ignite/stream/rocketmq/RocketMQStreamer.java b/modules/rocketmq/src/main/java/org/apache/ignite/stream/rocketmq/RocketMQStreamer.java new file mode 100644 index 0000000000000..67f1ce56695e7 --- /dev/null +++ b/modules/rocketmq/src/main/java/org/apache/ignite/stream/rocketmq/RocketMQStreamer.java @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.stream.rocketmq; + +import java.util.List; +import org.apache.ignite.IgniteDataStreamer; +import org.apache.ignite.IgniteException; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.internal.util.typedef.internal.A; +import org.apache.ignite.stream.StreamAdapter; +import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; +import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.common.consumer.ConsumeFromWhere; +import org.apache.rocketmq.common.message.MessageExt; + +/** + * Streamer that subscribes to a RocketMQ topic amd feeds messages into {@link IgniteDataStreamer} instance. + */ +public class RocketMQStreamer extends StreamAdapter, K, V> implements MessageListenerConcurrently { + /** Logger. */ + private IgniteLogger log; + + /** RocketMQ consumer. */ + private DefaultMQPushConsumer consumer; + + /** State. */ + private volatile boolean stopped = true; + + /** Topic to subscribe to. */ + private String topic; + + /** Consumer group. */ + private String consumerGrp; + + /** Name server address. */ + private String nameSrvAddr; + + /** + * Starts streamer. + * + * @throws IgniteException If failed. + */ + public void start() { + if (!stopped) + throw new IgniteException("Attempted to start an already started RocketMQ streamer"); + + // validate parameters. + A.notNull(getStreamer(), "streamer"); + A.notNull(getIgnite(), "ignite"); + A.notNull(topic, "topic"); + A.notNull(consumerGrp, "consumer group"); + A.notNullOrEmpty(nameSrvAddr, "nameserver address"); + A.ensure(null != getMultipleTupleExtractor(), "Multiple tuple extractor must be configured"); + + log = getIgnite().log(); + + consumer = new DefaultMQPushConsumer(consumerGrp); + + consumer.setNamesrvAddr(nameSrvAddr); + consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); + + try { + consumer.subscribe(topic, "*"); + } + catch (MQClientException e) { + throw new IgniteException("Failed to subscribe to " + topic, e); + } + + consumer.registerMessageListener(this); + + try { + consumer.start(); + } + catch (MQClientException e) { + throw new IgniteException("Failed to start the streamer", e); + } + + stopped = false; + } + + /** + * Stops streamer. + */ + public void stop() { + if (consumer != null) + consumer.shutdown(); + + stopped = true; + } + + /** + * Implements {@link MessageListenerConcurrently#consumeMessage(List, ConsumeConcurrentlyContext)} to receive + * messages. + * + * {@inheritDoc} + */ + @Override public ConsumeConcurrentlyStatus consumeMessage(List msgs, + ConsumeConcurrentlyContext context) { + if (log.isDebugEnabled()) + log.debug("Received " + msgs.size() + " messages"); + + addMessage(msgs); + + return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; + } + + /** + * Sets the topic to subscribe to. + * + * @param topic The topic to subscribe to. + */ + public void setTopic(String topic) { + this.topic = topic; + } + + /** + * Sets the name of the consumer group. + * + * @param consumerGrp Consumer group name. + */ + public void setConsumerGrp(String consumerGrp) { + this.consumerGrp = consumerGrp; + } + + /** + * Sets the name server address. + * + * @param nameSrvAddr Name server address + */ + public void setNameSrvAddr(String nameSrvAddr) { + this.nameSrvAddr = nameSrvAddr; + } +} diff --git a/modules/rocketmq/src/main/java/org/apache/ignite/stream/rocketmq/package-info.java b/modules/rocketmq/src/main/java/org/apache/ignite/stream/rocketmq/package-info.java new file mode 100644 index 0000000000000..f743696af09cb --- /dev/null +++ b/modules/rocketmq/src/main/java/org/apache/ignite/stream/rocketmq/package-info.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Contains implementation of RocketMQStreamer tests. + */ +package org.apache.ignite.stream.rocketmq; diff --git a/modules/rocketmq/src/test/java/org/apache/ignite/stream/rocketmq/RocketMQStreamerTest.java b/modules/rocketmq/src/test/java/org/apache/ignite/stream/rocketmq/RocketMQStreamerTest.java new file mode 100644 index 0000000000000..59451e9fe3a21 --- /dev/null +++ b/modules/rocketmq/src/test/java/org/apache/ignite/stream/rocketmq/RocketMQStreamerTest.java @@ -0,0 +1,214 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.stream.rocketmq; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteDataStreamer; +import org.apache.ignite.cache.CachePeekMode; +import org.apache.ignite.events.CacheEvent; +import org.apache.ignite.internal.IgniteInterruptedCheckedException; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteBiPredicate; +import org.apache.ignite.stream.StreamMultipleTupleExtractor; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.apache.rocketmq.client.producer.DefaultMQProducer; +import org.apache.rocketmq.common.TopicConfig; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; + +import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_PUT; +import static org.apache.ignite.stream.rocketmq.TestRocketMQServer.NAME_SERVER_PORT; +import static org.apache.ignite.stream.rocketmq.TestRocketMQServer.TEST_IP; + +/** + * Test for {@link RocketMQStreamer}. + */ +public class RocketMQStreamerTest extends GridCommonAbstractTest { + /** Test topic. */ + private static final String TOPIC_NAME = "testTopic"; + + /** Test consumer group. */ + private static final String CONSUMER_GRP = "testConsumerGrp"; + + /** Test server. */ + private static TestRocketMQServer testRocketMQServer; + + /** Number of events to handle. */ + private static final int EVT_NUM = 1000; + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override protected void beforeTest() throws Exception { + grid().getOrCreateCache(defaultCacheConfiguration()); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + grid().cache(null).clear(); + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override protected void beforeTestsStarted() throws Exception { + testRocketMQServer = new TestRocketMQServer(log); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + if (testRocketMQServer != null) + testRocketMQServer.shutdown(); + } + + /** Constructor. */ + public RocketMQStreamerTest() { + super(true); + } + + /** + * Tests data is properly injected into the grid. + * + * @throws Exception If fails. + */ + public void testStreamer() throws Exception { + RocketMQStreamer streamer = null; + + Ignite ignite = grid(); + + try (IgniteDataStreamer dataStreamer = ignite.dataStreamer(null)) { + dataStreamer.allowOverwrite(true); + dataStreamer.autoFlushFrequency(10); + + streamer = new RocketMQStreamer<>(); + + //configure. + streamer.setIgnite(ignite); + streamer.setStreamer(dataStreamer); + streamer.setNameSrvAddr(TEST_IP + ":" + NAME_SERVER_PORT); + streamer.setConsumerGrp(CONSUMER_GRP); + streamer.setTopic(TOPIC_NAME); + streamer.setMultipleTupleExtractor(new TestTupleExtractor()); + + streamer.start(); + + IgniteCache cache = ignite.cache(null); + + assertEquals(0, cache.size(CachePeekMode.PRIMARY)); + + final CountDownLatch latch = new CountDownLatch(EVT_NUM); + + IgniteBiPredicate putLsnr = new IgniteBiPredicate() { + @Override public boolean apply(UUID uuid, CacheEvent evt) { + assert evt != null; + + latch.countDown(); + + return true; + } + }; + + ignite.events(ignite.cluster().forCacheNodes(null)).remoteListen(putLsnr, null, EVT_CACHE_OBJECT_PUT); + + produceData(); + + assertTrue(latch.await(30, TimeUnit.SECONDS)); + + assertEquals(EVT_NUM, cache.size(CachePeekMode.PRIMARY)); + } + finally { + if (streamer != null) + streamer.stop(); + } + } + + /** + * Test tuple extractor. + */ + public static class TestTupleExtractor implements StreamMultipleTupleExtractor, String, byte[]> { + + /** {@inheritDoc} */ + @Override public Map extract(List msgs) { + final Map map = new HashMap<>(); + + for (MessageExt msg : msgs) + map.put(msg.getMsgId(), msg.getBody()); + + return map; + } + } + + /** + * Adds data to RocketMQ. + * + * @throws Exception If fails. + */ + private void produceData() throws Exception { + initTopic(TOPIC_NAME, TEST_IP + ":" + NAME_SERVER_PORT); + + DefaultMQProducer producer = new DefaultMQProducer("testProducerGrp"); + + producer.setNamesrvAddr(TEST_IP + ":" + NAME_SERVER_PORT); + + try { + producer.start(); + + for (int i = 0; i < EVT_NUM; i++) + producer.send(new Message(TOPIC_NAME, "", String.valueOf(i).getBytes("UTF-8"))); + } + catch (Exception e) { + throw new Exception(e); + } + finally { + producer.shutdown(); + } + } + + /** + * Initializes RocketMQ topic. + * + * @param topic Topic. + * @param nsAddr Nameserver address. + * @throws IgniteInterruptedCheckedException If fails. + */ + private void initTopic(String topic, String nsAddr) throws Exception { + DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(); + defaultMQAdminExt.setNamesrvAddr(nsAddr); + try { + defaultMQAdminExt.start(); + + TopicConfig topicConfig = new TopicConfig(); + topicConfig.setTopicName(topic); + topicConfig.setReadQueueNums(4); + topicConfig.setWriteQueueNums(4); + + defaultMQAdminExt.createAndUpdateTopicConfig(testRocketMQServer.getBrokerAddr(), topicConfig); + + U.sleep(100); + } + finally { + defaultMQAdminExt.shutdown(); + } + } +} diff --git a/modules/rocketmq/src/test/java/org/apache/ignite/stream/rocketmq/RocketMQStreamerTestSuite.java b/modules/rocketmq/src/test/java/org/apache/ignite/stream/rocketmq/RocketMQStreamerTestSuite.java new file mode 100644 index 0000000000000..e761f1b879756 --- /dev/null +++ b/modules/rocketmq/src/test/java/org/apache/ignite/stream/rocketmq/RocketMQStreamerTestSuite.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.stream.rocketmq; + +import junit.framework.TestSuite; + +/** + * Apache RocketMQ streamers tests. + */ +public class RocketMQStreamerTestSuite extends TestSuite { + /** + * @return Test suite. + * @throws Exception Thrown in case of the failure. + */ + public static TestSuite suite() throws Exception { + TestSuite suite = new TestSuite("Apache Kafka streamer Test Suite"); + + suite.addTest(new TestSuite(RocketMQStreamerTest.class)); + + return suite; + } +} diff --git a/modules/rocketmq/src/test/java/org/apache/ignite/stream/rocketmq/TestRocketMQServer.java b/modules/rocketmq/src/test/java/org/apache/ignite/stream/rocketmq/TestRocketMQServer.java new file mode 100644 index 0000000000000..beece8ef19fc9 --- /dev/null +++ b/modules/rocketmq/src/test/java/org/apache/ignite/stream/rocketmq/TestRocketMQServer.java @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.stream.rocketmq; + +import java.util.UUID; +import org.apache.ignite.IgniteLogger; +import org.apache.rocketmq.broker.BrokerController; +import org.apache.rocketmq.common.BrokerConfig; +import org.apache.rocketmq.common.namesrv.NamesrvConfig; +import org.apache.rocketmq.namesrv.NamesrvController; +import org.apache.rocketmq.remoting.netty.NettyClientConfig; +import org.apache.rocketmq.remoting.netty.NettyServerConfig; +import org.apache.rocketmq.store.config.MessageStoreConfig; + +import static java.io.File.separator; + +/** + * Test RocketMQ server handling a broker and a nameserver. + */ +class TestRocketMQServer { + /** Nameserver port. */ + protected static final int NAME_SERVER_PORT = 9000; + + /** Broker port. */ + private static final int BROKER_PORT = 8000; + + /** Broker HA port. */ + private static final int HA_PORT = 8001; + + /** Test ip address. */ + protected static final String TEST_IP = "127.0.0.1"; + + /** Test broker name. */ + private static final String TEST_BROKER = "testBroker"; + + /** Test cluster name. */ + private static final String TEST_CLUSTER = "testCluster"; + + /** Nameserver. */ + private static NamesrvController nameSrv; + + /** Broker. */ + private static BrokerController broker; + + /** Logger. */ + private final IgniteLogger log; + + /** + * Test server constructor. + * + * @param log Logger. + */ + TestRocketMQServer(IgniteLogger log) { + this.log = log; + + try { + startNameServer(); + startBroker(); + } + catch (Exception e) { + throw new RuntimeException("Failed to start RocketMQ: " + e); + } + } + + /** + * Starts a test nameserver. + * + * @throws Exception If fails. + */ + private void startNameServer() throws Exception { + NamesrvConfig namesrvConfig = new NamesrvConfig(); + NettyServerConfig nameServerNettyServerConfig = new NettyServerConfig(); + + namesrvConfig.setKvConfigPath(System.getProperty("java.io.tmpdir") + separator + "namesrv" + separator + "kvConfig.json"); + nameServerNettyServerConfig.setListenPort(NAME_SERVER_PORT); + + nameSrv = new NamesrvController(namesrvConfig, nameServerNettyServerConfig); + + nameSrv.initialize(); + nameSrv.start(); + + log.info("Started nameserver at " + NAME_SERVER_PORT); + } + + /** + * Starts a test broker. + * + * @throws Exception If fails. + */ + private void startBroker() throws Exception { + BrokerConfig brokerCfg = new BrokerConfig(); + NettyServerConfig nettySrvCfg = new NettyServerConfig(); + MessageStoreConfig storeCfg = new MessageStoreConfig(); + + brokerCfg.setBrokerName(TEST_BROKER); + brokerCfg.setBrokerClusterName(TEST_CLUSTER); + brokerCfg.setBrokerIP1(TEST_IP); + brokerCfg.setNamesrvAddr(TEST_IP + ":" + NAME_SERVER_PORT); + + storeCfg.setStorePathRootDir(System.getProperty("java.io.tmpdir") + separator + "store-" + UUID.randomUUID()); + storeCfg.setStorePathCommitLog(System.getProperty("java.io.tmpdir") + separator + "commitlog"); + storeCfg.setHaListenPort(HA_PORT); + + nettySrvCfg.setListenPort(BROKER_PORT); + + broker = new BrokerController(brokerCfg, nettySrvCfg, new NettyClientConfig(), storeCfg); + + broker.initialize(); + broker.start(); + + log.info("Started broker [" + TEST_BROKER + "] at " + BROKER_PORT); + } + + /** + * Obtains the broker address. + * + * @return Broker address. + */ + String getBrokerAddr() { + return broker.getBrokerAddr(); + } + + /** + * Shuts test server down. + */ + void shutdown() { + if (broker != null) + broker.shutdown(); + + if (nameSrv != null) + nameSrv.shutdown(); + } +} diff --git a/modules/rocketmq/src/test/java/org/apache/ignite/stream/rocketmq/package-info.java b/modules/rocketmq/src/test/java/org/apache/ignite/stream/rocketmq/package-info.java new file mode 100644 index 0000000000000..eebf084857408 --- /dev/null +++ b/modules/rocketmq/src/test/java/org/apache/ignite/stream/rocketmq/package-info.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Contains implementation of RocketMQStreamer. + */ +package org.apache.ignite.stream.rocketmq; diff --git a/parent/pom.xml b/parent/pom.xml index adb7995bf8d81..9891e10628920 100644 --- a/parent/pom.xml +++ b/parent/pom.xml @@ -94,6 +94,7 @@ 5.0.0 5.0.0 1.0.2 + 4.0.0-incubating 2.10.4 2.10.4 2.11.7 @@ -449,6 +450,10 @@ SpringData integration org.apache.ignite.springdata.repository* + + RocketMQ integration + org.apache.ignite.rocketmq* +
modules/flink modules/kubernetes modules/zeromq + modules/rocketmq