Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ROCKETMQ-82: RocketMQ-Flink Integration #45

Merged
merged 4 commits into from
Mar 23, 2018

Conversation

vesense
Copy link
Member

@vesense vesense commented Jan 27, 2018

What is the purpose of the change

RocketMQ integration for Apache Flink. This module includes the RocketMQ source and sink that allows a flink job to either write messages into a topic or read from topics in a flink job.
https://issues.apache.org/jira/browse/ROCKETMQ-82

Brief changelog

  • RocketMQSource - The RocketMQSource is based on RocketMQ pull consumer mode, and provides exactly once reliability guarantees when checkpoints are enabled.
    Otherwise, the source doesn't provide any reliability guarantees.
  • RocketMQSink - The RocketMQSink provides at-least-once reliability guarantees when checkpoints are enabled and withBatchFlushOnCheckpoint(true) is set.
    Otherwise, the sink reliability guarantees depends on rocketmq producer's retry policy, for this case, the messages sending way is sync by default,
    but you can change it by invoking withAsync(true).
  • KeyValueDeserializationSchema - The main API for deserializing topic and tags is the org.apache.rocketmq.flink.common.serialization.KeyValueDeserializationSchema interface.
    rocketmq-flink includes general purpose KeyValueDeserializationSchema implementations called SimpleKeyValueDeserializationSchema.
  • KeyValueSerializationSchema - The main API for serializing topic and tags is the org.apache.rocketmq.flink.common.serialization.KeyValueSerializationSchema interface.
    rocketmq-flink includes general purpose KeyValueSerializationSchema implementations called SimpleKeyValueSerializationSchema.
  • TopicSelector - The main API for selecting topic and tags is the org.apache.rocketmq.flink.common.selector.TopicSelector interface.
    rocketmq-flink includes general purpose TopicSelector implementations called DefaultTopicSelector and SimpleTopicSelector.
  • RocketMQFlinkExample - which receive messages from RocketMQ brokers and send messages to broker after processing.

Verifying this change

local & unit tests check passed ✅
apache-rat check passed ✅
checkstyle check passed ✅

@vesense
Copy link
Member Author

vesense commented Jan 27, 2018

@vongosling @zhouxinyu Do yo have time to take a look?

@vesense
Copy link
Member Author

vesense commented Feb 1, 2018

@vongosling @zhouxinyu ping..

@vesense
Copy link
Member Author

vesense commented Feb 6, 2018

Any comments are welcome..

@vesense
Copy link
Member Author

vesense commented Feb 23, 2018

Will add more unit tests later.

@vesense
Copy link
Member Author

vesense commented Mar 9, 2018

Added unit tests.

@zhouxinyu
Copy link
Member

Cool, let's move this PR forward.

@vesense
Copy link
Member Author

vesense commented Mar 20, 2018

I'm glad to see that some users have used this patch in some scenes. But I would suggest to fork the code from official repo, not my personal.
And I will always keep my eyes on the RocketMQ streaming integration.

@vesense
Copy link
Member Author

vesense commented Mar 20, 2018

Also, I will update the status of integrating and SQL modules in community mailing list periodically.

Copy link
Member

@zhouxinyu zhouxinyu left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added some comments, please check.

| consumer.topic | consumer topic *Required* | null |
| consumer.tag | consumer topic tag | * |
| consumer.offset.reset.to | what to do when there is no initial offset on the server | latest/earliest/timestamp |
| consumer.offset.from.timestamp | the timestamp when `consumer.offset.reset.to=timestamp` was set | $TIMESTAMP |
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The default value $TIMESTAMP and $UUID are confusing, especially the $ symbol, a short description may be better.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch. Will fix.

<developer>
<id>vesense</id>
<name>Xin Wang</name>
<email>xinwang@apache.org</email>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not recommended to use personal user info, see: https://github.com/apache/rocketmq/blob/master/pom.xml

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will remove.

<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<!--maven properties -->
<maven.test.skip>true</maven.test.skip>
<maven.javadoc.skip>true</maven.javadoc.skip>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMO, it's not a good practice that skip test and javadoc.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will fix.

<!--<groupId>ch.qos.logback</groupId>-->
<!--<artifactId>logback-classic</artifactId>-->
<!--</exclusion>-->
<!--</exclusions>-->
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the exclusions section isn't necessary, just remove it and make code clean.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will remove.

/**
* RocketMqConfig for Consumer/Producer.
*/
public class RocketMqConfig {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use RocketMQConfig

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will fix.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also RocketMqUtils -> RocketMQUtils.

producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
LOG.debug("Async send message success!");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how about appending the message id in send result to debug?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can print the SendResult here following the way of sync sending.

// sync sending, will return a SendResult
try {
SendResult result = producer.send(msg);
if (LOG.isDebugEnabled()) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need to check whether debug is enabled.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. Will remove.

// output and state update are atomic
synchronized (lock) {
context.collectWithTimestamp(data, msg.getBornTimestamp());
putMessageQueueOffset(mq, pullResult.getNextBeginOffset());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need use the next begin offset in other PullStatus?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. We need the nextBeginOffset in the PullResult to update the offset table which will be checkpointing when snapshotState invoked .

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I mean we also should update the offset when the pullresult is OFFSET_ILLEGAL or NO_MATCHED_MSG

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it. Will fix.

@vesense
Copy link
Member Author

vesense commented Mar 20, 2018

@zhouxinyu Addressed your comments.

@vesense
Copy link
Member Author

vesense commented Mar 21, 2018

@zhouxinyu Addressed all of your comments. Please take a look again.

Message msg = prepareMessage(input);

if (batchFlushOnCheckpoint) {
batchList.add(msg);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add action isn't protected by the lock batchList, is this a concurrent issue?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not an issue. Each RocketMQSink instance belong to a thread. We just need to sync batchList when snapshotState invoked.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it.

@zhouxinyu
Copy link
Member

LGTM now~

@vesense
Copy link
Member Author

vesense commented Mar 22, 2018

Thanks @zhouxinyu

@vesense
Copy link
Member Author

vesense commented Mar 23, 2018

@zhouxinyu Can we merge this in?

@vongosling
Copy link
Member

LGTM

@zhouxinyu zhouxinyu merged commit 45efa39 into apache:master Mar 23, 2018
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

3 participants