
STORM-2340: fix AutoCommitMode issue in KafkaSpout #1919

Closed · wants to merge 6 commits

Conversation

@mingmxu mingmxu commented Feb 3, 2017

STORM-2340: fix AutoCommitMode issue in KafkaSpout

Contributor

hmcl commented Feb 6, 2017

@xumingmin does this PR make the initial PR obsolete? If so, can you please close that one? If not, I suggest that we have only one PR (compatible with STORM-2225) to address this issue.

Author

mingmxu commented Feb 6, 2017

closed #1863.
#1919 is compatible with STORM-2225.

Contributor

revans2 commented Feb 6, 2017

@xumingmin Could you also include a STORM JIRA number in the description of this pull request? That is how Apache ties the pull request to its corresponding JIRA.

@mingmxu mingmxu changed the title from "fix: KafkaSpout is blocked in AutoCommitMode" to "STORM-2340: fix At-Most-Once issue in KafkaSpout" on Feb 6, 2017
Author

mingmxu commented Feb 6, 2017

@revans2 updated. thanks for the notes!

Contributor

@revans2 revans2 left a comment


In general it looks good, but I would like to hear your opinion about replay in auto-commit mode. If we don't want to support replay in that mode then we need a documentation change to indicate that. Auto-commit already makes it so replay might not work correctly on a crash. This change would just make it even less likely that a message will be replayed.

emitted.add(msgId);
numUncommittedOffsets++;
if (!consumerAutoCommitMode) { // only need to track in non-AutoCommitMode
    emitted.add(msgId);
Contributor

I think we still want to track emitted even in auto commit mode. Replaying a failed tuple will not work unless it is added to emitted and even with auto commit some people may want to replay tuples.

numUncommittedOffsets is never decremented in auto-commit mode, so that part of the change is fine. Although, it would be good to add a javadoc to numUncommittedOffsets indicating that it is not modified in auto-commit mode.
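
For illustration, a rough sketch of what keeping the tracking plus that javadoc could look like (a fragment only; field and variable names are taken from the diff above, and the javadoc wording is mine, not from this PR):

/**
 * The number of polled offsets that have been emitted but not yet acked and committed.
 * Not incremented or decremented when the consumer is in auto-commit mode.
 */
private long numUncommittedOffsets;

// in the emit path:
collector.emit(tuple, msgId);
emitted.add(msgId);                 // keep tracking even with auto-commit so fail/replay still works
if (!consumerAutoCommitMode) {
    numUncommittedOffsets++;        // bookkeeping only needed when the spout commits offsets itself
}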

Contributor

It might be good to check if there are any ackers before adding the tuple to emitted. Otherwise people who are using auto commit and no ackers will run into OOME when emitted gets sufficiently large.

Contributor

That is not needed. If there are no ackers, or if acking is disabled for some other reason, ack is called as part of the call to emit.

https://github.com/apache/storm/blob/master/storm-core/src/jvm/org/apache/storm/executor/spout/SpoutOutputCollectorImpl.java#L142

Contributor

@srdo srdo Feb 7, 2017


Sorry, never mind. It just occurred to me that Storm will call ack immediately if it's configured that way :)

edit: Concurrent editing ^^

Author

mingmxu commented Feb 7, 2017

Good point to document the impact; I'm preparing an update to the PR.

As I understand it, there's no replay in AT-MOST-ONCE mode in Storm. We use this mode when missing data is not a concern and we want to remove the overhead of tuple tracking.

With auto-commit enabled, the consumer's offsets are periodically committed in the background by the Kafka consumer, so there's no need to track commit status manually with emitted/numUncommittedOffsets.
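
For reference, a small sketch of the consumer settings being described (the interval value is just an example, not something set in this PR):

Properties props = new Properties();
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");       // consumer commits offsets in the background
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "5000");  // roughly every 5 seconds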

Contributor

srdo commented Feb 7, 2017

There are two different settings influencing retry behavior here. Auto commit on the spout, and number of ackers configured for Storm (https://storm.apache.org/releases/1.0.0/javadocs/org/apache/storm/Config.html#TOPOLOGY_ACKER_EXECUTORS). You can configure Storm to require tuple acking, while also using auto commit for this spout, in which case the spout will retry tuples that fail, but only on a best-effort basis (i.e. if the spout crashes, tuples will not be replayed). In that case, the spout actually needs to track tuples even though auto commit is on. You can also set acker executors to 0, which will cause Storm to just ack tuples immediately when emitted. In that case it doesn't hurt anything if the spout adds tuples to emitted, because they'll just get removed again when ack() is called.
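
To make the two knobs concrete, a hedged sketch (the builder calls follow the storm-kafka-client doc snippet quoted further down; the broker address, topic name, and acker count are placeholders):

// Spout-side knob: let the Kafka consumer commit offsets in the background.
KafkaSpoutConfig<String, String> spoutConf = KafkaSpoutConfig
        .builder("broker1:9092", "my-topic")
        .setProp(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
        .build();

// Topology-side knob: the acker count. 0 makes Storm ack each tuple on emit;
// >0 makes Storm track tuples and drive the spout's ack()/fail() callbacks.
Config topoConf = new Config();
topoConf.setNumAckers(1);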

Contributor

revans2 commented Feb 7, 2017

This is the thing with auto-commit: it removes some of the overhead of tracking outstanding messages, but it also violates a lot of the guarantees that Storm has in place.

To truly get at-most-once processing you need to commit the message as soon as it is emitted; otherwise a crash can still result in messages being replayed. If you want at-least-once processing you have to ack the message only after it is fully processed. Storm makes this work out of the box for most spouts, which commit the message when ack is called.

With auto-commit, we could replay messages even in at-most-once processing, and we might not replay messages in at-least-once processing. It is effectively "I really don't care which messages I see processed".

@@ -107,7 +107,8 @@ public void open(Map conf, TopologyContext context, SpoutOutputCollector collect

// Offset management
firstPollOffsetStrategy = kafkaSpoutConfig.getFirstPollOffsetStrategy();
consumerAutoCommitMode = kafkaSpoutConfig.isConsumerAutoCommitMode();
// with AutoCommitMode, topology works in 'At-Most-Once' mode, and offset will be periodically committed in the background by Kafka consumer
Contributor

Can we add this to the readme https://github.com/apache/storm/blob/master/docs/storm-kafka-client.md

We don't document auto-commit there, but if people are using it we should have something explaining how to use it and the ramifications of turning it on.

Author

Agreed, let me update it.

Author

mingmxu commented Feb 7, 2017

It makes sense to me to rely entirely on Kafka auto-commit for at-most-once semantics, and to use manual offset management for at-least-once, rather than mixing the two offset options.

Contributor

srdo commented Feb 7, 2017

The spout as is doesn't support true at-most-once. When auto commit is on, the consumer periodically commits offsets. When auto commit is off and ackers is 0, the spout periodically commits offsets manually. In either case the spout could crash and replay tuples that were acked but not committed. If you need real at-most-once, the spout needs to commit offsets immediately when acked, and in that case you would also need to disable auto commit.

Author

mingmxu commented Feb 7, 2017

The situations in my mind:

  1. autocommit=true + ack=0 --> works, at-most-once
  2. autocommit=true + ack>0 --> works, at-most-once (acker is idle?)
  3. autocommit=false + ack=0 --> does not work, blocked when numUncommittedOffsets < maxUncommittedOffsets
  4. autocommit=false + ack>0 --> works, at-least-once

Contributor

srdo commented Feb 7, 2017

Autocommit=true + ack=0 means the consumer will commit periodically. Tuples cannot fail. Messages may be replayed if the spout crashes (it may not have committed them because it only happens periodically). This is not true at-most-once, since it is only at-most-once if the spout doesn't crash.

Autocommit=true + ack>0. Acked tuples are handled identically to the case above. Failed tuples are replayed, but only if the spout doesn't crash. This configuration could make sense if you just want best-effort replaying, or you're running a topology with other types of spouts as well.

Autocommit=false + ack=0 will cause Storm to ack tuples immediately. The spout will commit them periodically (https://github.com/XuMingmin/storm/blob/717edc8e7ea46ef3392ec1402235a764039070ff/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java#L241). This is identical behavior to the first case, except the commit handling has moved from the consumer to the spout.

Autocommit=false + ack>0 is at-least-once.

Author

mingmxu commented Feb 7, 2017

When ack=0, explicit ack()/fail() should not take effect, as no acker executor is running.

Regarding a spout crash: some tuples may possibly be replayed when commits happen periodically, unless the offset is committed after every emit; that's a trade-off for performance.

Another point: we may need to skip the messageId in SpoutOutputCollector.emit() when auto-commit is on, to disable acking entirely:

public List<Integer> emit(List<Object> tuple) — Emits a tuple to the default output stream with a null message id. Storm will not track this message so ack and fail will never be called for this tuple. The emitted values must be immutable.
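
For illustration, a minimal sketch of that idea (not the exact diff in this PR; the KafkaTuple stream selection shown further down is omitted):

if (consumerAutoCommitMode) {
    collector.emit(tuple);          // no message id: Storm never tracks it, ack()/fail() are never called
} else {
    collector.emit(tuple, msgId);   // anchored: Storm tracks it and calls ack()/fail() later
}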

Contributor

srdo commented Feb 7, 2017

Please see #1919 (comment) and https://storm.apache.org/releases/1.0.0/javadocs/org/apache/storm/Config.html#TOPOLOGY_ACKER_EXECUTORS. If ack=0 Storm will call ack immediately on emit.

Yes, some tuples may be replayed if the spout crashes. What I'm saying is you can't currently configure the spout to commit after every ack. So it's not really at-most-once, so we shouldn't call it that. It's confusing.

Emitting with null message id is an option, but then you have to make sure that emitted tuples leave no state behind (i.e. they aren't put in emitted). It also prevents you from getting best-effort replaying unless you explicitly check for ackers in emitIfNotEmitted.

Author

mingmxu commented Feb 7, 2017

Removed "at-most-once" to better describe the changes;
emit a null msgId in AutoCommitMode to disable tuple tracking in Storm.

Author

mingmxu commented Feb 10, 2017

@srdo @revans2, any comments?

Contributor

srdo commented Feb 10, 2017

LGTM. It doesn't support running with auto commit mode and getting best-effort retry, but I don't know if that's a real use case, so maybe it doesn't matter.

Contributor

@revans2 revans2 left a comment


+1. I have a few minor nits, but they are very minor.

I am a bit conflicted about not emitting with any anchor at all. Acking works really well for flow control, not just at least once processing. But we have backpressure now so that use case is less relevant. If someone really wants flow control on the kafka spout without backpressure and best effort then we can file a separate JIRA.

props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
KafkaSpoutConfig<String, String> kafkaConf = KafkaSpoutConfig
        .builder(String bootstrapServers, String ... topics)
        .setProp(props)
Contributor

nit: setProp works with a key and a value, so it would be smaller to just do

KafkaSpoutConfig<String, String> kafkaConf = KafkaSpoutConfig
        .builder(String bootstrapServers, String ... topics)
        .setProp(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
        .setFirstPollOffsetStrategy(FirstPollOffsetStrategy.EARLIEST)
        .build();

retryService.remove(msgId);

if(consumerAutoCommitMode){
if (tuple instanceof KafkaTuple) {
Contributor

nit: indentation

collector.emit(((KafkaTuple)tuple).getStream(), tuple);
} else {
collector.emit(tuple);
}
Contributor

Could we put in a comment saying something like: when auto-commit is enabled, don't bother to track anything?

@mingmxu mingmxu changed the title from "STORM-2340: fix At-Most-Once issue in KafkaSpout" to "STORM-2340: fix AutoCommitMode issue in KafkaSpout" on Feb 10, 2017
Author

mingmxu commented Feb 10, 2017

Thanks @revans2, updated the doc as advised.

Regarding the anchor: since ack() is called immediately when acker=0, there's no need to track it.

Originally, this ticket focused on solving case 1. Let's have a separate one to address case 2 and case 3?
1). autocommit=true + ack=0
2). autocommit=true + ack>0 //acker tracking runs idle, to be confirmed
3). autocommit=false + ack=0 //ack immediately, similar to case 1, to be confirmed
4). autocommit=false + ack>0

@HeartSaVioR
Contributor

Case 3 is what we normally think of as the non-ack mode for a Storm spout, and it's not the same as case 1, since it can guarantee at-most-once.

So one way to keep our delivery guarantee is to force autocommit to true (not selectable by the user), which I'm not sure we're all satisfied with. What do you think?

Author

mingmxu commented Feb 13, 2017

I would prefer to keep case 1 for at-most-once, for users who want to avoid additional offset management in KafkaSpout.

IMO, there's no difference in guarantee between case 3 and case 1; the commit is done either by KafkaSpout or by KafkaConsumer. Going back to the earlier discussion, after reading the KafkaSpout code I think case 1 can also provide the guarantee, as below:
*it's assumed that KafkaSpout.close() is called on any crash; otherwise neither case provides it*

  1. close() calls shutdown();
  2. when consumerAutoCommitMode=false, commitOffsetsForAckedTuples() is called to commit offsets;
  3. then kafkaConsumer.close() in the finally block would commit the latest offsets if consumerAutoCommitMode=true, via
KafkaConsumer.close()
 -> ClientUtils.closeQuietly(coordinator, "coordinator", firstException);
   -> ConsumerCoordinator.close()
     -> ConsumerCoordinator.maybeAutoCommitOffsetsSync();
       -> commitOffsetsSync(subscriptions.allConsumed());

Contributor

srdo commented Feb 13, 2017

@xumingmin You shouldn't assume that KafkaConsumer.close is called when crashing. If the executor running the spout crashes (or throws an uncaught Exception), close is not called as far as I know. If the spout is reassigned to another worker, Storm will just kill -9 the JVM running the spout. I'm not sure ISpout.close is actually ever called in a non-local cluster. See https://storm.apache.org/releases/0.9.7/javadocs/backtype/storm/spout/ISpout.html#close--

If you need real at-most-once, the spout would probably need to commit offsets after every ack.

Author

mingmxu commented Feb 13, 2017

@srdo if kafkaConsumer.close() is not guaranteed to be called, then strictly speaking there's no at-most-once guarantee for either case 3 or case 1 so far. And case 1 equals case 3 after this PR.

Contributor

srdo commented Feb 13, 2017

Yes, that's what I mean. It's not really at-most-once. For real at-most-once, the spout should call KafkaConsumer.commitSync when it receives an ack, not periodically (and then you'd need to disable auto-commit).
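
A hedged sketch of what that could look like (not part of this PR; the KafkaSpoutMessageId accessors and the emitted set are assumed from the spout code discussed above, and enable.auto.commit would have to be false):

@Override
public void ack(Object messageId) {
    KafkaSpoutMessageId msgId = (KafkaSpoutMessageId) messageId;
    // Commit the next offset to consume synchronously, so an acked offset
    // is never handed out again after a restart.
    kafkaConsumer.commitSync(Collections.singletonMap(
            msgId.getTopicPartition(),
            new OffsetAndMetadata(msgId.offset() + 1)));
    emitted.remove(msgId);
}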

Author

mingmxu commented Feb 13, 2017

@srdo agreed, that's why I removed the word at-most-once in this conversation, to make it clear.

I haven't followed the latest Storm versions for some time; I remember that up through 0.9.* people called it at-most-once when acker=0.

@HeartSaVioR
Contributor

@srdo @xumingmin
If my understanding is right, at-most-once can be guaranteed with these steps:

  1. pull the data from datasource
  2. send ack to the datasource
  3. emit the data to the downstreams

If we loosen the requirement that there be no crash between emitting the data and sending the ack to the datasource, we can swap 2 and 3, and that's what we often refer to.

So yes, case 3 should explicitly ack to the datasource, and data should be emitted only when sending the ack succeeds. I'm not familiar with the new Kafka API, but if KafkaConsumer.commitSync guarantees the ack, we should use it for case 3.

Please correct me if I'm missing here.

Contributor

srdo commented Feb 13, 2017

@HeartSaVioR That's pretty much my understanding of this as well. I'm not sure what order the ack/emit happens in when ackers=0, but if it's ack followed by emit, then it should be enough to put a KafkaConsumer.commitSync call in KafkaSpout.ack. commitSync guarantees that the committed offset is properly committed when it returns.

I agree that this is how it should work for case 3.

Contributor

HeartSaVioR commented Feb 13, 2017

@srdo
I think it's up to the spout implementation, and we don't need to consider ordering if we don't rely on the ack() method.

If the spout is aware of the acker count (0 or more) and the data source provides a way to ack synchronously, the nextTuple() method can have two branches (ack vs non-ack) that handle ack/emit accordingly, and the ack() method can do nothing when no acker is active, as in the sketch below.
(I guess it might make the code a bit redundant or even dirty though)
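
A rough sketch of that branching (hypothetical: recordToTuple is an assumed helper, and ackerCount is assumed to have been read from Config.TOPOLOGY_ACKER_EXECUTORS in open()):

@Override
public void nextTuple() {
    ConsumerRecords<String, String> records = kafkaConsumer.poll(200);
    if (ackerCount == 0) {
        kafkaConsumer.commitSync();   // ack the source for the whole batch first: at-most-once
    }
    for (ConsumerRecord<String, String> record : records) {
        List<Object> tuple = recordToTuple(record);   // assumed record-to-tuple translation
        if (ackerCount == 0) {
            collector.emit(tuple);    // unanchored, Storm does no tracking
        } else {
            collector.emit(tuple, new KafkaSpoutMessageId(record));  // anchored, ack()/fail() drive commits
        }
    }
}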

Author

mingmxu commented Feb 13, 2017

Do we really need a commitSync call for every emit? That would be a lot of calls; the consumer reads data in batches.
The pseudo-at-most-once of either case 1 or case 3 is a good option, as a performance trade-off.

Contributor

HeartSaVioR commented Feb 13, 2017

What I'm referring to is this blog post: https://www.confluent.io/blog/tutorial-getting-started-with-the-new-apache-kafka-0.9-consumer-client/

According to the post, "at most once" can be achieved with the code snippet below, with auto-commit disabled:

try {
  while (running) {
    ConsumerRecords<String, String> records = consumer.poll(1000);

    try {
      consumer.commitSync();
      for (ConsumerRecord<String, String> record : records)
        System.out.println(record.offset() + ": " + record.value());
    } catch (CommitFailedException e) {
      // application specific failure handling
    }
  }
} finally {
  consumer.close();
}

It calls commitSync for every poll, not every emit (println in the code).
Again, I'm not familiar with the new Kafka API, so if we think the above code snippet has other issues I'm completely OK with not providing strict at-most-once.

Author

mingmxu commented Feb 14, 2017

@HeartSaVioR instinctively I think your code can meet at-most-once: records that are pulled but not yet emitted are discarded if there's a crash.

Still, I'd like to limit the scope of this task: close it first to fix case 1, and open a new task (STORM-2357) to figure out the proper solution for exact at-most-once.

@HeartSaVioR
Contributor

@xumingmin
OK, agreed that it's beyond this PR's scope. We still need to sort out cases 1 to 4, but the PR itself looks great to me.
+1

@asfgit asfgit closed this in 914a476 Feb 14, 2017
asfgit pushed a commit that referenced this pull request Feb 14, 2017
* Closes #1919
* fix: KafkaSpout is blocked in AutoCommitMode
* add comments for impacts of AutoCommitMode
* add doc about how to use KafkaSpout with at-most-once.
* remove at-most-once for better describe the changes; emit null msgId when AutoCommitMode;
* update sample code in storm-kafka-client to use inline setProp()
Contributor

srdo commented Feb 14, 2017

@HeartSaVioR Right, acking a batch at a time is better.

ptgoetz pushed a commit to ptgoetz/storm that referenced this pull request Jul 11, 2017
* Closes apache#1919
* fix: KafkaSpout is blocked in AutoCommitMode
* add comments for impacts of AutoCommitMode
* add doc about how to use KafkaSpout with at-most-once.
* remove at-most-once for better describe the changes; emit null msgId when AutoCommitMode;
* update sample code in storm-kafka-client to use inline setProp()