
STORM-2549: Fix broken enforcement mechanism for maxUncommittedOffsets in storm-kafka-client spout #2156

Merged
merged 1 commit into apache:master on Nov 10, 2017

Conversation

srdo
Contributor

@srdo srdo commented Jun 10, 2017


Please see https://issues.apache.org/jira/browse/STORM-2549 and maybe also the comments on https://issues.apache.org/jira/browse/STORM-2343

This moves enforcement of maxUncommittedOffsets to the partition level. I don't believe there's a way to enforce the single limit globally without hitting a bunch of issues with it either being too strict regarding retries, or not being strict enough and allowing an unbounded number of new tuples along with the retries.
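
For illustration only, a minimal sketch of what per-partition enforcement could look like; the class and names below (PerPartitionLimitSketch, uncommittedCounts, isPollable) are hypothetical placeholders, not the actual storm-kafka-client code:

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.common.TopicPartition;

// Hypothetical sketch: each partition is checked against maxUncommittedOffsets
// on its own, so retries on one partition can neither starve the other partitions
// nor let the total number of in-flight tuples grow without bound.
class PerPartitionLimitSketch {
    private final int maxUncommittedOffsets;
    // emitted-but-not-yet-committed offset count, tracked per partition
    private final Map<TopicPartition, Integer> uncommittedCounts = new HashMap<>();

    PerPartitionLimitSketch(int maxUncommittedOffsets) {
        this.maxUncommittedOffsets = maxUncommittedOffsets;
    }

    boolean isPollable(TopicPartition tp) {
        return uncommittedCounts.getOrDefault(tp, 0) < maxUncommittedOffsets;
    }
}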

Iterator<Long> offsetIter = emittedOffsets.iterator();
for (int i = 0; i < index - 1; i++) {
    offsetIter.next();
}
Contributor

How about calling toArray() on the set and then fetching the Nth offset directly using the array index? It trades some memory/garbage for speed. We could possibly even pre-allocate an array of size maxUncommitted + batch size for this purpose.
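
Roughly, the suggestion amounts to something like the following (variable names borrowed from the quoted snippet; a sketch of the alternative, not the actual patch):

// Copy the emitted offsets into an array once and index it directly,
// instead of advancing an iterator element by element.
Long[] emitted = emittedOffsets.toArray(new Long[0]);
// Position index - 1 holds the element the iterator loop above is advancing toward.
long nthOffset = emitted[index - 1];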

Contributor Author

Is copying the entire set to an array faster than iterating the set?

Contributor

Guess we'll need to check the implementation to confirm how the array is created from the data structures behind the Set interface. Agreed that it is possible for the iteration to be as fast as or faster than the array creation. There may not be a clear-cut winner here, and it might depend on 'N'.

Contributor

Looks like an iterator is used under the covers to create the array (which explains the ordering guarantee in the array). No change required. :-)


when(consumerMock.poll(anyLong()))
    .thenReturn(new ConsumerRecords(firstPollRecords))
    .thenReturn(new ConsumerRecords(secondPollRecords));
Contributor

Nit: return generic ConsumerRecords to avoid warning around raw type
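
For example (assuming the mocked consumer is a KafkaConsumer<String, String>, so the stubbed records can be given matching type arguments):

when(consumerMock.poll(anyLong()))
    .thenReturn(new ConsumerRecords<String, String>(firstPollRecords))
    .thenReturn(new ConsumerRecords<String, String>(secondPollRecords));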

private List<ConsumerRecord<String, String>> createRecords(TopicPartition topic, long startingOffset, int numRecords) {
    List<ConsumerRecord<String, String>> recordsForPartition = new ArrayList<>();
    for (int i = 0; i < numRecords; i++) {
        recordsForPartition.add(new ConsumerRecord(topic.topic(), topic.partition(), startingOffset + i, "key", "value"));
    }
    return recordsForPartition;
}
Contributor

Nit: return generic ConsumerRecords to avoid warning around raw type


//Check that the consumer started polling at the failed tuple offset
when(consumerMock.poll(anyLong()))
    .thenReturn(new ConsumerRecords(Collections.singletonMap(partition, createRecords(partition, failedMessageId.get().offset(), 1))));
Contributor

return generic ConsumerRecords to avoid warning around raw type

In this case I am getting an incompatible bounds error on the V param to singletonMap()
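
If inference fails there, one common workaround is an explicit type witness on singletonMap (a sketch only, assuming the map value type is List<ConsumerRecord<String, String>>):

when(consumerMock.poll(anyLong()))
    .thenReturn(new ConsumerRecords<String, String>(
        Collections.<TopicPartition, List<ConsumerRecord<String, String>>>singletonMap(
            partition, createRecords(partition, failedMessageId.get().offset(), 1))));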

@askprasanna
Contributor

+1 for the changes.

Contributor

@askprasanna askprasanna left a comment

+1 LGTM

@hmcl
Contributor

hmcl commented Jun 14, 2017

@srdo I am reviewing this. Thanks.

@hmcl
Contributor

hmcl commented Jul 18, 2017

@srdo I am doing one last review of this patch.

@srdo
Contributor Author

srdo commented Jul 19, 2017

Sorry, had to rebase to fix conflicts.

@srdo
Contributor Author

srdo commented Aug 8, 2017

Rebased again to fix conflicts

@srdo
Contributor Author

srdo commented Sep 2, 2017

Squashed, no changes

@WolfeeTJ

Hi @srdo , I happened to see errors like this in production after applying your PR:

2017-09-06 13:12:01.707 o.a.s.util Thread-15-spout-executor[103 103] [ERROR] Async loop died!
java.lang.IllegalStateException: No current assignment for partition kafka_bd_trigger_action-22
        at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:231) ~[stormjar.jar:?]
        at org.apache.kafka.clients.consumer.internals.SubscriptionState.resume(SubscriptionState.java:350) ~[stormjar.jar:?]
        at org.apache.kafka.clients.consumer.KafkaConsumer.resume(KafkaConsumer.java:1332) ~[stormjar.jar:?]
        at org.apache.storm.kafka.spout.KafkaSpout.pollKafkaBroker(KafkaSpout.java:323) ~[stormjar.jar:?]
        at org.apache.storm.kafka.spout.KafkaSpout.nextTuple(KafkaSpout.java:234) ~[stormjar.jar:?]
        at org.apache.storm.daemon.executor$fn__9708$fn__9723$fn__9754.invoke(executor.clj:646) ~[storm-core-1.1.1-SNAPSHOT.jar:1.1.1-SNAPSHOT]
        at org.apache.storm.util$async_loop$fn__553.invoke(util.clj:484) [storm-core-1.1.1-SNAPSHOT.jar:1.1.1-SNAPSHOT]
        at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
        at java.lang.Thread.run(Thread.java:745) [?:1.8.0_121]
2017-09-06 13:12:01.709 o.a.s.d.executor Thread-15-spout-executor[103 103] [ERROR]
java.lang.IllegalStateException: No current assignment for partition kafka_bd_trigger_action-22
        at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:231) ~[stormjar.jar:?]
        at org.apache.kafka.clients.consumer.internals.SubscriptionState.resume(SubscriptionState.java:350) ~[stormjar.jar:?]
        at org.apache.kafka.clients.consumer.KafkaConsumer.resume(KafkaConsumer.java:1332) ~[stormjar.jar:?]
        at org.apache.storm.kafka.spout.KafkaSpout.pollKafkaBroker(KafkaSpout.java:323) ~[stormjar.jar:?]

I think the issue might be that partition reassignment can take effect during kafkaConsumer.poll(), so pausedPartitions ends up differing from the actual assignment. What do you think?

@srdo
Contributor Author

srdo commented Sep 13, 2017

@WolfeeTJ Thanks for trying this out. I think you are right that it's because partition reassignment is happening at a bad time. I think we should move reassignment to be the first thing in nextTuple instead. Give me a little bit to fix this and maybe add a test.

I've been thinking a bit about partition revocation and shuffling, and we might want to add some warnings to the manual partition assignment API as well. There are a few cases I can think of where shuffling partitions can cause bugs. The Trident spout doesn't support partition shuffling because Trident doesn't expect partitions to move from task to task, as far as I can tell. When we implement at-most-once support for this spout there's also a requirement that partitions don't move between tasks, since otherwise it is possible that tuples are emitted more than once.

@srdo
Contributor Author

srdo commented Sep 13, 2017

@WolfeeTJ I think it's fixed now, please take a look at the new commit.

@WolfeeTJ

Hi @srdo , I'm not sure, so please correct me if I'm wrong:

As far as I can see, I'm using the default NamedSubscription. And checking the code, org.apache.storm.kafka.spout.Subscription#refreshAssignment() is a no-op.
My understanding is that the actual reassignment happens when calling
final ConsumerRecords<K, V> consumerRecords = kafkaConsumer.poll(kafkaSpoutConfig.getPollTimeoutMs());
in org.apache.storm.kafka.spout.KafkaSpout#pollKafkaBroker().

So we might need to somehow get the fresh partition assignment for kafkaConsumer.pause(pausedPartitions); after a kafkaConsumer.poll(), because the previous pollablePartitions list may have changed after kafkaConsumer.poll()?
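
In other words, before pausing or resuming with a cached set of partitions, that set could be intersected with the consumer's current assignment so it never references partitions that a rebalance has just taken away. A rough sketch using only the plain KafkaConsumer API (not the actual KafkaSpout fix):

// pausedPartitions was computed before poll(); poll() may have triggered a rebalance.
Set<TopicPartition> stillOwned = new HashSet<>(pausedPartitions);
stillOwned.retainAll(kafkaConsumer.assignment()); // drop partitions we no longer own
kafkaConsumer.resume(stillOwned);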

@srdo
Contributor Author

srdo commented Sep 14, 2017

@WolfeeTJ We are dropping support for NamedSubscription and PatternSubscription in 2.0.0 and deprecating them in 1.2.0 because Kafka's mechanism for automatically assigning partitions to consumers is a bad fit for Storm. See https://issues.apache.org/jira/browse/STORM-2542 for details.

From 1.2.0 (1.x-branch) on, the spout will assign partitions using the https://github.com/apache/storm/blob/master/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/ManualPartitionSubscription.java class. The default subscription in KafkaSpoutConfig will also change to use this subscription.

Partition assignment happens in refreshPartitions for the ManualPartitionSubscription, and poll won't trigger rebalance anymore, so we shouldn't need to worry about partitions changing when we call poll.

@WolfeeTJ

@srdo Great, thank you for the info, this helps a lot. In this case I think it should be OK. I'll apply the commit and try it out.
Thanks a lot!

@srdo
Contributor Author

srdo commented Sep 14, 2017

@WolfeeTJ No problem :)

Note that manual partition subscription doesn't work before version 1.1.2 https://issues.apache.org/jira/browse/STORM-2541, so you may want to check out 1.x-branch and apply this PR to it to get it working.

@WolfeeTJ

Thank you @srdo for pointing this out :) 👍 Will do.

Contributor

@HeartSaVioR HeartSaVioR left a comment

@srdo
I'm sorry for reviewing this so late. I finally managed to find some hours to spend reading KafkaSpout, and the patch looks good based on my understanding.

In fact, regardless of my level of understanding, IMO a global counter is inherently fragile, so I'm happy to see it dropped and the counters assigned to each partition instead.

+1

Could you craft a patch against the 1.x branch as well? And the 1.1.x branch too, if needed. Thanks in advance!

@srdo srdo mentioned this pull request Nov 10, 2017
@srdo
Contributor Author

srdo commented Nov 10, 2017

@HeartSaVioR Thanks for the review.

The 1.x version is here #2407. I tried cherry picking to 1.1.x, but there have been a lot of changes in this module since then, and I don't think it's worth the effort to pull back if we're releasing 1.2.0 soon anyway. If someone really needs it I'll pull it back to 1.1.x, but otherwise I'll probably just move this and STORM-2546 to the 1.2.0 epic instead.

@asfgit asfgit merged commit cca93d2 into apache:master Nov 10, 2017
@HeartSaVioR
Contributor

@srdo
Unfortunately I feel we won't be able to get Storm 1.1.2 out, and given that, putting less effort into the 1.1.x version line and focusing more on 1.2.0 makes sense to me. I guess it is indeed not trivial to port back.
Let's revisit when someone asks us to port back. I'll take a look at #2407.
