STORM-2349: Add one RocketMQ plugin for the Apache Storm #2024
I will update the POM files accordingly based on STORM-2416 (Reduce release package size) later.
Any comments are welcome.
external/storm-rocketmq/pom.xml
Outdated
</dependency>
<dependency>
  <groupId>commons-lang</groupId>
  <artifactId>commons-lang</artifactId>
No need: the RocketMQ client already depends on commons-lang3, which is the next generation of the commons-lang package.
This is used for validating parameters. Without the dependency, it causes a ClassNotFound exception.
external/storm-rocketmq/pom.xml
Outdated
<dependency>
  <groupId>org.slf4j</groupId>
  <artifactId>log4j-over-slf4j</artifactId>
  <scope>test</scope>
Does the latest storm-core still depend on log4j?
This is not used. Will remove.
    } else {
        return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
    }
}
Currently, RocketMQ does not support multiple instances on one machine. If we want that, we must set the instanceID, as on line 75 of https://github.com/rocketmq/rocketmq-storm/blob/master/src/main/java/org/apache/rocketmq/integration/storm/spout/SimpleMessageSpout.java.
Yes, I noticed this. Please see lines 156-162 in RocketMQConfig.
@vesense In fact, I don't recommend giving each task its own consumer by setting the instanceId, because each worker may then end up with many consumers. Why not have only one consumer per worker, using the singleton pattern?
Yes, I see that the RocketMQ Consumer is thread-safe, so sharing a single instance across threads should generally be faster than having multiple instances. The same applies to the Producer. I will refactor them.
@hustfxj Thanks for your suggestion.
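For readers following the thread, the "one consumer per worker" idea can be sketched roughly as below. This is only an illustration, not the code in this PR: `SharedClient` and its `acquire`/`release` methods are hypothetical names, and the client is modeled as a generic type so the sketch does not depend on the RocketMQ classes.

```java
import java.util.function.Consumer;
import java.util.function.Supplier;

/**
 * Hypothetical helper (not part of this PR): one shared client per JVM,
 * created on first acquire() and closed when the last user releases it.
 */
final class SharedClient<T> {
    private final Supplier<T> factory;  // creates the client, e.g. builds and starts a consumer
    private final Consumer<T> closer;   // shuts the client down
    private T instance;                 // guarded by "this"
    private int users;                  // guarded by "this"

    SharedClient(Supplier<T> factory, Consumer<T> closer) {
        this.factory = factory;
        this.closer = closer;
    }

    /** Returns the shared instance, creating it on first use. */
    synchronized T acquire() {
        if (users == 0) {
            instance = factory.get();
        }
        users++;
        return instance;
    }

    /** Drops one reference and closes the instance once nobody uses it. */
    synchronized void release() {
        if (users == 0) {
            throw new IllegalStateException("release() called without acquire()");
        }
        users--;
        if (users == 0) {
            T toClose = instance;
            instance = null;
            closer.accept(toClose);
        }
    }
}
```

Under this assumption, every spout task in a worker would call `acquire()` when it opens and `release()` when it closes, so all tasks in the same JVM end up with the same client object.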
// use taskID/UUID for client name by default
String defaultClientName;
if (context != null) {
    defaultClientName = String.valueOf(context.getThisTaskId());
Cool~
@vongosling Thanks for your comments. I will rebase the code on master.
Hi @hustfxj, I believe you know Apache RocketMQ well. Please take a look if you have time.
eff76dc to 8182480
POM files updated and rebased.
if (process(msgs)) {
    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
} else {
    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
Messages may be lost because the consumer here relies on RocketMQ's automatic commit mode. It would be better not to commit a message until Storm has handled it successfully. Of course, this is only one consumption strategy.
Currently, only push mode is supported, which is auto-commit. We can add pull mode in the next stage.
@hustfxj Thanks for the reminder.
@vesense Push mode can also deliver messages with at-least-once semantics. You can commit the message in "ack(Object msgId)", which means the spout doesn't commit the message until Storm has handled it successfully.
@hustfxj I updated the code. Now all consumed messages are put into a queue and managed by MessageRetryManager. Of course, this implements "at-least-once" on the Storm side.
BTW, I'm not sure whether I misunderstand ConsumeConcurrentlyStatus:
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
        ConsumeConcurrentlyContext context) {
    if (process(msgs)) {
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    } else {
        return ConsumeConcurrentlyStatus.RECONSUME_LATER;
    }
}
When returning CONSUME_SUCCESS, the consumer commits the offset automatically. And when returning RECONSUME_LATER, will the consumer consume the failed messages again? Do you mind explaining what the consumer does after receiving ConsumeConcurrentlyStatus.RECONSUME_LATER?
Please correct me if I misunderstand. Thanks.
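To make the Storm-side "at-least-once" idea above concrete, here is a rough sketch of the bookkeeping such a retry manager might do. It is not the MessageRetryManager from this PR: the class name `PendingMessages`, its methods, and the `maxRetries` limit are all assumptions made for illustration, and messages are reduced to string IDs.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Queue;
import java.util.Set;

/**
 * Hypothetical sketch (names are not from this PR): keeps every message
 * until Storm acks it and re-queues it when Storm reports a failure.
 */
final class PendingMessages {
    private final int maxRetries;
    private final Queue<String> ready = new ArrayDeque<>();         // ids waiting to be emitted
    private final Set<String> inFlight = new HashSet<>();           // ids emitted, not yet acked
    private final Map<String, Integer> failures = new HashMap<>();  // id -> failures so far

    PendingMessages(int maxRetries) {
        this.maxRetries = maxRetries;
    }

    /** Called by the consumer listener when a message arrives. */
    synchronized void offer(String msgId) {
        failures.putIfAbsent(msgId, 0);
        ready.add(msgId);
    }

    /** Called from nextTuple(): returns the next id to emit, or null if none. */
    synchronized String nextToEmit() {
        String msgId = ready.poll();
        if (msgId != null) {
            inFlight.add(msgId);
        }
        return msgId;
    }

    /** Called from ack(Object msgId): the message is done and forgotten. */
    synchronized void ack(String msgId) {
        inFlight.remove(msgId);
        failures.remove(msgId);
    }

    /** Called from fail(Object msgId): returns true if the message was re-queued. */
    synchronized boolean fail(String msgId) {
        if (!inFlight.remove(msgId)) {
            return false; // unknown or already handled
        }
        int count = failures.merge(msgId, 1, Integer::sum);
        if (count > maxRetries) {
            failures.remove(msgId); // give up after too many failures
            return false;
        }
        ready.add(msgId);
        return true;
    }

    /** Number of messages that are still tracked (queued or in flight). */
    synchronized int tracked() {
        return failures.size();
    }
}
```

The methods are synchronized because, in a push-style consumer, the listener thread that calls `offer` is presumably different from the spout thread that calls `nextToEmit`, `ack`, and `fail`.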
@vongosling @hustfxj Updated. Can you take a look again?
Overall LGTM. Added a few questions.
// Since RocketMQ Consumer is thread-safe, RocketMQSpout uses a single
// consumer instance across threads to improve the performance.
synchronized (RocketMQSpout.class) {
Even if it's thread-safe, shouldn't we consider giving each spout instance its own consumer? That way it would be more performant than having one consumer make all the calls to the RocketMQ servers.
Maybe my code comment is not clear enough. Thread safety is just a precondition; the important point is that this relates to RocketMQ's internal implementation (shared queues, threads, etc.). "Consumer concurrency / Only one consumer instance per process" is the officially recommended approach.
RocketMQConfig.buildConsumerConfigs(properties, (DefaultMQPushConsumer)consumer);

if (ordered) {
    consumer.registerMessageListener(new MessageListenerOrderly() {
Is this a push model from the server instead of spout polling?
yes
In fact, RocketMQ's "push" mode still pulls data from the broker. PushConsumer is a high-level consumer API that wraps the pulling details, so it looks as if the broker pushes messages to the consumer.
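The "push is a wrapper around pull" point can be illustrated with a small, library-free sketch. Everything here is hypothetical (`PushOverPull`, `pollOnce`, the offset-based `puller`), and the failure handling is deliberately simplified: on a failed batch this sketch just pulls from the same offset again, which is not meant to model what the real RocketMQ client does.

```java
import java.util.List;
import java.util.function.LongFunction;
import java.util.function.Predicate;

/**
 * Hypothetical sketch: a "push" style API built on top of a pull loop.
 * The caller only registers a listener; the pulling is hidden inside.
 */
final class PushOverPull<M> {
    private final LongFunction<List<M>> puller;  // pulls a batch starting at the given offset
    private final Predicate<List<M>> listener;   // user callback, returns true on success
    private long offset;                         // next offset to pull from

    PushOverPull(LongFunction<List<M>> puller, Predicate<List<M>> listener) {
        this.puller = puller;
        this.listener = listener;
    }

    /**
     * One iteration of the loop that a real client would run on a background
     * thread. Returns false when there was nothing to pull.
     */
    boolean pollOnce() {
        List<M> batch = puller.apply(offset);
        if (batch.isEmpty()) {
            return false;
        }
        // Simplification: advance only when the listener reports success,
        // so a failed batch is pulled again on the next iteration.
        if (listener.test(batch)) {
            offset += batch.size();
        }
        return true;
    }

    long offset() {
        return offset;
    }
}
```

From the listener's point of view, messages simply arrive, which is why the API looks like a push model even though each delivery starts with a pull.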
if (msgs.isEmpty()) {
    return true;
}
MessageSet messageSet = new MessageSet(msgs);
Are there any plans to offer a TupleMapper to flatten the schema, or do you want to preserve the same data as in RocketMQ and send it downstream?
Yes, the work is in progress. I will update the PR later.
Updated.
@vongosling @hustfxj @harshach Updated. Can you take a look again?
@vesense thank you. +1
LGTM @vesense. +1
Great ~
https://issues.apache.org/jira/browse/STORM-2349
This is the initial version for code review.
Current phase (features included in this PR):
Local tests passed.
Next phase (in the plan):
RocketMQ Trident Spout.