From f95b989438d863259192d85946ca02b7bcee73ac Mon Sep 17 00:00:00 2001 From: Xin Wang Date: Sat, 27 Jan 2018 19:03:48 +0800 Subject: [PATCH 1/4] ROCKETMQ-82: RocketMQ-Flink Integration --- rocketmq-flink/README.md | 147 ++++++++ rocketmq-flink/pom.xml | 182 ++++++++++ .../apache/rocketmq/flink/RocketMQSink.java | 189 ++++++++++ .../apache/rocketmq/flink/RocketMQSource.java | 322 ++++++++++++++++++ .../apache/rocketmq/flink/RocketMqConfig.java | 134 ++++++++ .../apache/rocketmq/flink/RocketMqUtils.java | 36 ++ .../common/selector/DefaultTopicSelector.java | 43 +++ .../common/selector/SimpleTopicSelector.java | 73 ++++ .../flink/common/selector/TopicSelector.java | 28 ++ .../KeyValueDeserializationSchema.java | 27 ++ .../KeyValueSerializationSchema.java | 28 ++ .../SimpleKeyValueDeserializationSchema.java | 66 ++++ .../SimpleKeyValueSerializationSchema.java | 63 ++++ .../apache/rocketmq/flink/ConsumerTest.java | 54 +++ .../apache/rocketmq/flink/ProducerTest.java | 57 ++++ .../rocketmq/flink/RocketMQFlinkExample.java | 73 ++++ rocketmq-flink/style/copyright/Apache.xml | 24 ++ .../style/copyright/profiles_settings.xml | 64 ++++ rocketmq-flink/style/rmq_checkstyle.xml | 135 ++++++++ rocketmq-flink/style/rmq_codeStyle.xml | 157 +++++++++ 20 files changed, 1902 insertions(+) create mode 100644 rocketmq-flink/README.md create mode 100644 rocketmq-flink/pom.xml create mode 100644 rocketmq-flink/src/main/java/org/apache/rocketmq/flink/RocketMQSink.java create mode 100644 rocketmq-flink/src/main/java/org/apache/rocketmq/flink/RocketMQSource.java create mode 100644 rocketmq-flink/src/main/java/org/apache/rocketmq/flink/RocketMqConfig.java create mode 100644 rocketmq-flink/src/main/java/org/apache/rocketmq/flink/RocketMqUtils.java create mode 100644 rocketmq-flink/src/main/java/org/apache/rocketmq/flink/common/selector/DefaultTopicSelector.java create mode 100644 rocketmq-flink/src/main/java/org/apache/rocketmq/flink/common/selector/SimpleTopicSelector.java create mode 100644 rocketmq-flink/src/main/java/org/apache/rocketmq/flink/common/selector/TopicSelector.java create mode 100644 rocketmq-flink/src/main/java/org/apache/rocketmq/flink/common/serialization/KeyValueDeserializationSchema.java create mode 100644 rocketmq-flink/src/main/java/org/apache/rocketmq/flink/common/serialization/KeyValueSerializationSchema.java create mode 100644 rocketmq-flink/src/main/java/org/apache/rocketmq/flink/common/serialization/SimpleKeyValueDeserializationSchema.java create mode 100644 rocketmq-flink/src/main/java/org/apache/rocketmq/flink/common/serialization/SimpleKeyValueSerializationSchema.java create mode 100644 rocketmq-flink/src/test/java/org/apache/rocketmq/flink/ConsumerTest.java create mode 100644 rocketmq-flink/src/test/java/org/apache/rocketmq/flink/ProducerTest.java create mode 100644 rocketmq-flink/src/test/java/org/apache/rocketmq/flink/RocketMQFlinkExample.java create mode 100644 rocketmq-flink/style/copyright/Apache.xml create mode 100644 rocketmq-flink/style/copyright/profiles_settings.xml create mode 100644 rocketmq-flink/style/rmq_checkstyle.xml create mode 100644 rocketmq-flink/style/rmq_codeStyle.xml diff --git a/rocketmq-flink/README.md b/rocketmq-flink/README.md new file mode 100644 index 000000000..629f2c934 --- /dev/null +++ b/rocketmq-flink/README.md @@ -0,0 +1,147 @@ +# RocketMQ-Flink + +RocketMQ integration for [Apache Flink](https://flink.apache.org/). This module includes the RocketMQ source and sink that allows a flink job to either write messages into a topic or read from topics in a flink job. + + +## RocketMQSource +To use the `RocketMQSource`, you construct an instance of it by specifying a KeyValueDeserializationSchema instance and a Properties instance which including rocketmq configs. +`RocketMQSource(KeyValueDeserializationSchema schema, Properties props)` +The RocketMQSource is based on RocketMQ pull consumer mode, and provides exactly once reliability guarantees when checkpoints are enabled. +Otherwise, the source doesn't provide any reliability guarantees. + +### KeyValueDeserializationSchema +The main API for deserializing topic and tags is the `org.apache.rocketmq.flink.common.serialization.KeyValueDeserializationSchema` interface. +`rocketmq-flink` includes general purpose `KeyValueDeserializationSchema` implementations called `SimpleKeyValueDeserializationSchema`. + +```java +public interface KeyValueDeserializationSchema extends ResultTypeQueryable, Serializable { + T deserializeKeyAndValue(byte[] key, byte[] value); +} +``` + +## RocketMQSink +To use the `RocketMQSink`, you construct an instance of it by specifying KeyValueSerializationSchema & TopicSelector instances and a Properties instance which including rocketmq configs. +`RocketMQSink(KeyValueSerializationSchema schema, TopicSelector topicSelector, Properties props)` +The RocketMQSink provides at-least-once reliability guarantees when checkpoints are enabled and `withBatchFlushOnCheckpoint(true)` is set. +Otherwise, the sink reliability guarantees depends on rocketmq producer's retry policy, for this case, the messages sending way is sync by default, +but you can change it by invoking `withAsync(true)`. + +### KeyValueSerializationSchema +The main API for serializing topic and tags is the `org.apache.rocketmq.flink.common.serialization.KeyValueSerializationSchema` interface. +`rocketmq-flink` includes general purpose `KeyValueSerializationSchema` implementations called `SimpleKeyValueSerializationSchema`. + +```java +public interface KeyValueSerializationSchema extends Serializable { + + byte[] serializeKey(T tuple); + + byte[] serializeValue(T tuple); +} +``` + +### TopicSelector +The main API for selecting topic and tags is the `org.apache.rocketmq.flink.common.selector.TopicSelector` interface. +`rocketmq-flink` includes general purpose `TopicSelector` implementations called `DefaultTopicSelector` and `SimpleTopicSelector`. + +```java +public interface TopicSelector extends Serializable { + + String getTopic(T tuple); + + String getTag(T tuple); +} +``` + +## Examples +The following is an example which receive messages from RocketMQ brokers and send messages to broker after processing. + + ```java +StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + // enable checkpoint + env.enableCheckpointing(3000); + + Properties consumerProps = new Properties(); + consumerProps.setProperty(RocketMqConfig.NAME_SERVER_ADDR, "localhost:9876"); + consumerProps.setProperty(RocketMqConfig.CONSUMER_GROUP, "c002"); + consumerProps.setProperty(RocketMqConfig.CONSUMER_TOPIC, "flink-source2"); + + Properties producerProps = new Properties(); + producerProps.setProperty(RocketMqConfig.NAME_SERVER_ADDR, "localhost:9876"); + + env.addSource(new RocketMQSource(new SimpleKeyValueDeserializationSchema("id", "address"), consumerProps)) + .name("rocketmq-source") + .setParallelism(2) + .process(new ProcessFunction() { + @Override + public void processElement(Map in, Context ctx, Collector out) throws Exception { + HashMap result = new HashMap(); + result.put("id", in.get("id")); + String[] arr = in.get("address").toString().split("\\s+"); + result.put("province", arr[arr.length-1]); + out.collect(result); + } + }) + .name("upper-processor") + .setParallelism(2) + .addSink(new RocketMQSink(new SimpleKeyValueSerializationSchema("id", "province"), + new DefaultTopicSelector("flink-sink2"), producerProps).withBatchFlushOnCheckpoint(true)) + .name("rocketmq-sink") + .setParallelism(2); + + try { + env.execute("rocketmq-flink-example"); + } catch (Exception e) { + e.printStackTrace(); + } + ``` + +## Configurations +The following configurations are all from the class `org.apache.rocketmq.flink.RocketMqConfig`. + +### Producer Configurations +| NAME | DESCRIPTION | DEFAULT | +| ------------- |:-------------:|:------:| +| nameserver.address | name server address *Required* | null | +| nameserver.poll.interval | name server poll topic info interval | 30000 | +| brokerserver.heartbeat.interval | broker server heartbeat interval | 30000 | +| producer.group | producer group | $UUID | +| producer.retry.times | producer send messages retry times | 3 | +| producer.timeout | producer send messages timeout | 3000 | + + +### Consumer Configurations +| NAME | DESCRIPTION | DEFAULT | +| ------------- |:-------------:|:------:| +| nameserver.address | name server address *Required* | null | +| nameserver.poll.interval | name server poll topic info interval | 30000 | +| brokerserver.heartbeat.interval | broker server heartbeat interval | 30000 | +| consumer.group | consumer group *Required* | null | +| consumer.topic | consumer topic *Required* | null | +| consumer.tag | consumer topic tag | * | +| consumer.offset.reset.to | what to do when there is no initial offset on the server | latest/earliest/timestamp | +| consumer.offset.from.timestamp | the timestamp when `consumer.offset.reset.to=timestamp` was set | $TIMESTAMP | +| consumer.offset.persist.interval | auto commit offset interval | 5000 | +| consumer.pull.thread.pool.size | consumer pull thread pool size | 20 | +| consumer.batch.size | consumer messages batch size | 32 | +| consumer.delay.when.message.not.found | the delay time when messages were not found | 10 | + + +## License + +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. diff --git a/rocketmq-flink/pom.xml b/rocketmq-flink/pom.xml new file mode 100644 index 000000000..5f38e115f --- /dev/null +++ b/rocketmq-flink/pom.xml @@ -0,0 +1,182 @@ + + + + 4.0.0 + + org.apache.rocketmq + rocketmq-flink + 0.0.1-SNAPSHOT + jar + + + + vesense + Xin Wang + xinwang@apache.org + + + + + UTF-8 + + true + true + + 1.8 + 1.8 + 4.2.0 + 1.4.0 + 2.5 + 2.11 + + + + + org.apache.flink + flink-java + ${flink.version} + + + org.apache.flink + flink-streaming-java_${scala.binary.version} + ${flink.version} + + + org.apache.flink + flink-clients_${scala.binary.version} + ${flink.version} + + + org.apache.flink + flink-queryable-state-runtime_${scala.binary.version} + ${flink.version} + + + org.apache.rocketmq + rocketmq-client + ${rocketmq.version} + + + org.apache.rocketmq + rocketmq-common + ${rocketmq.version} + + + io.netty + netty-tcnative + + + + + + commons-lang + commons-lang + ${commons-lang.version} + + + + + junit + junit + test + 4.12 + + + org.apache.rocketmq + rocketmq-namesrv + ${rocketmq.version} + test + + + + + + + + + org.apache.rocketmq + rocketmq-broker + ${rocketmq.version} + test + + + + + + + + + + + + + maven-compiler-plugin + 3.5.1 + + ${maven.compiler.source} + ${maven.compiler.target} + UTF-8 + ${maven.compiler.source} + true + true + + + + org.apache.maven.plugins + maven-surefire-plugin + 2.12.4 + + ${maven.test.skip} + + + + org.apache.rat + apache-rat-plugin + 0.12 + + + README.md + + + + + maven-checkstyle-plugin + 2.17 + + + verify + verify + + style/rmq_checkstyle.xml + UTF-8 + true + true + false + false + + + check + + + + + + + \ No newline at end of file diff --git a/rocketmq-flink/src/main/java/org/apache/rocketmq/flink/RocketMQSink.java b/rocketmq-flink/src/main/java/org/apache/rocketmq/flink/RocketMQSink.java new file mode 100644 index 000000000..f04a9471d --- /dev/null +++ b/rocketmq-flink/src/main/java/org/apache/rocketmq/flink/RocketMQSink.java @@ -0,0 +1,189 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.flink; + +import java.nio.charset.StandardCharsets; +import java.util.LinkedList; +import java.util.List; +import java.util.Properties; + +import org.apache.commons.lang.Validate; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.producer.DefaultMQProducer; +import org.apache.rocketmq.client.producer.SendCallback; +import org.apache.rocketmq.client.producer.SendResult; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.flink.common.selector.TopicSelector; +import org.apache.rocketmq.flink.common.serialization.KeyValueSerializationSchema; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * The RocketMQSink provides at-least-once reliability guarantees when + * checkpoints are enabled and batchFlushOnCheckpoint(true) is set. + * Otherwise, the sink reliability guarantees depends on rocketmq producer's retry policy. + */ +public class RocketMQSink extends RichSinkFunction implements CheckpointedFunction { + + private static final long serialVersionUID = 1L; + + private static final Logger LOG = LoggerFactory.getLogger(RocketMQSink.class); + + private transient DefaultMQProducer producer; + private boolean async; // false by default + + private Properties props; + private TopicSelector topicSelector; + private KeyValueSerializationSchema serializationSchema; + + private boolean batchFlushOnCheckpoint; // false by default + private List batchList; + + public RocketMQSink(KeyValueSerializationSchema schema, TopicSelector topicSelector, Properties props) { + this.serializationSchema = schema; + this.topicSelector = topicSelector; + this.props = props; + } + + @Override + public void open(Configuration parameters) throws Exception { + Validate.notEmpty(props, "Producer properties can not be empty"); + Validate.notNull(topicSelector, "TopicSelector can not be null"); + Validate.notNull(serializationSchema, "KeyValueSerializationSchema can not be null"); + + producer = new DefaultMQProducer(); + producer.setInstanceName(String.valueOf(getRuntimeContext().getIndexOfThisSubtask())); + RocketMqConfig.buildProducerConfigs(props, producer); + + batchList = new LinkedList<>(); + + if (batchFlushOnCheckpoint && !((StreamingRuntimeContext) getRuntimeContext()).isCheckpointingEnabled()) { + LOG.warn("Flushing on checkpoint is enabled, but checkpointing is not enabled. Disabling flushing."); + batchFlushOnCheckpoint = false; + } + + try { + producer.start(); + } catch (MQClientException e) { + throw new RuntimeException(e); + } + } + + @Override + public void invoke(IN input, Context context) throws Exception { + Message msg = prepareMessage(input); + + if (batchFlushOnCheckpoint) { + batchList.add(msg); + return; + } + + if (async) { + // async sending + try { + producer.send(msg, new SendCallback() { + @Override + public void onSuccess(SendResult sendResult) { + LOG.debug("Async send message success!"); + } + + @Override + public void onException(Throwable throwable) { + if (throwable != null) { + LOG.error("Async send message failure!", throwable); + } + } + }); + } catch (Exception e) { + LOG.error("Async send message failure!", e); + } + } else { + // sync sending, will return a SendResult + try { + SendResult result = producer.send(msg); + if (LOG.isDebugEnabled()) { + LOG.debug("Sync send message status: {}", result.getSendStatus()); + } + } catch (Exception e) { + LOG.error("Sync send message failure!", e); + } + } + } + + // Mapping: from storm tuple -> rocketmq Message + private Message prepareMessage(IN input) { + String topic = topicSelector.getTopic(input); + String tag = topicSelector.getTag(input) != null ? topicSelector.getTag(input) : ""; + + byte[] k = serializationSchema.serializeKey(input); + String key = k != null ? new String(k, StandardCharsets.UTF_8) : ""; + byte[] value = serializationSchema.serializeValue(input); + + Validate.notNull(topic, "the message topic is null"); + Validate.notNull(value, "the message body is null"); + + Message msg = new Message(topic, tag, key, value); + return msg; + } + + public RocketMQSink withAsync(boolean async) { + this.async = async; + return this; + } + + public RocketMQSink withBatchFlushOnCheckpoint(boolean batchFlushOnCheckpoint) { + this.batchFlushOnCheckpoint = batchFlushOnCheckpoint; + return this; + } + + @Override + public void close() throws Exception { + if (producer != null) { + flushSync(); + producer.shutdown(); + } + } + + private void flushSync() throws Exception { + if (batchFlushOnCheckpoint) { + synchronized (batchList) { + if (batchList.size() > 0) { + producer.send(batchList); + batchList.clear(); + } + } + } + } + + @Override + public void snapshotState(FunctionSnapshotContext context) throws Exception { + flushSync(); + } + + @Override + public void initializeState(FunctionInitializationContext context) throws Exception { + // nothing to do + } +} diff --git a/rocketmq-flink/src/main/java/org/apache/rocketmq/flink/RocketMQSource.java b/rocketmq-flink/src/main/java/org/apache/rocketmq/flink/RocketMQSource.java new file mode 100644 index 000000000..58cb222e2 --- /dev/null +++ b/rocketmq-flink/src/main/java/org/apache/rocketmq/flink/RocketMQSource.java @@ -0,0 +1,322 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.flink; + +import java.nio.charset.StandardCharsets; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.commons.lang.Validate; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.typeinfo.TypeHint; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; +import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; +import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer; +import org.apache.rocketmq.client.consumer.MQPullConsumerScheduleService; +import org.apache.rocketmq.client.consumer.PullResult; +import org.apache.rocketmq.client.consumer.PullTaskCallback; +import org.apache.rocketmq.client.consumer.PullTaskContext; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.flink.common.serialization.KeyValueDeserializationSchema; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.rocketmq.flink.RocketMqConfig.CONSUMER_OFFSET_EARLIEST; +import static org.apache.rocketmq.flink.RocketMqConfig.CONSUMER_OFFSET_LATEST; +import static org.apache.rocketmq.flink.RocketMqConfig.CONSUMER_OFFSET_TIMESTAMP; +import static org.apache.rocketmq.flink.RocketMqUtils.getInteger; +import static org.apache.rocketmq.flink.RocketMqUtils.getLong; + +/** + * The RocketMQSource is based on RocketMQ pull consumer mode, + * and provides exactly once reliability guarantees when checkpoints are enabled. + * Otherwise, the source doesn't provide any reliability guarantees. + */ +public class RocketMQSource extends RichParallelSourceFunction + implements CheckpointedFunction, ResultTypeQueryable { + + private static final long serialVersionUID = 1L; + + private static final Logger LOG = LoggerFactory.getLogger(RocketMQSource.class); + + private transient MQPullConsumerScheduleService pullConsumerScheduleService; + private DefaultMQPullConsumer consumer; + + private KeyValueDeserializationSchema schema; + + private transient volatile boolean running; + + private transient ListState> unionOffsetStates; + private Map offsetTable; + private Map restoredOffsets; + + private Properties props; + private String topic; + private String group; + + private static final String OFFSETS_STATE_NAME = "topic-partition-offset-states"; + + private transient volatile boolean restored; + + public RocketMQSource(KeyValueDeserializationSchema schema, Properties props) { + this.schema = schema; + this.props = props; + } + + @Override + public void open(Configuration parameters) throws Exception { + LOG.debug("source open...."); + Validate.notEmpty(props, "Consumer properties can not be empty"); + Validate.notNull(schema, "KeyValueDeserializationSchema can not be null"); + + this.topic = props.getProperty(RocketMqConfig.CONSUMER_TOPIC); + this.group = props.getProperty(RocketMqConfig.CONSUMER_GROUP); + + Validate.notEmpty(topic, "Consumer topic can not be empty"); + Validate.notEmpty(group, "Consumer group can not be empty"); + + if (offsetTable == null) { + offsetTable = new ConcurrentHashMap<>(); + } + if (restoredOffsets == null) { + restoredOffsets = new ConcurrentHashMap<>(); + } + + pullConsumerScheduleService = new MQPullConsumerScheduleService(group); + consumer = pullConsumerScheduleService.getDefaultMQPullConsumer(); + + consumer.setInstanceName(String.valueOf(getRuntimeContext().getIndexOfThisSubtask())); + RocketMqConfig.buildConsumerConfigs(props, consumer); + } + + @Override + public void run(SourceContext context) throws Exception { + LOG.debug("source run...."); + // The lock that guarantees that record emission and state updates are atomic, + // from the view of taking a checkpoint. + final Object lock = context.getCheckpointLock(); + + int delayWhenMessageNotFound = getInteger(props, RocketMqConfig.CONSUMER_DELAY_WHEN_MESSAGE_NOT_FOUND, + RocketMqConfig.DEFAULT_CONSUMER_DELAY_WHEN_MESSAGE_NOT_FOUND); + + String tag = props.getProperty(RocketMqConfig.CONSUMER_TAG, RocketMqConfig.DEFAULT_CONSUMER_TAG); + + int pullPoolSize = getInteger(props, RocketMqConfig.CONSUMER_PULL_POOL_SIZE, + RocketMqConfig.DEFAULT_CONSUMER_PULL_POOL_SIZE); + + int pullBatchSize = getInteger(props, RocketMqConfig.CONSUMER_BATCH_SIZE, + RocketMqConfig.DEFAULT_CONSUMER_BATCH_SIZE); + + pullConsumerScheduleService.setPullThreadNums(pullPoolSize); + pullConsumerScheduleService.registerPullTaskCallback(topic, new PullTaskCallback() { + + @Override + public void doPullTask(MessageQueue mq, PullTaskContext pullTaskContext) { + try { + long offset = getMessageQueueOffset(mq); + if (offset < 0) { + return; + } + + PullResult pullResult = consumer.pull(mq, tag, offset, pullBatchSize); + switch (pullResult.getPullStatus()) { + case FOUND: + List messages = pullResult.getMsgFoundList(); + for (MessageExt msg : messages) { + byte[] key = msg.getKeys() != null ? msg.getKeys().getBytes(StandardCharsets.UTF_8) : null; + byte[] value = msg.getBody(); + OUT data = schema.deserializeKeyAndValue(key, value); + + // output and state update are atomic + synchronized (lock) { + context.collectWithTimestamp(data, msg.getBornTimestamp()); + putMessageQueueOffset(mq, pullResult.getNextBeginOffset()); + } + } + pullTaskContext.setPullNextDelayTimeMillis(0); // no delay when messages were found + break; + case NO_MATCHED_MSG: + LOG.debug("No matched message after offset {} for queue {}", offset, mq); + pullTaskContext.setPullNextDelayTimeMillis(delayWhenMessageNotFound); + break; + case NO_NEW_MSG: + pullTaskContext.setPullNextDelayTimeMillis(delayWhenMessageNotFound); + break; + case OFFSET_ILLEGAL: + LOG.warn("Offset {} is illegal for queue {}", offset, mq); + pullTaskContext.setPullNextDelayTimeMillis(delayWhenMessageNotFound); + break; + default: + break; + } + } catch (Exception e) { + throw new RuntimeException(e); + } + } + }); + + try { + pullConsumerScheduleService.start(); + } catch (MQClientException e) { + throw new RuntimeException(e); + } + + running = true; + + awaitTermination(); + + } + + private void awaitTermination() throws InterruptedException { + while (running) { + Thread.sleep(50); + } + } + + private long getMessageQueueOffset(MessageQueue mq) throws MQClientException { + Long offset = offsetTable.get(mq); + // restoredOffsets(unionOffsetStates) is the restored global union state; + // should only snapshot mqs that actually belong to us + if (restored && offset == null) { + offset = restoredOffsets.get(mq); + } + if (offset == null) { + offset = consumer.fetchConsumeOffset(mq, false); + if (offset < 0) { + String initialOffset = props.getProperty(RocketMqConfig.CONSUMER_OFFSET_RESET_TO, CONSUMER_OFFSET_LATEST); + switch (initialOffset) { + case CONSUMER_OFFSET_EARLIEST: + offset = consumer.minOffset(mq); + break; + case CONSUMER_OFFSET_LATEST: + offset = consumer.maxOffset(mq); + break; + case CONSUMER_OFFSET_TIMESTAMP: + offset = consumer.searchOffset(mq, getLong(props, + RocketMqConfig.CONSUMER_OFFSET_FROM_TIMESTAMP, System.currentTimeMillis())); + break; + default: + throw new IllegalArgumentException("Unknown value for CONSUMER_OFFSET_RESET_TO."); + } + } + offsetTable.put(mq, offset); + } + + return offsetTable.get(mq); + } + + private void putMessageQueueOffset(MessageQueue mq, long offset) throws MQClientException { + offsetTable.put(mq, offset); + consumer.updateConsumeOffset(mq, offset); + } + + @Override + public void cancel() { + LOG.debug("cancel ..."); + running = false; + + if (pullConsumerScheduleService != null) { + pullConsumerScheduleService.shutdown(); + } + + offsetTable.clear(); + restoredOffsets.clear(); + } + + @Override + public void close() throws Exception { + LOG.debug("close ..."); + // pretty much the same logic as cancelling + try { + cancel(); + } finally { + super.close(); + } + } + + @Override + public void snapshotState(FunctionSnapshotContext context) throws Exception { + // called when a snapshot for a checkpoint is requested + + if (!running) { + LOG.debug("snapshotState() called on closed source; returning null."); + return; + } + + if (LOG.isDebugEnabled()) { + LOG.debug("Snapshotting state {} ...", context.getCheckpointId()); + } + + unionOffsetStates.clear(); + + if (LOG.isDebugEnabled()) { + LOG.debug("Snapshotted state, last processed offsets: {}, checkpoint id: {}, timestamp: {}", + offsetTable, context.getCheckpointId(), context.getCheckpointTimestamp()); + } + + for (Map.Entry entry : offsetTable.entrySet()) { + unionOffsetStates.add(Tuple2.of(entry.getKey(), entry.getValue())); + } + } + + @Override + public void initializeState(FunctionInitializationContext context) throws Exception { + // called every time the user-defined function is initialized, + // be that when the function is first initialized or be that + // when the function is actually recovering from an earlier checkpoint. + // Given this, initializeState() is not only the place where different types of state are initialized, + // but also where state recovery logic is included. + LOG.debug("initialize State ..."); + + this.unionOffsetStates = context.getOperatorStateStore().getUnionListState(new ListStateDescriptor<>( + OFFSETS_STATE_NAME, TypeInformation.of(new TypeHint>() { }))); + + this.restored = context.isRestored(); + + if (restored) { + if (restoredOffsets == null) { + restoredOffsets = new ConcurrentHashMap<>(); + } + for (Tuple2 mqOffsets : unionOffsetStates.get()) { + // unionOffsetStates is the restored global union state; + // should only snapshot mqs that actually belong to us + restoredOffsets.put(mqOffsets.f0, mqOffsets.f1); + } + LOG.info("Setting restore state in the consumer. Using the following offsets: {}", restoredOffsets); + } else { + LOG.info("No restore state for the consumer."); + } + } + + @Override + public TypeInformation getProducedType() { + return schema.getProducedType(); + } +} diff --git a/rocketmq-flink/src/main/java/org/apache/rocketmq/flink/RocketMqConfig.java b/rocketmq-flink/src/main/java/org/apache/rocketmq/flink/RocketMqConfig.java new file mode 100644 index 000000000..e17631a77 --- /dev/null +++ b/rocketmq-flink/src/main/java/org/apache/rocketmq/flink/RocketMqConfig.java @@ -0,0 +1,134 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.flink; + +import java.util.Properties; +import java.util.UUID; + +import org.apache.commons.lang.StringUtils; +import org.apache.commons.lang.Validate; +import org.apache.rocketmq.client.ClientConfig; +import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer; +import org.apache.rocketmq.client.producer.DefaultMQProducer; +import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; + +import static org.apache.rocketmq.flink.RocketMqUtils.getInteger; + +/** + * RocketMqConfig for Consumer/Producer. + */ +public class RocketMqConfig { + // common + public static final String NAME_SERVER_ADDR = "nameserver.address"; // Required + + public static final String NAME_SERVER_POLL_INTERVAL = "nameserver.poll.interval"; + public static final int DEFAULT_NAME_SERVER_POLL_INTERVAL = 30000; // 30 seconds + + public static final String BROKER_HEART_BEAT_INTERVAL = "brokerserver.heartbeat.interval"; + public static final int DEFAULT_BROKER_HEART_BEAT_INTERVAL = 30000; // 30 seconds + + + // producer + public static final String PRODUCER_GROUP = "producer.group"; + + public static final String PRODUCER_RETRY_TIMES = "producer.retry.times"; + public static final int DEFAULT_PRODUCER_RETRY_TIMES = 3; + + public static final String PRODUCER_TIMEOUT = "producer.timeout"; + public static final int DEFAULT_PRODUCER_TIMEOUT = 3000; // 3 seconds + + + // consumer + public static final String CONSUMER_GROUP = "consumer.group"; // Required + + public static final String CONSUMER_TOPIC = "consumer.topic"; // Required + + public static final String CONSUMER_TAG = "consumer.tag"; + public static final String DEFAULT_CONSUMER_TAG = "*"; + + public static final String CONSUMER_OFFSET_RESET_TO = "consumer.offset.reset.to"; + public static final String CONSUMER_OFFSET_LATEST = "latest"; + public static final String CONSUMER_OFFSET_EARLIEST = "earliest"; + public static final String CONSUMER_OFFSET_TIMESTAMP = "timestamp"; + public static final String CONSUMER_OFFSET_FROM_TIMESTAMP = "consumer.offset.from.timestamp"; + + public static final String CONSUMER_OFFSET_PERSIST_INTERVAL = "consumer.offset.persist.interval"; + public static final int DEFAULT_CONSUMER_OFFSET_PERSIST_INTERVAL = 5000; // 5 seconds + + public static final String CONSUMER_PULL_POOL_SIZE = "consumer.pull.thread.pool.size"; + public static final int DEFAULT_CONSUMER_PULL_POOL_SIZE = 20; + + public static final String CONSUMER_BATCH_SIZE = "consumer.batch.size"; + public static final int DEFAULT_CONSUMER_BATCH_SIZE = 32; + + public static final String CONSUMER_DELAY_WHEN_MESSAGE_NOT_FOUND = "consumer.delay.when.message.not.found"; + public static final int DEFAULT_CONSUMER_DELAY_WHEN_MESSAGE_NOT_FOUND = 10; + + /** + * Build Producer Configs. + * @param props Properties + * @param producer DefaultMQProducer + */ + public static void buildProducerConfigs(Properties props, DefaultMQProducer producer) { + buildCommonConfigs(props, producer); + + String group = props.getProperty(PRODUCER_GROUP); + if (StringUtils.isEmpty(group)) { + group = UUID.randomUUID().toString(); + } + producer.setProducerGroup(props.getProperty(PRODUCER_GROUP, group)); + + producer.setRetryTimesWhenSendFailed(getInteger(props, + PRODUCER_RETRY_TIMES, DEFAULT_PRODUCER_RETRY_TIMES)); + producer.setRetryTimesWhenSendAsyncFailed(getInteger(props, + PRODUCER_RETRY_TIMES, DEFAULT_PRODUCER_RETRY_TIMES)); + producer.setSendMsgTimeout(getInteger(props, + PRODUCER_TIMEOUT, DEFAULT_PRODUCER_TIMEOUT)); + } + + /** + * Build Consumer Configs. + * @param props Properties + * @param consumer DefaultMQPushConsumer + */ + public static void buildConsumerConfigs(Properties props, DefaultMQPullConsumer consumer) { + buildCommonConfigs(props, consumer); + + consumer.setMessageModel(MessageModel.CLUSTERING); + + consumer.setPersistConsumerOffsetInterval(getInteger(props, + CONSUMER_OFFSET_PERSIST_INTERVAL, DEFAULT_CONSUMER_OFFSET_PERSIST_INTERVAL)); + } + + /** + * Build Common Configs. + * @param props Properties + * @param client ClientConfig + */ + public static void buildCommonConfigs(Properties props, ClientConfig client) { + String nameServers = props.getProperty(NAME_SERVER_ADDR); + Validate.notEmpty(nameServers); + client.setNamesrvAddr(nameServers); + + client.setPollNameServerInterval(getInteger(props, + NAME_SERVER_POLL_INTERVAL, DEFAULT_NAME_SERVER_POLL_INTERVAL)); + client.setHeartbeatBrokerInterval(getInteger(props, + BROKER_HEART_BEAT_INTERVAL, DEFAULT_BROKER_HEART_BEAT_INTERVAL)); + } +} diff --git a/rocketmq-flink/src/main/java/org/apache/rocketmq/flink/RocketMqUtils.java b/rocketmq-flink/src/main/java/org/apache/rocketmq/flink/RocketMqUtils.java new file mode 100644 index 000000000..fd7ca3d02 --- /dev/null +++ b/rocketmq-flink/src/main/java/org/apache/rocketmq/flink/RocketMqUtils.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.flink; + +import java.util.Properties; + +public final class RocketMqUtils { + + public static int getInteger(Properties props, String key, int defaultValue) { + return Integer.parseInt(props.getProperty(key, String.valueOf(defaultValue))); + } + + public static long getLong(Properties props, String key, long defaultValue) { + return Long.parseLong(props.getProperty(key, String.valueOf(defaultValue))); + } + + public static boolean getBoolean(Properties props, String key, boolean defaultValue) { + return Boolean.parseBoolean(props.getProperty(key, String.valueOf(defaultValue))); + } +} diff --git a/rocketmq-flink/src/main/java/org/apache/rocketmq/flink/common/selector/DefaultTopicSelector.java b/rocketmq-flink/src/main/java/org/apache/rocketmq/flink/common/selector/DefaultTopicSelector.java new file mode 100644 index 000000000..264d21163 --- /dev/null +++ b/rocketmq-flink/src/main/java/org/apache/rocketmq/flink/common/selector/DefaultTopicSelector.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.flink.common.selector; + +public class DefaultTopicSelector implements TopicSelector { + private final String topicName; + private final String tagName; + + public DefaultTopicSelector(final String topicName, final String tagName) { + this.topicName = topicName; + this.tagName = tagName; + } + + public DefaultTopicSelector(final String topicName) { + this(topicName, ""); + } + + @Override + public String getTopic(T tuple) { + return topicName; + } + + @Override + public String getTag(T tuple) { + return tagName; + } +} diff --git a/rocketmq-flink/src/main/java/org/apache/rocketmq/flink/common/selector/SimpleTopicSelector.java b/rocketmq-flink/src/main/java/org/apache/rocketmq/flink/common/selector/SimpleTopicSelector.java new file mode 100644 index 000000000..3ad8a0325 --- /dev/null +++ b/rocketmq-flink/src/main/java/org/apache/rocketmq/flink/common/selector/SimpleTopicSelector.java @@ -0,0 +1,73 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.flink.common.selector; + +import java.util.Map; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Uses field name to select topic and tag name from tuple. + */ +public class SimpleTopicSelector implements TopicSelector { + private static final Logger LOG = LoggerFactory.getLogger(SimpleTopicSelector.class); + + private final String topicFieldName; + private final String defaultTopicName; + + private final String tagFieldName; + private final String defaultTagName; + + /** + * SimpleTopicSelector Constructor. + * @param topicFieldName field name used for selecting topic + * @param defaultTopicName default field name used for selecting topic + * @param tagFieldName field name used for selecting tag + * @param defaultTagName default field name used for selecting tag + */ + public SimpleTopicSelector(String topicFieldName, String defaultTopicName, String tagFieldName, String defaultTagName) { + this.topicFieldName = topicFieldName; + this.defaultTopicName = defaultTopicName; + this.tagFieldName = tagFieldName; + this.defaultTagName = defaultTagName; + } + + @Override + public String getTopic(Map tuple) { + if (tuple.containsKey(topicFieldName)) { + Object topic = tuple.get(topicFieldName); + return topic != null ? topic.toString() : defaultTopicName; + } else { + LOG.warn("Field {} Not Found. Returning default topic {}", topicFieldName, defaultTopicName); + return defaultTopicName; + } + } + + @Override + public String getTag(Map tuple) { + if (tuple.containsKey(tagFieldName)) { + Object tag = tuple.get(tagFieldName); + return tag != null ? tag.toString() : defaultTagName; + } else { + LOG.warn("Field {} Not Found. Returning default tag {}", tagFieldName, defaultTagName); + return defaultTagName; + } + } +} diff --git a/rocketmq-flink/src/main/java/org/apache/rocketmq/flink/common/selector/TopicSelector.java b/rocketmq-flink/src/main/java/org/apache/rocketmq/flink/common/selector/TopicSelector.java new file mode 100644 index 000000000..2a347db6e --- /dev/null +++ b/rocketmq-flink/src/main/java/org/apache/rocketmq/flink/common/selector/TopicSelector.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.flink.common.selector; + +import java.io.Serializable; + +public interface TopicSelector extends Serializable { + + String getTopic(T tuple); + + String getTag(T tuple); +} diff --git a/rocketmq-flink/src/main/java/org/apache/rocketmq/flink/common/serialization/KeyValueDeserializationSchema.java b/rocketmq-flink/src/main/java/org/apache/rocketmq/flink/common/serialization/KeyValueDeserializationSchema.java new file mode 100644 index 000000000..d8759f9c4 --- /dev/null +++ b/rocketmq-flink/src/main/java/org/apache/rocketmq/flink/common/serialization/KeyValueDeserializationSchema.java @@ -0,0 +1,27 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.flink.common.serialization; + +import java.io.Serializable; + +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; + +public interface KeyValueDeserializationSchema extends ResultTypeQueryable, Serializable { + T deserializeKeyAndValue(byte[] key, byte[] value); +} diff --git a/rocketmq-flink/src/main/java/org/apache/rocketmq/flink/common/serialization/KeyValueSerializationSchema.java b/rocketmq-flink/src/main/java/org/apache/rocketmq/flink/common/serialization/KeyValueSerializationSchema.java new file mode 100644 index 000000000..d847e8a66 --- /dev/null +++ b/rocketmq-flink/src/main/java/org/apache/rocketmq/flink/common/serialization/KeyValueSerializationSchema.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.flink.common.serialization; + +import java.io.Serializable; + +public interface KeyValueSerializationSchema extends Serializable { + + byte[] serializeKey(T tuple); + + byte[] serializeValue(T tuple); +} diff --git a/rocketmq-flink/src/main/java/org/apache/rocketmq/flink/common/serialization/SimpleKeyValueDeserializationSchema.java b/rocketmq-flink/src/main/java/org/apache/rocketmq/flink/common/serialization/SimpleKeyValueDeserializationSchema.java new file mode 100644 index 000000000..df6390bc4 --- /dev/null +++ b/rocketmq-flink/src/main/java/org/apache/rocketmq/flink/common/serialization/SimpleKeyValueDeserializationSchema.java @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.flink.common.serialization; + +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.Map; + +import org.apache.flink.api.common.typeinfo.TypeInformation; + +public class SimpleKeyValueDeserializationSchema implements KeyValueDeserializationSchema { + public static final String DEFAULT_KEY_FIELD = "key"; + public static final String DEFAULT_VALUE_FIELD = "value"; + + public String keyField; + public String valueField; + + public SimpleKeyValueDeserializationSchema() { + this(DEFAULT_KEY_FIELD, DEFAULT_VALUE_FIELD); + } + + /** + * SimpleKeyValueDeserializationSchema Constructor. + * @param keyField tuple field for selecting the key + * @param valueField tuple field for selecting the value + */ + public SimpleKeyValueDeserializationSchema(String keyField, String valueField) { + this.keyField = keyField; + this.valueField = valueField; + } + + @Override + public Map deserializeKeyAndValue(byte[] key, byte[] value) { + HashMap map = new HashMap(2); + if (keyField != null) { + String k = key != null ? new String(key, StandardCharsets.UTF_8) : null; + map.put(keyField, k); + } + if (valueField != null) { + String v = value != null ? new String(value, StandardCharsets.UTF_8) : null; + map.put(valueField, v); + } + return map; + } + + @Override + public TypeInformation getProducedType() { + return TypeInformation.of(Map.class); + } +} diff --git a/rocketmq-flink/src/main/java/org/apache/rocketmq/flink/common/serialization/SimpleKeyValueSerializationSchema.java b/rocketmq-flink/src/main/java/org/apache/rocketmq/flink/common/serialization/SimpleKeyValueSerializationSchema.java new file mode 100644 index 000000000..bbd6da39f --- /dev/null +++ b/rocketmq-flink/src/main/java/org/apache/rocketmq/flink/common/serialization/SimpleKeyValueSerializationSchema.java @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.flink.common.serialization; + +import java.nio.charset.StandardCharsets; +import java.util.Map; + +public class SimpleKeyValueSerializationSchema implements KeyValueSerializationSchema { + public static final String DEFAULT_KEY_FIELD = "key"; + public static final String DEFAULT_VALUE_FIELD = "value"; + + public String keyField; + public String valueField; + + public SimpleKeyValueSerializationSchema() { + this(DEFAULT_KEY_FIELD, DEFAULT_VALUE_FIELD); + } + + /** + * SimpleKeyValueSerializationSchema Constructor. + * @param keyField tuple field for selecting the key + * @param valueField tuple field for selecting the value + */ + public SimpleKeyValueSerializationSchema(String keyField, String valueField) { + this.keyField = keyField; + this.valueField = valueField; + } + + @Override + public byte[] serializeKey(Map tuple) { + if (tuple == null || keyField == null) { + return null; + } + Object key = tuple.get(keyField); + return key != null ? key.toString().getBytes(StandardCharsets.UTF_8) : null; + } + + @Override + public byte[] serializeValue(Map tuple) { + if (tuple == null || valueField == null) { + return null; + } + Object value = tuple.get(valueField); + return value != null ? value.toString().getBytes(StandardCharsets.UTF_8) : null; + } + +} diff --git a/rocketmq-flink/src/test/java/org/apache/rocketmq/flink/ConsumerTest.java b/rocketmq-flink/src/test/java/org/apache/rocketmq/flink/ConsumerTest.java new file mode 100644 index 000000000..7c83abe68 --- /dev/null +++ b/rocketmq-flink/src/test/java/org/apache/rocketmq/flink/ConsumerTest.java @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.flink; + +import java.util.List; + +import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; +import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.common.message.MessageExt; + +public class ConsumerTest { + public static void main(String[] args) { + DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("g00003"); + consumer.setNamesrvAddr("localhost:9876"); + try { + consumer.subscribe("flink-sink2", "*"); + } catch (MQClientException e) { + e.printStackTrace(); + } + consumer.registerMessageListener(new MessageListenerConcurrently() { + @Override + public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) { + for (MessageExt msg : msgs) { + System.out.println(msg.getKeys() + ":" + new String(msg.getBody())); + } + return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; + } + }); + try { + consumer.start(); + } catch (MQClientException e) { + e.printStackTrace(); + } + } +} diff --git a/rocketmq-flink/src/test/java/org/apache/rocketmq/flink/ProducerTest.java b/rocketmq-flink/src/test/java/org/apache/rocketmq/flink/ProducerTest.java new file mode 100644 index 000000000..2539c4757 --- /dev/null +++ b/rocketmq-flink/src/test/java/org/apache/rocketmq/flink/ProducerTest.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.flink; + +import org.apache.rocketmq.client.exception.MQBrokerException; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.producer.DefaultMQProducer; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.remoting.exception.RemotingException; + +public class ProducerTest { + public static void main(String[] args) { + DefaultMQProducer producer = new DefaultMQProducer("p001"); + producer.setNamesrvAddr("localhost:9876"); + try { + producer.start(); + } catch (MQClientException e) { + e.printStackTrace(); + } + for (int i = 0; i < 10000; i++) { + Message msg = new Message("flink-source2" , "", "id_"+i, ("country_X province_" + i).getBytes()); + try { + producer.send(msg); + } catch (MQClientException e) { + e.printStackTrace(); + } catch (RemotingException e) { + e.printStackTrace(); + } catch (MQBrokerException e) { + e.printStackTrace(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + System.out.println("send " + i); + try { + Thread.sleep(10); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } +} diff --git a/rocketmq-flink/src/test/java/org/apache/rocketmq/flink/RocketMQFlinkExample.java b/rocketmq-flink/src/test/java/org/apache/rocketmq/flink/RocketMQFlinkExample.java new file mode 100644 index 000000000..0031fe17f --- /dev/null +++ b/rocketmq-flink/src/test/java/org/apache/rocketmq/flink/RocketMQFlinkExample.java @@ -0,0 +1,73 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.flink; + +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; + +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.ProcessFunction; +import org.apache.flink.util.Collector; +import org.apache.rocketmq.flink.common.selector.DefaultTopicSelector; +import org.apache.rocketmq.flink.common.serialization.SimpleKeyValueDeserializationSchema; +import org.apache.rocketmq.flink.common.serialization.SimpleKeyValueSerializationSchema; + +public class RocketMQFlinkExample { + public static void main(String[] args) { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + // enable checkpoint + env.enableCheckpointing(3000); + + Properties consumerProps = new Properties(); + consumerProps.setProperty(RocketMqConfig.NAME_SERVER_ADDR, "localhost:9876"); + consumerProps.setProperty(RocketMqConfig.CONSUMER_GROUP, "c002"); + consumerProps.setProperty(RocketMqConfig.CONSUMER_TOPIC, "flink-source2"); + + Properties producerProps = new Properties(); + producerProps.setProperty(RocketMqConfig.NAME_SERVER_ADDR, "localhost:9876"); + + env.addSource(new RocketMQSource(new SimpleKeyValueDeserializationSchema("id", "address"), consumerProps)) + .name("rocketmq-source") + .setParallelism(2) + .process(new ProcessFunction() { + @Override + public void processElement(Map in, Context ctx, Collector out) throws Exception { + HashMap result = new HashMap(); + result.put("id", in.get("id")); + String[] arr = in.get("address").toString().split("\\s+"); + result.put("province", arr[arr.length-1]); + out.collect(result); + } + }) + .name("upper-processor") + .setParallelism(2) + .addSink(new RocketMQSink(new SimpleKeyValueSerializationSchema("id", "province"), + new DefaultTopicSelector("flink-sink2"), producerProps).withBatchFlushOnCheckpoint(true)) + .name("rocketmq-sink") + .setParallelism(2); + + try { + env.execute("rocketmq-flink-example"); + } catch (Exception e) { + e.printStackTrace(); + } + } +} diff --git a/rocketmq-flink/style/copyright/Apache.xml b/rocketmq-flink/style/copyright/Apache.xml new file mode 100644 index 000000000..2db86d057 --- /dev/null +++ b/rocketmq-flink/style/copyright/Apache.xml @@ -0,0 +1,24 @@ + + + + + + \ No newline at end of file diff --git a/rocketmq-flink/style/copyright/profiles_settings.xml b/rocketmq-flink/style/copyright/profiles_settings.xml new file mode 100644 index 000000000..4c0e521b7 --- /dev/null +++ b/rocketmq-flink/style/copyright/profiles_settings.xml @@ -0,0 +1,64 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/rocketmq-flink/style/rmq_checkstyle.xml b/rocketmq-flink/style/rmq_checkstyle.xml new file mode 100644 index 000000000..e3155cc00 --- /dev/null +++ b/rocketmq-flink/style/rmq_checkstyle.xml @@ -0,0 +1,135 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/rocketmq-flink/style/rmq_codeStyle.xml b/rocketmq-flink/style/rmq_codeStyle.xml new file mode 100644 index 000000000..cd95ee64d --- /dev/null +++ b/rocketmq-flink/style/rmq_codeStyle.xml @@ -0,0 +1,157 @@ + + + + + + + \ No newline at end of file From 6080c634bbadd975ad215f3e57a0f05cabe9d7b1 Mon Sep 17 00:00:00 2001 From: Xin Wang Date: Fri, 9 Mar 2018 13:24:26 +0800 Subject: [PATCH 2/4] Added unit tests --- rocketmq-flink/pom.xml | 12 ++ .../apache/rocketmq/flink/RocketMQSink.java | 2 +- .../apache/rocketmq/flink/RocketMQSource.java | 12 +- .../apache/rocketmq/flink/RunningChecker.java | 33 +++++ .../rocketmq/flink/RocketMQSinkTest.java | 75 +++++++++++ .../rocketmq/flink/RocketMQSourceTest.java | 121 ++++++++++++++++++ .../org/apache/rocketmq/flink/TestUtils.java | 33 +++++ .../selector/DefaultTopicSelectorTest.java | 37 ++++++ .../selector/SimpleTopicSelectorTest.java | 49 +++++++ ...SimpleKeyValueSerializationSchemaTest.java | 42 ++++++ .../flink/{ => example}/ConsumerTest.java | 2 +- .../flink/{ => example}/ProducerTest.java | 2 +- .../{ => example}/RocketMQFlinkExample.java | 5 +- 13 files changed, 416 insertions(+), 9 deletions(-) create mode 100644 rocketmq-flink/src/main/java/org/apache/rocketmq/flink/RunningChecker.java create mode 100644 rocketmq-flink/src/test/java/org/apache/rocketmq/flink/RocketMQSinkTest.java create mode 100644 rocketmq-flink/src/test/java/org/apache/rocketmq/flink/RocketMQSourceTest.java create mode 100644 rocketmq-flink/src/test/java/org/apache/rocketmq/flink/TestUtils.java create mode 100644 rocketmq-flink/src/test/java/org/apache/rocketmq/flink/common/selector/DefaultTopicSelectorTest.java create mode 100644 rocketmq-flink/src/test/java/org/apache/rocketmq/flink/common/selector/SimpleTopicSelectorTest.java create mode 100644 rocketmq-flink/src/test/java/org/apache/rocketmq/flink/common/serialization/SimpleKeyValueSerializationSchemaTest.java rename rocketmq-flink/src/test/java/org/apache/rocketmq/flink/{ => example}/ConsumerTest.java (98%) rename rocketmq-flink/src/test/java/org/apache/rocketmq/flink/{ => example}/ProducerTest.java (98%) rename rocketmq-flink/src/test/java/org/apache/rocketmq/flink/{ => example}/RocketMQFlinkExample.java (94%) diff --git a/rocketmq-flink/pom.xml b/rocketmq-flink/pom.xml index 5f38e115f..91d2c45ba 100644 --- a/rocketmq-flink/pom.xml +++ b/rocketmq-flink/pom.xml @@ -98,6 +98,18 @@ test 4.12 + + org.powermock + powermock-module-junit4 + 1.5.5 + test + + + org.powermock + powermock-api-mockito + 1.5.5 + test + org.apache.rocketmq rocketmq-namesrv diff --git a/rocketmq-flink/src/main/java/org/apache/rocketmq/flink/RocketMQSink.java b/rocketmq-flink/src/main/java/org/apache/rocketmq/flink/RocketMQSink.java index f04a9471d..ae466a019 100644 --- a/rocketmq-flink/src/main/java/org/apache/rocketmq/flink/RocketMQSink.java +++ b/rocketmq-flink/src/main/java/org/apache/rocketmq/flink/RocketMQSink.java @@ -124,7 +124,7 @@ public void onException(Throwable throwable) { try { SendResult result = producer.send(msg); if (LOG.isDebugEnabled()) { - LOG.debug("Sync send message status: {}", result.getSendStatus()); + LOG.debug("Sync send message result: {}", result); } } catch (Exception e) { LOG.error("Sync send message failure!", e); diff --git a/rocketmq-flink/src/main/java/org/apache/rocketmq/flink/RocketMQSource.java b/rocketmq-flink/src/main/java/org/apache/rocketmq/flink/RocketMQSource.java index 58cb222e2..b55482074 100644 --- a/rocketmq-flink/src/main/java/org/apache/rocketmq/flink/RocketMQSource.java +++ b/rocketmq-flink/src/main/java/org/apache/rocketmq/flink/RocketMQSource.java @@ -71,7 +71,7 @@ public class RocketMQSource extends RichParallelSourceFunction private KeyValueDeserializationSchema schema; - private transient volatile boolean running; + private RunningChecker runningChecker; private transient ListState> unionOffsetStates; private Map offsetTable; @@ -109,6 +109,8 @@ public void open(Configuration parameters) throws Exception { restoredOffsets = new ConcurrentHashMap<>(); } + runningChecker = new RunningChecker(); + pullConsumerScheduleService = new MQPullConsumerScheduleService(group); consumer = pullConsumerScheduleService.getDefaultMQPullConsumer(); @@ -188,14 +190,14 @@ public void doPullTask(MessageQueue mq, PullTaskContext pullTaskContext) { throw new RuntimeException(e); } - running = true; + runningChecker.setRunning(true); awaitTermination(); } private void awaitTermination() throws InterruptedException { - while (running) { + while (runningChecker.isRunning()) { Thread.sleep(50); } } @@ -240,7 +242,7 @@ private void putMessageQueueOffset(MessageQueue mq, long offset) throws MQClient @Override public void cancel() { LOG.debug("cancel ..."); - running = false; + runningChecker.setRunning(false); if (pullConsumerScheduleService != null) { pullConsumerScheduleService.shutdown(); @@ -265,7 +267,7 @@ public void close() throws Exception { public void snapshotState(FunctionSnapshotContext context) throws Exception { // called when a snapshot for a checkpoint is requested - if (!running) { + if (!runningChecker.isRunning()) { LOG.debug("snapshotState() called on closed source; returning null."); return; } diff --git a/rocketmq-flink/src/main/java/org/apache/rocketmq/flink/RunningChecker.java b/rocketmq-flink/src/main/java/org/apache/rocketmq/flink/RunningChecker.java new file mode 100644 index 000000000..b7bc2b912 --- /dev/null +++ b/rocketmq-flink/src/main/java/org/apache/rocketmq/flink/RunningChecker.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.flink; + +import java.io.Serializable; + +public class RunningChecker implements Serializable { + private volatile boolean isRunning = false; + + public boolean isRunning() { + return isRunning; + } + + public void setRunning(boolean running) { + isRunning = running; + } +} diff --git a/rocketmq-flink/src/test/java/org/apache/rocketmq/flink/RocketMQSinkTest.java b/rocketmq-flink/src/test/java/org/apache/rocketmq/flink/RocketMQSinkTest.java new file mode 100644 index 000000000..ec844f2a6 --- /dev/null +++ b/rocketmq-flink/src/test/java/org/apache/rocketmq/flink/RocketMQSinkTest.java @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.flink; + +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; + +import org.apache.rocketmq.client.producer.DefaultMQProducer; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.flink.common.selector.DefaultTopicSelector; +import org.apache.rocketmq.flink.common.selector.TopicSelector; +import org.apache.rocketmq.flink.common.serialization.KeyValueSerializationSchema; +import org.apache.rocketmq.flink.common.serialization.SimpleKeyValueSerializationSchema; +import org.junit.Before; +import org.junit.Test; + +import static org.apache.rocketmq.flink.TestUtils.setFieldValue; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; + +public class RocketMQSinkTest { + + private RocketMQSink rocketMQSink; + private DefaultMQProducer producer; + + @Before + public void setUp() throws Exception { + KeyValueSerializationSchema serializationSchema = new SimpleKeyValueSerializationSchema("id", "name"); + TopicSelector topicSelector = new DefaultTopicSelector("tpc"); + Properties props = new Properties(); + rocketMQSink = new RocketMQSink(serializationSchema, topicSelector, props); + + producer = mock(DefaultMQProducer.class); + setFieldValue(rocketMQSink, "producer", producer); + } + + @Test + public void testSink() throws Exception { + Map tuple = new HashMap(); + tuple.put("id", "x001"); + tuple.put("name", "vesense"); + tuple.put("tpc", "tpc1"); + + rocketMQSink.invoke(tuple, null); + + verify(producer).send(any(Message.class)); + + } + + @Test + public void close() throws Exception { + rocketMQSink.close(); + + verify(producer).shutdown(); + } + +} \ No newline at end of file diff --git a/rocketmq-flink/src/test/java/org/apache/rocketmq/flink/RocketMQSourceTest.java b/rocketmq-flink/src/test/java/org/apache/rocketmq/flink/RocketMQSourceTest.java new file mode 100644 index 000000000..b7aaee07e --- /dev/null +++ b/rocketmq-flink/src/test/java/org/apache/rocketmq/flink/RocketMQSourceTest.java @@ -0,0 +1,121 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.flink; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext; +import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer; +import org.apache.rocketmq.client.consumer.MQPullConsumerScheduleService; +import org.apache.rocketmq.client.consumer.PullResult; +import org.apache.rocketmq.client.consumer.PullStatus; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.flink.common.serialization.KeyValueDeserializationSchema; +import org.apache.rocketmq.flink.common.serialization.SimpleKeyValueDeserializationSchema; +import org.junit.Before; +import org.junit.Test; + +import static org.apache.rocketmq.flink.TestUtils.setFieldValue; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyBoolean; +import static org.mockito.Matchers.anyInt; +import static org.mockito.Matchers.anyLong; +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.atLeastOnce; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class RocketMQSourceTest { + + private RocketMQSource rocketMQSource; + private MQPullConsumerScheduleService pullConsumerScheduleService; + private DefaultMQPullConsumer consumer; + private KeyValueDeserializationSchema deserializationSchema; + private String topic = "tpc"; + + @Before + public void setUp() throws Exception { + deserializationSchema = new SimpleKeyValueDeserializationSchema(); + Properties props = new Properties(); + rocketMQSource = new RocketMQSource(deserializationSchema, props); + + setFieldValue(rocketMQSource, "topic", topic); + setFieldValue(rocketMQSource, "runningChecker", new SingleRunningCheck()); + setFieldValue(rocketMQSource, "offsetTable", new ConcurrentHashMap<>()); + setFieldValue(rocketMQSource, "restoredOffsets", new ConcurrentHashMap<>()); + + pullConsumerScheduleService = new MQPullConsumerScheduleService("g"); + + consumer = mock(DefaultMQPullConsumer.class); + pullConsumerScheduleService.setDefaultMQPullConsumer(consumer); + setFieldValue(rocketMQSource, "consumer", consumer); + setFieldValue(rocketMQSource, "pullConsumerScheduleService", pullConsumerScheduleService); + } + + @Test + public void testSource() throws Exception { + List msgFoundList = new ArrayList<>(); + MessageExt messageExt = new MessageExt(); + messageExt.setKeys("keys"); + messageExt.setBody("body data".getBytes()); + messageExt.setBornTimestamp(System.currentTimeMillis()); + msgFoundList.add(messageExt); + PullResult pullResult = new PullResult(PullStatus.FOUND, 3, 1, 5, msgFoundList); + + when(consumer.fetchConsumeOffset(any(MessageQueue.class), anyBoolean())).thenReturn(2L); + when(consumer.pull(any(MessageQueue.class), anyString(), anyLong(), anyInt())).thenReturn(pullResult); + + SourceContext context = mock(SourceContext.class); + when(context.getCheckpointLock()).thenReturn(new Object()); + + rocketMQSource.run(context); + + // schedule the pull task + Set set = new HashSet(); + set.add(new MessageQueue(topic, "brk", 1)); + pullConsumerScheduleService.putTask(topic, set); + + MessageExt msg = pullResult.getMsgFoundList().get(0); + + // atLeastOnce: re-pulling immediately when messages found before + verify(context, atLeastOnce()).collectWithTimestamp(deserializationSchema.deserializeKeyAndValue(msg.getKeys().getBytes(), + msg.getBody()), msg.getBornTimestamp()); + } + + @Test + public void close() throws Exception { + rocketMQSource.close(); + + verify(consumer).shutdown(); + } + + class SingleRunningCheck extends RunningChecker { + @Override + public boolean isRunning() { + return false; + } + } +} \ No newline at end of file diff --git a/rocketmq-flink/src/test/java/org/apache/rocketmq/flink/TestUtils.java b/rocketmq-flink/src/test/java/org/apache/rocketmq/flink/TestUtils.java new file mode 100644 index 000000000..d0a94508c --- /dev/null +++ b/rocketmq-flink/src/test/java/org/apache/rocketmq/flink/TestUtils.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.flink; + +import java.lang.reflect.Field; + +public class TestUtils { + public static void setFieldValue(Object obj, String fieldName, Object value) { + try { + Field field = obj.getClass().getDeclaredField(fieldName); + field.setAccessible(true); + field.set(obj, value); + } catch (Exception e) { + e.printStackTrace(); + } + } +} diff --git a/rocketmq-flink/src/test/java/org/apache/rocketmq/flink/common/selector/DefaultTopicSelectorTest.java b/rocketmq-flink/src/test/java/org/apache/rocketmq/flink/common/selector/DefaultTopicSelectorTest.java new file mode 100644 index 000000000..2f4685cc8 --- /dev/null +++ b/rocketmq-flink/src/test/java/org/apache/rocketmq/flink/common/selector/DefaultTopicSelectorTest.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.flink.common.selector; + +import org.junit.Test; + +import static org.junit.Assert.*; + +public class DefaultTopicSelectorTest { + @Test + public void getTopic() throws Exception { + DefaultTopicSelector selector = new DefaultTopicSelector("rocket"); + assertEquals("rocket", selector.getTopic(null)); + assertEquals("", selector.getTag(null)); + + selector = new DefaultTopicSelector("rocket", "tg"); + assertEquals("rocket", selector.getTopic(null)); + assertEquals("tg", selector.getTag(null)); + } + +} \ No newline at end of file diff --git a/rocketmq-flink/src/test/java/org/apache/rocketmq/flink/common/selector/SimpleTopicSelectorTest.java b/rocketmq-flink/src/test/java/org/apache/rocketmq/flink/common/selector/SimpleTopicSelectorTest.java new file mode 100644 index 000000000..6ac1a578f --- /dev/null +++ b/rocketmq-flink/src/test/java/org/apache/rocketmq/flink/common/selector/SimpleTopicSelectorTest.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.flink.common.selector; + +import java.util.HashMap; +import java.util.Map; + +import org.junit.Test; + +import static org.junit.Assert.*; + +public class SimpleTopicSelectorTest { + @Test + public void getTopic() throws Exception { + SimpleTopicSelector selector = new SimpleTopicSelector("tpc", "dtpc", "tg", "dtg"); + Map tuple = new HashMap(); + tuple.put("id", "x001"); + tuple.put("name", "vesense"); + tuple.put("tpc", "tpc1"); + tuple.put("tg", "tg1"); + + assertEquals("tpc1", selector.getTopic(tuple)); + assertEquals("tg1", selector.getTag(tuple)); + + tuple = new HashMap(); + tuple.put("id", "x001"); + tuple.put("name", "vesense"); + + assertEquals("dtpc", selector.getTopic(tuple)); + assertEquals("dtg", selector.getTag(tuple)); + } + +} \ No newline at end of file diff --git a/rocketmq-flink/src/test/java/org/apache/rocketmq/flink/common/serialization/SimpleKeyValueSerializationSchemaTest.java b/rocketmq-flink/src/test/java/org/apache/rocketmq/flink/common/serialization/SimpleKeyValueSerializationSchemaTest.java new file mode 100644 index 000000000..98aa79310 --- /dev/null +++ b/rocketmq-flink/src/test/java/org/apache/rocketmq/flink/common/serialization/SimpleKeyValueSerializationSchemaTest.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.flink.common.serialization; + +import java.util.HashMap; +import java.util.Map; + +import org.junit.Test; + +import static org.junit.Assert.*; + +public class SimpleKeyValueSerializationSchemaTest { + @Test + public void serializeKeyAndValue() throws Exception { + SimpleKeyValueSerializationSchema serializationSchema = new SimpleKeyValueSerializationSchema("id", "name"); + SimpleKeyValueDeserializationSchema deserializationSchema = new SimpleKeyValueDeserializationSchema("id", "name"); + + Map tuple = new HashMap(); + tuple.put("id", "x001"); + tuple.put("name", "vesense"); + + assertEquals(tuple, deserializationSchema.deserializeKeyAndValue(serializationSchema.serializeKey(tuple), + serializationSchema.serializeValue(tuple))); + } + +} \ No newline at end of file diff --git a/rocketmq-flink/src/test/java/org/apache/rocketmq/flink/ConsumerTest.java b/rocketmq-flink/src/test/java/org/apache/rocketmq/flink/example/ConsumerTest.java similarity index 98% rename from rocketmq-flink/src/test/java/org/apache/rocketmq/flink/ConsumerTest.java rename to rocketmq-flink/src/test/java/org/apache/rocketmq/flink/example/ConsumerTest.java index 7c83abe68..1b07b8d55 100644 --- a/rocketmq-flink/src/test/java/org/apache/rocketmq/flink/ConsumerTest.java +++ b/rocketmq-flink/src/test/java/org/apache/rocketmq/flink/example/ConsumerTest.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.rocketmq.flink; +package org.apache.rocketmq.flink.example; import java.util.List; diff --git a/rocketmq-flink/src/test/java/org/apache/rocketmq/flink/ProducerTest.java b/rocketmq-flink/src/test/java/org/apache/rocketmq/flink/example/ProducerTest.java similarity index 98% rename from rocketmq-flink/src/test/java/org/apache/rocketmq/flink/ProducerTest.java rename to rocketmq-flink/src/test/java/org/apache/rocketmq/flink/example/ProducerTest.java index 2539c4757..c04ca74e8 100644 --- a/rocketmq-flink/src/test/java/org/apache/rocketmq/flink/ProducerTest.java +++ b/rocketmq-flink/src/test/java/org/apache/rocketmq/flink/example/ProducerTest.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.rocketmq.flink; +package org.apache.rocketmq.flink.example; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; diff --git a/rocketmq-flink/src/test/java/org/apache/rocketmq/flink/RocketMQFlinkExample.java b/rocketmq-flink/src/test/java/org/apache/rocketmq/flink/example/RocketMQFlinkExample.java similarity index 94% rename from rocketmq-flink/src/test/java/org/apache/rocketmq/flink/RocketMQFlinkExample.java rename to rocketmq-flink/src/test/java/org/apache/rocketmq/flink/example/RocketMQFlinkExample.java index 0031fe17f..02a551be7 100644 --- a/rocketmq-flink/src/test/java/org/apache/rocketmq/flink/RocketMQFlinkExample.java +++ b/rocketmq-flink/src/test/java/org/apache/rocketmq/flink/example/RocketMQFlinkExample.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.rocketmq.flink; +package org.apache.rocketmq.flink.example; import java.util.HashMap; import java.util.Map; @@ -25,6 +25,9 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.ProcessFunction; import org.apache.flink.util.Collector; +import org.apache.rocketmq.flink.RocketMQSink; +import org.apache.rocketmq.flink.RocketMQSource; +import org.apache.rocketmq.flink.RocketMqConfig; import org.apache.rocketmq.flink.common.selector.DefaultTopicSelector; import org.apache.rocketmq.flink.common.serialization.SimpleKeyValueDeserializationSchema; import org.apache.rocketmq.flink.common.serialization.SimpleKeyValueSerializationSchema; From fb0875657d6b04f735cf060e59fc665ced780674 Mon Sep 17 00:00:00 2001 From: Xin Wang Date: Tue, 20 Mar 2018 23:05:36 +0800 Subject: [PATCH 3/4] Addressed review comments --- rocketmq-flink/README.md | 6 ++-- rocketmq-flink/pom.xml | 24 ++----------- ...ocketMqConfig.java => RocketMQConfig.java} | 6 ++-- .../apache/rocketmq/flink/RocketMQSink.java | 8 ++--- .../apache/rocketmq/flink/RocketMQSource.java | 34 +++++++++---------- ...{RocketMqUtils.java => RocketMQUtils.java} | 2 +- .../flink/example/RocketMQFlinkExample.java | 10 +++--- 7 files changed, 34 insertions(+), 56 deletions(-) rename rocketmq-flink/src/main/java/org/apache/rocketmq/flink/{RocketMqConfig.java => RocketMQConfig.java} (97%) rename rocketmq-flink/src/main/java/org/apache/rocketmq/flink/{RocketMqUtils.java => RocketMQUtils.java} (97%) diff --git a/rocketmq-flink/README.md b/rocketmq-flink/README.md index 629f2c934..ab1d45653 100644 --- a/rocketmq-flink/README.md +++ b/rocketmq-flink/README.md @@ -97,7 +97,7 @@ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironm ``` ## Configurations -The following configurations are all from the class `org.apache.rocketmq.flink.RocketMqConfig`. +The following configurations are all from the class `org.apache.rocketmq.flink.RocketMQConfig`. ### Producer Configurations | NAME | DESCRIPTION | DEFAULT | @@ -105,7 +105,7 @@ The following configurations are all from the class `org.apache.rocketmq.flink.R | nameserver.address | name server address *Required* | null | | nameserver.poll.interval | name server poll topic info interval | 30000 | | brokerserver.heartbeat.interval | broker server heartbeat interval | 30000 | -| producer.group | producer group | $UUID | +| producer.group | producer group | `UUID.randomUUID().toString()` | | producer.retry.times | producer send messages retry times | 3 | | producer.timeout | producer send messages timeout | 3000 | @@ -120,7 +120,7 @@ The following configurations are all from the class `org.apache.rocketmq.flink.R | consumer.topic | consumer topic *Required* | null | | consumer.tag | consumer topic tag | * | | consumer.offset.reset.to | what to do when there is no initial offset on the server | latest/earliest/timestamp | -| consumer.offset.from.timestamp | the timestamp when `consumer.offset.reset.to=timestamp` was set | $TIMESTAMP | +| consumer.offset.from.timestamp | the timestamp when `consumer.offset.reset.to=timestamp` was set | `System.currentTimeMillis()` | | consumer.offset.persist.interval | auto commit offset interval | 5000 | | consumer.pull.thread.pool.size | consumer pull thread pool size | 20 | | consumer.batch.size | consumer messages batch size | 32 | diff --git a/rocketmq-flink/pom.xml b/rocketmq-flink/pom.xml index 91d2c45ba..ecb3436b7 100644 --- a/rocketmq-flink/pom.xml +++ b/rocketmq-flink/pom.xml @@ -25,19 +25,11 @@ 0.0.1-SNAPSHOT jar - - - vesense - Xin Wang - xinwang@apache.org - - - UTF-8 - true - true + false + false 1.8 1.8 @@ -115,24 +107,12 @@ rocketmq-namesrv ${rocketmq.version} test - - - - - - org.apache.rocketmq rocketmq-broker ${rocketmq.version} test - - - - - - diff --git a/rocketmq-flink/src/main/java/org/apache/rocketmq/flink/RocketMqConfig.java b/rocketmq-flink/src/main/java/org/apache/rocketmq/flink/RocketMQConfig.java similarity index 97% rename from rocketmq-flink/src/main/java/org/apache/rocketmq/flink/RocketMqConfig.java rename to rocketmq-flink/src/main/java/org/apache/rocketmq/flink/RocketMQConfig.java index e17631a77..8ec760b93 100644 --- a/rocketmq-flink/src/main/java/org/apache/rocketmq/flink/RocketMqConfig.java +++ b/rocketmq-flink/src/main/java/org/apache/rocketmq/flink/RocketMQConfig.java @@ -28,12 +28,12 @@ import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; -import static org.apache.rocketmq.flink.RocketMqUtils.getInteger; +import static org.apache.rocketmq.flink.RocketMQUtils.getInteger; /** - * RocketMqConfig for Consumer/Producer. + * RocketMQConfig for Consumer/Producer. */ -public class RocketMqConfig { +public class RocketMQConfig { // common public static final String NAME_SERVER_ADDR = "nameserver.address"; // Required diff --git a/rocketmq-flink/src/main/java/org/apache/rocketmq/flink/RocketMQSink.java b/rocketmq-flink/src/main/java/org/apache/rocketmq/flink/RocketMQSink.java index ae466a019..e79d1b40f 100644 --- a/rocketmq-flink/src/main/java/org/apache/rocketmq/flink/RocketMQSink.java +++ b/rocketmq-flink/src/main/java/org/apache/rocketmq/flink/RocketMQSink.java @@ -75,7 +75,7 @@ public void open(Configuration parameters) throws Exception { producer = new DefaultMQProducer(); producer.setInstanceName(String.valueOf(getRuntimeContext().getIndexOfThisSubtask())); - RocketMqConfig.buildProducerConfigs(props, producer); + RocketMQConfig.buildProducerConfigs(props, producer); batchList = new LinkedList<>(); @@ -106,7 +106,7 @@ public void invoke(IN input, Context context) throws Exception { producer.send(msg, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { - LOG.debug("Async send message success!"); + LOG.debug("Async send message success! result: {}", sendResult); } @Override @@ -123,9 +123,7 @@ public void onException(Throwable throwable) { // sync sending, will return a SendResult try { SendResult result = producer.send(msg); - if (LOG.isDebugEnabled()) { - LOG.debug("Sync send message result: {}", result); - } + LOG.debug("Sync send message result: {}", result); } catch (Exception e) { LOG.error("Sync send message failure!", e); } diff --git a/rocketmq-flink/src/main/java/org/apache/rocketmq/flink/RocketMQSource.java b/rocketmq-flink/src/main/java/org/apache/rocketmq/flink/RocketMQSource.java index b55482074..282f1f92d 100644 --- a/rocketmq-flink/src/main/java/org/apache/rocketmq/flink/RocketMQSource.java +++ b/rocketmq-flink/src/main/java/org/apache/rocketmq/flink/RocketMQSource.java @@ -48,11 +48,11 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static org.apache.rocketmq.flink.RocketMqConfig.CONSUMER_OFFSET_EARLIEST; -import static org.apache.rocketmq.flink.RocketMqConfig.CONSUMER_OFFSET_LATEST; -import static org.apache.rocketmq.flink.RocketMqConfig.CONSUMER_OFFSET_TIMESTAMP; -import static org.apache.rocketmq.flink.RocketMqUtils.getInteger; -import static org.apache.rocketmq.flink.RocketMqUtils.getLong; +import static org.apache.rocketmq.flink.RocketMQConfig.CONSUMER_OFFSET_EARLIEST; +import static org.apache.rocketmq.flink.RocketMQConfig.CONSUMER_OFFSET_LATEST; +import static org.apache.rocketmq.flink.RocketMQConfig.CONSUMER_OFFSET_TIMESTAMP; +import static org.apache.rocketmq.flink.RocketMQUtils.getInteger; +import static org.apache.rocketmq.flink.RocketMQUtils.getLong; /** * The RocketMQSource is based on RocketMQ pull consumer mode, @@ -96,8 +96,8 @@ public void open(Configuration parameters) throws Exception { Validate.notEmpty(props, "Consumer properties can not be empty"); Validate.notNull(schema, "KeyValueDeserializationSchema can not be null"); - this.topic = props.getProperty(RocketMqConfig.CONSUMER_TOPIC); - this.group = props.getProperty(RocketMqConfig.CONSUMER_GROUP); + this.topic = props.getProperty(RocketMQConfig.CONSUMER_TOPIC); + this.group = props.getProperty(RocketMQConfig.CONSUMER_GROUP); Validate.notEmpty(topic, "Consumer topic can not be empty"); Validate.notEmpty(group, "Consumer group can not be empty"); @@ -115,7 +115,7 @@ public void open(Configuration parameters) throws Exception { consumer = pullConsumerScheduleService.getDefaultMQPullConsumer(); consumer.setInstanceName(String.valueOf(getRuntimeContext().getIndexOfThisSubtask())); - RocketMqConfig.buildConsumerConfigs(props, consumer); + RocketMQConfig.buildConsumerConfigs(props, consumer); } @Override @@ -125,16 +125,16 @@ public void run(SourceContext context) throws Exception { // from the view of taking a checkpoint. final Object lock = context.getCheckpointLock(); - int delayWhenMessageNotFound = getInteger(props, RocketMqConfig.CONSUMER_DELAY_WHEN_MESSAGE_NOT_FOUND, - RocketMqConfig.DEFAULT_CONSUMER_DELAY_WHEN_MESSAGE_NOT_FOUND); + int delayWhenMessageNotFound = getInteger(props, RocketMQConfig.CONSUMER_DELAY_WHEN_MESSAGE_NOT_FOUND, + RocketMQConfig.DEFAULT_CONSUMER_DELAY_WHEN_MESSAGE_NOT_FOUND); - String tag = props.getProperty(RocketMqConfig.CONSUMER_TAG, RocketMqConfig.DEFAULT_CONSUMER_TAG); + String tag = props.getProperty(RocketMQConfig.CONSUMER_TAG, RocketMQConfig.DEFAULT_CONSUMER_TAG); - int pullPoolSize = getInteger(props, RocketMqConfig.CONSUMER_PULL_POOL_SIZE, - RocketMqConfig.DEFAULT_CONSUMER_PULL_POOL_SIZE); + int pullPoolSize = getInteger(props, RocketMQConfig.CONSUMER_PULL_POOL_SIZE, + RocketMQConfig.DEFAULT_CONSUMER_PULL_POOL_SIZE); - int pullBatchSize = getInteger(props, RocketMqConfig.CONSUMER_BATCH_SIZE, - RocketMqConfig.DEFAULT_CONSUMER_BATCH_SIZE); + int pullBatchSize = getInteger(props, RocketMQConfig.CONSUMER_BATCH_SIZE, + RocketMQConfig.DEFAULT_CONSUMER_BATCH_SIZE); pullConsumerScheduleService.setPullThreadNums(pullPoolSize); pullConsumerScheduleService.registerPullTaskCallback(topic, new PullTaskCallback() { @@ -212,7 +212,7 @@ private long getMessageQueueOffset(MessageQueue mq) throws MQClientException { if (offset == null) { offset = consumer.fetchConsumeOffset(mq, false); if (offset < 0) { - String initialOffset = props.getProperty(RocketMqConfig.CONSUMER_OFFSET_RESET_TO, CONSUMER_OFFSET_LATEST); + String initialOffset = props.getProperty(RocketMQConfig.CONSUMER_OFFSET_RESET_TO, CONSUMER_OFFSET_LATEST); switch (initialOffset) { case CONSUMER_OFFSET_EARLIEST: offset = consumer.minOffset(mq); @@ -222,7 +222,7 @@ private long getMessageQueueOffset(MessageQueue mq) throws MQClientException { break; case CONSUMER_OFFSET_TIMESTAMP: offset = consumer.searchOffset(mq, getLong(props, - RocketMqConfig.CONSUMER_OFFSET_FROM_TIMESTAMP, System.currentTimeMillis())); + RocketMQConfig.CONSUMER_OFFSET_FROM_TIMESTAMP, System.currentTimeMillis())); break; default: throw new IllegalArgumentException("Unknown value for CONSUMER_OFFSET_RESET_TO."); diff --git a/rocketmq-flink/src/main/java/org/apache/rocketmq/flink/RocketMqUtils.java b/rocketmq-flink/src/main/java/org/apache/rocketmq/flink/RocketMQUtils.java similarity index 97% rename from rocketmq-flink/src/main/java/org/apache/rocketmq/flink/RocketMqUtils.java rename to rocketmq-flink/src/main/java/org/apache/rocketmq/flink/RocketMQUtils.java index fd7ca3d02..9ca1de230 100644 --- a/rocketmq-flink/src/main/java/org/apache/rocketmq/flink/RocketMqUtils.java +++ b/rocketmq-flink/src/main/java/org/apache/rocketmq/flink/RocketMQUtils.java @@ -20,7 +20,7 @@ import java.util.Properties; -public final class RocketMqUtils { +public final class RocketMQUtils { public static int getInteger(Properties props, String key, int defaultValue) { return Integer.parseInt(props.getProperty(key, String.valueOf(defaultValue))); diff --git a/rocketmq-flink/src/test/java/org/apache/rocketmq/flink/example/RocketMQFlinkExample.java b/rocketmq-flink/src/test/java/org/apache/rocketmq/flink/example/RocketMQFlinkExample.java index 02a551be7..b2a4034e8 100644 --- a/rocketmq-flink/src/test/java/org/apache/rocketmq/flink/example/RocketMQFlinkExample.java +++ b/rocketmq-flink/src/test/java/org/apache/rocketmq/flink/example/RocketMQFlinkExample.java @@ -25,9 +25,9 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.ProcessFunction; import org.apache.flink.util.Collector; +import org.apache.rocketmq.flink.RocketMQConfig; import org.apache.rocketmq.flink.RocketMQSink; import org.apache.rocketmq.flink.RocketMQSource; -import org.apache.rocketmq.flink.RocketMqConfig; import org.apache.rocketmq.flink.common.selector.DefaultTopicSelector; import org.apache.rocketmq.flink.common.serialization.SimpleKeyValueDeserializationSchema; import org.apache.rocketmq.flink.common.serialization.SimpleKeyValueSerializationSchema; @@ -40,12 +40,12 @@ public static void main(String[] args) { env.enableCheckpointing(3000); Properties consumerProps = new Properties(); - consumerProps.setProperty(RocketMqConfig.NAME_SERVER_ADDR, "localhost:9876"); - consumerProps.setProperty(RocketMqConfig.CONSUMER_GROUP, "c002"); - consumerProps.setProperty(RocketMqConfig.CONSUMER_TOPIC, "flink-source2"); + consumerProps.setProperty(RocketMQConfig.NAME_SERVER_ADDR, "localhost:9876"); + consumerProps.setProperty(RocketMQConfig.CONSUMER_GROUP, "c002"); + consumerProps.setProperty(RocketMQConfig.CONSUMER_TOPIC, "flink-source2"); Properties producerProps = new Properties(); - producerProps.setProperty(RocketMqConfig.NAME_SERVER_ADDR, "localhost:9876"); + producerProps.setProperty(RocketMQConfig.NAME_SERVER_ADDR, "localhost:9876"); env.addSource(new RocketMQSource(new SimpleKeyValueDeserializationSchema("id", "address"), consumerProps)) .name("rocketmq-source") From 09d9cd9d066619e946d720489e2e3cd12795a5b9 Mon Sep 17 00:00:00 2001 From: Xin Wang Date: Wed, 21 Mar 2018 19:28:21 +0800 Subject: [PATCH 4/4] Addressed review comments --- .../apache/rocketmq/flink/RocketMQSource.java | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/rocketmq-flink/src/main/java/org/apache/rocketmq/flink/RocketMQSource.java b/rocketmq-flink/src/main/java/org/apache/rocketmq/flink/RocketMQSource.java index 282f1f92d..2dc8fd5d6 100644 --- a/rocketmq-flink/src/main/java/org/apache/rocketmq/flink/RocketMQSource.java +++ b/rocketmq-flink/src/main/java/org/apache/rocketmq/flink/RocketMQSource.java @@ -148,6 +148,7 @@ public void doPullTask(MessageQueue mq, PullTaskContext pullTaskContext) { } PullResult pullResult = consumer.pull(mq, tag, offset, pullBatchSize); + boolean found = false; switch (pullResult.getPullStatus()) { case FOUND: List messages = pullResult.getMsgFoundList(); @@ -159,25 +160,31 @@ public void doPullTask(MessageQueue mq, PullTaskContext pullTaskContext) { // output and state update are atomic synchronized (lock) { context.collectWithTimestamp(data, msg.getBornTimestamp()); - putMessageQueueOffset(mq, pullResult.getNextBeginOffset()); } } - pullTaskContext.setPullNextDelayTimeMillis(0); // no delay when messages were found + found = true; break; case NO_MATCHED_MSG: LOG.debug("No matched message after offset {} for queue {}", offset, mq); - pullTaskContext.setPullNextDelayTimeMillis(delayWhenMessageNotFound); break; case NO_NEW_MSG: - pullTaskContext.setPullNextDelayTimeMillis(delayWhenMessageNotFound); break; case OFFSET_ILLEGAL: LOG.warn("Offset {} is illegal for queue {}", offset, mq); - pullTaskContext.setPullNextDelayTimeMillis(delayWhenMessageNotFound); break; default: break; } + + synchronized (lock) { + putMessageQueueOffset(mq, pullResult.getNextBeginOffset()); + } + + if (found) { + pullTaskContext.setPullNextDelayTimeMillis(0); // no delay when messages were found + } else { + pullTaskContext.setPullNextDelayTimeMillis(delayWhenMessageNotFound); + } } catch (Exception e) { throw new RuntimeException(e); }