
STORM-2087: storm-kafka-client - tuples not always being replayed #1679

Merged (1 commit) on Nov 19, 2016

Conversation

@jfenc91 jfenc91 commented Sep 11, 2016

I am having issues with topologies that fail tuples. The Kafka offsets seemed to get stuck, and the Kafka spout eventually halted even though the last committed offset was nowhere near the end of the topic.

Here are a few unit tests that I believe replicate my situation. The last two are currently failing, which reflects what I am seeing in my topologies. Let me know if I missed anything! This seems like a pretty big oversight, so I am getting the feeling that something in the test is wrong. Thanks!

I added tests for the following cases:

  • All tuples being acked.
  • A tuple being failed in the order it was emitted
  • A tuple being failed out of the order it was emitted

Update: I have also provided a fix.

@jfenc91 jfenc91 changed the title storm-kafka-client tests: tuples not being properly replayed STORM-2087: storm-kafka-client tests - tuples not being properly replayed Sep 11, 2016
@jfenc91 jfenc91 changed the title STORM-2087: storm-kafka-client tests - tuples not being properly replayed STORM-2087: storm-kafka-client - tuples not always being replayed Sep 11, 2016
jfenc91 commented Sep 13, 2016

A larger refactor is probably needed here to make this more performant. Understandably, these changes seem to make the spout struggle to get the count of tuples being processed anywhere near the max spout pending limit.

}
}
}

// ======== emit =========
private void emit() {
emitTupleIfNotEmitted(waitingToEmit.next());
//Keep trying to send a tuple when requested
while(!emitTupleIfNotEmitted(waitingToEmit.next()) && waitingToEmit.hasNext())
Contributor:
While I don't think it hurts, I don't see why this is necessary. Storm should call nextTuple repeatedly until something is emitted.

Contributor Author:
I was having issues with the spout reaching the max spout pending limit, although it is now apparent that there are other contributing factors, so I will go back to the simpler approach here. No need to overcomplicate.

Contributor:
I'm sorry to flip-flop on this, but after our discussion on STORM-2106, I think it makes sense to have this loop. Storm will by default pause for 1ms between each call to nextTuple if nothing is emitted. This could end up being a lot of pausing if there's a failed tuple far behind the latest acked offset. Having this loop prevents that pause from happening unless there really isn't anything to emit.

Contributor Author:
Not a problem. Performance/throughput is very important to me so I am happy to go back to this!

@@ -266,26 +266,32 @@ private void doSeekRetriableTopicPartitions() {
if (offsetAndMeta != null) {
kafkaConsumer.seek(rtp, offsetAndMeta.offset() + 1); // seek to the next offset that is ready to commit in next commit cycle
} else {
kafkaConsumer.seekToEnd(toArrayList(rtp)); // Seek to last committed offset
kafkaConsumer.seek(rtp, acked.get(rtp).committedOffset + 1); // Seek to last committed offset
Contributor (@srdo, Sep 19, 2016):
Nice catch. Is this code being hit by the tests?

Contributor Author:
Yup :) The shouldReplayInOrderFailedMessages and shouldReplayOutOfOrderFailedMessages tests hit it.

Contributor (@hmcl, Sep 20, 2016):
@jfenc91 Why is seekToEnd not correct?

Contributor:
Guessing here, but if no tuples have been acked since partition reassignment and some have failed, this case should be hit. The right thing to do is then to start replaying from the last committed offset. If the consumer just skips to the end of the partition, it may skip some messages, and findNextCommitOffset will break on subsequent calls because the offsets that can be committed aren't continuous anymore.

@jfenc91 The tests you mention ack a message before a tuple is failed. Doesn't that prevent the spout from hitting this case, since there's now an offset in acked?

Contributor:
Hm. This probably isn't great if auto commit mode is enabled. It'll make the spout return to where it started processing the partition every time a tuple is failed. It might make sense to disable tuple retries when auto commit is enabled; I don't think we can support it properly in auto commit mode with the current RetryService interface, since we don't update committedOffset in auto commit mode.

I think having fail() do nothing if auto commit is enabled makes sense. Otherwise, RetryService needs to return the list of offsets that should be retried so this code can seek directly to the smallest retry offset per partition instead.
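For illustration, a minimal sketch of that suggestion, assuming a boolean consumerAutoCommitMode flag and the spout's existing emitted/retryService bookkeeping (names are assumptions, not the code in this PR):

```java
// Hypothetical sketch only; not the implementation in this PR.
@Override
public void fail(Object messageId) {
    final KafkaSpoutMessageId msgId = (KafkaSpoutMessageId) messageId;
    if (consumerAutoCommitMode) {
        // Offsets are committed by the consumer itself, so a reliable replay
        // cannot be guaranteed; just log and drop the failed tuple.
        LOG.debug("Ignoring fail for [{}] because auto commit is enabled", msgId);
        return;
    }
    emitted.remove(msgId);        // assumed in-flight bookkeeping set
    retryService.schedule(msgId); // assumed hand-off to the retry service
}
```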

Contributor:
Autocommit mode comes with no guarantees of delivery. It is supported for backwards compatibility.

Contributor:
@jfenc91 About this being tested: If the two tests you mentioned run fine without this change, would you mind adding one that tests this? Maybe something like have a topic with a few messages in it and fail the first message, then check that the spout doesn't skip the messages following the failed message?

Contributor Author (@jfenc91, Sep 21, 2016):
So shouldReplayOutOfOrderFailedMessages does fail the first tuple, and with that line changed the test does fail. You are right, the other one passes. So I am renaming the test that checks this case to shouldReplayFirstTupleFailedOutOfOrder to make it clearer. I would add another, but I hate to duplicate unit tests without adding extra value.

Contributor:
Looks good, thanks.

Contributor:
@jfenc91 @srdo this is correct. @jfenc91 good finding!

break;
//Received a redundant ack. Ignore and continue processing.
LOG.debug("topic-partition [{}] has unexpected offset [{}]. Found with message on offset [{}]. Current committed Offset [{}]",
tp, currOffset, currAckedMsg.offset(), committedOffset);
Contributor (@srdo, Sep 19, 2016):
Aren't currOffset and currAckedMsg.offset the same here, due to the assignment in L490?

found = true;
nextCommitMsg = currAckedMsg;
nextCommitOffset = currOffset;
} else if (currAckedMsg.offset() > nextCommitOffset + 1) { // offset found is not continuous to the offsets listed to go in the next commit, so stop search
LOG.debug("topic-partition [{}] has non-continuous offset [{}]. It will be processed in a subsequent batch.", tp, currOffset);
break;
} else {
LOG.debug("topic-partition [{}] has unexpected offset [{}].", tp, currOffset);
break;
//Received a redundant ack. Ignore and continue processing.
Contributor:
Feel free to correct me, but it's my impression that Storm doesn't really support double acking. If a bolt acks the same tuple multiple times, I think Storm ends up failing the tuple tree. Have you seen double acking occur from Storm's side?

Contributor Author:
So I added this because I was seeing acks on tuples that were behind the already committed offset. With that break statement in place, the result is a complete halt in processing. While this isn't pretty, it is the only solution I could see.

Contributor:
Do you have a test case where we can reproduce this consistently? This code is already running in a large production environment, and according to the feedback I received, there are no issues of this nature.

I am also a bit confused about what you mean by multiple acks. I am pretty sure Storm guarantees that a tuple is either acked (once), failed (once), or times out (which is equivalent to failing).

Contributor:
It still seems strange to me if Storm acks a message multiple times. You're right though that the break statement should be removed; if the spout gets into a state where an offset <= committed is acked, the offset should probably just be logged and dropped since it's already committed. I'd still like to know how the spout gets into this state though.

Contributor Author:
I saw this a couple of times before I figured it out. I have not taken the time to reproduce this in a toy/test case, but given the error message this is clearly a Storm or storm-kafka-client issue. I got to this state in about 30 minutes of running a topology processing 300k-800k tuples a minute with about 10s latency. The input to the topology was on the order of 2k-10k tuples per minute, with a bolt that separated each input into multiple tuples. At startup there was a high number of failures after the separation (I was making requests against an unwarmed ELB). I would guess that that is enough to reproduce with random data/failures.

Contributor:
@jfenc91 @srdo This behavior still seems quite surprising to me. However, this fix probably won't have any bad side effect, so it should be OK. Perhaps you can add to the log message that we are ignoring it as it is out of order, duplicate, or something like that.

@revans2 can you please comment on why, or if it is possible a tuple to be acked more than once?

Contributor:
The issue is not with Storm itself, but with the internal bookkeeping of the spout. If Kafka rebalances partitions (or there are other recoverable errors), the spout resets all of its internal bookkeeping, but there is no way to un-emit something. So everything that was emitted, but not fully acked/committed when the recovery happened, is still outstanding and will come back for the spout to process.

  1. emit A offset B
  2. oh crap, recovering...
  3. Recovered handling partition A again
  4. emit A offset B again
  5. ack A offset B (the first one)
  6. ack A offset B (the second recovered one)

Contributor:
It can also happen if the Kafka cluster has unclean leader election enabled. In that case, the log end offset in Kafka may become smaller than the last offset the spout committed. It's a weird edge case, since most people probably don't need at-least-once in Storm while allowing message loss in Kafka. It is a bit of a newbie trap though, since unclean leader election is enabled by default in Kafka.

});
}

private void assertOffset(int offset, KafkaSpout.OffsetEntry entry) {
Contributor:
Consider renaming to something more descriptive, like assertOffsetCommitted or something like that. It isn't really obvious from the name what this does


IntStream.range(0, msgCount).forEach(value -> {
KeyedMessage<String, String> keyedMessage = new KeyedMessage<>(
topicName, ((Integer)value).toString(),
Contributor:
The Integer cast seems unnecessary?

Contributor Author:
value is an int, so I changed the Integer casts to Integer.toString(value). That probably looks a bit nicer.

spout.nextTuple();
});

spout.ack(messageIdToDoubleAck.getValue());
Contributor:
Not really sure the spout needs to support double acking.

Thread.sleep(200);

//allow for some failures with +5 instead of -1
IntStream.range(0, messageCount + 5).forEach(value -> {
Contributor (@srdo, Sep 19, 2016):
What kind of failure can happen here?

Contributor Author:
A call to the spout's emit() does not always mean a tuple is emitted. I will improve the comment.

jfenc91 commented Sep 20, 2016

Thanks for the review here @srdo! I will have a few more PRs headed your way in a week or two for this Kafka client to make it fully usable.

@jfenc91 jfenc91 force-pushed the stormKafkaClientTests branch 9 times, most recently from ba46cf0 to 465ef07 Compare September 20, 2016 06:35

// emits one tuple per record
/**
* Emits one tuple per record
* @return True if tuple was emitted else false
Contributor:
This method returns void. What does the @return True comment mean?

Contributor:
I think this is left over from a revert.

private final TopicPartition tp;
private final long initialFetchOffset; /* First offset to be fetched. It is either set to the beginning, end, or to the first uncommitted offset.
* Initial value depends on offset strategy. See KafkaSpoutConsumerRebalanceListener */
private long committedOffset; // last offset committed to Kafka. Initially it is set to fetchOffset - 1
long committedOffset; // last offset committed to Kafka. Initially it is set to fetchOffset - 1
Contributor:
It violates encapsulation to make this field non-private.

Is this package-private for test purposes? If so, wouldn't it be better to use the testing tool's reflection-based methods to check internal state, rather than exposing the state to the outside?

Contributor Author (@jfenc91, Sep 20, 2016):
Unless there is a way I don't know about, using reflection is hard to follow and difficult to refactor, making it somewhat fragile. I am adding a protected get method and changing this back to private to hopefully address your concerns.
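For illustration, a sketch of the accessor approach being described (accessor name and exact visibility are assumptions, not necessarily the final patch):

```java
// Inside KafkaSpout.OffsetEntry: keep the field private and expose it read-only for tests.
private long committedOffset; // last offset committed to Kafka

protected long getCommittedOffset() { // accessor used by tests instead of reflection
    return committedOffset;
}
```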

@@ -479,16 +482,17 @@ public OffsetAndMetadata findNextCommitOffset() {
KafkaSpoutMessageId nextCommitMsg = null; // this is a convenience variable to make it faster to create OffsetAndMetadata

for (KafkaSpoutMessageId currAckedMsg : ackedMsgs) { // complexity is that of a linear scan on a TreeMap
if ((currOffset = currAckedMsg.offset()) == initialFetchOffset || currOffset == nextCommitOffset + 1) { // found the next offset to commit
if ((currOffset = currAckedMsg.offset()) == nextCommitOffset + 1) { // found the next offset to commit
Contributor (@hmcl, Sep 20, 2016):
With this change is the code going to work for a topic that does not yet have any records and/or commits in it?

I recall testing the condition using initialFetchOffset was necessary. Why isn't it necessary?

Contributor:
nextCommitOffset starts out as initialFetchOffset - 1 due to L467 and L481. Assuming a tuple isn't acked after being committed, the two checks should work out to be the same thing.
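Spelling out the equivalence (a sketch of the reasoning only, not the actual OffsetEntry code):

```java
// committedOffset starts at initialFetchOffset - 1 (see the field comment in the diff above),
// and nextCommitOffset is seeded from it before the scan over ackedMsgs.
long committedOffset  = initialFetchOffset - 1;
long nextCommitOffset = committedOffset;

// On the first iteration:
//   currOffset == nextCommitOffset + 1
//   <=> currOffset == (initialFetchOffset - 1) + 1
//   <=> currOffset == initialFetchOffset
// so the dropped "currOffset == initialFetchOffset" check adds nothing.
```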

Contributor:
@srdo @jfenc91 you are right, this simplification is correct. Probably the code was refactored in a way that made this redundant. Good catch!


return config;
}

static public StormTopology getTopolgyKafkaSpout(int port) {
Contributor:
Nitpick: Spelling error in method name

SpoutOutputCollector collector = mock(SpoutOutputCollector.class);
Map conf = mock(Map.class);

KafkaSpout spout = new KafkaSpout<>(getKafkaSpoutConfig(getKafkaSpoutStreams(), kafkaPort));
Contributor:
Nitpick: Remember to add generic types to the spout

srdo commented Sep 20, 2016

@jfenc91 I borrowed a few classes from this PR for another change to the spout here #1696. I hope you don't mind :)

jfenc91 commented Sep 20, 2016

That's fine, @srdo. Thanks again for the reviews. I also created STORM-2106 and STORM-2107 to keep track of the consequences of this change.

// emits one tuple per record
/**
* Emits one tuple per record
*/
Contributor:
This is a private method so it should be a standard line comment rather than a javadoc.

jfenc91 commented Sep 21, 2016

@srdo Alright, I think everything has been addressed here. I have actually been running this merged with your other PR for the last 12 hours, processing 100M tuples, and it has looked pretty good. The only issue has been: "org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member.", which is unrelated to this PR.

Thanks again for your help here!!

srdo commented Sep 22, 2016

This more or less seems done to me. The only thing that bugs me is Storm double acking tuples. The only case I could think of is if tuples time out and they're later acked by the acker bolt, but it seems like the executor actually handles that. As far as I can tell hitting the changed case in findNextCommitOffset should actually indicate a bug in the code, because we've received an ack for a tuple we've already committed to Kafka, which shouldn't be possible unless a tuple is double acked. I don't think that should be happening unless the spout is doing something wrong?

@jfenc91 are you still seeing that case (L498) getting hit even with these changes and the other PR?

hmcl commented Sep 22, 2016

@srdo @jfenc91 I am on vacation this week (with limited access to the Internet) and I will be back on Monday. Can we please hold off on merging this until I can finish my review on Monday? I implemented the original patch and would like to review these changes. Thanks.

I am still a bit confused about the duplicate acking of tuples. If that is really happening, it could be a problem with Storm's guarantees, which should be fixed at its root cause and not worked around in spout code. Furthermore, from what I understand of the findNextCommitOffset method's behavior, the duplicate acking should be irrelevant, because once all the tuples up to a certain offset have been acked, they will be committed in the next commit call. For example, let's say 1,2,3,4,5,8,9,10 have been acked; on the next commit call, 1,2,3,4,5 should be committed, but 8,9,10 won't be until 6,7 get acked. If 6,7 never get acked (say, a retry-forever scenario), then the desired behavior is that 8,9,10 never get committed. @jfenc91 Can you help me understand how the duplicate acking interferes in this scenario?

srdo commented Sep 22, 2016

@hmcl I agree that if there really is a double acking problem somewhere else, it should be fixed there.

In your scenario say the spout commits 1...5 to Kafka and 4 is later acked. ackedMsgs now contains 4, and committedOffset is 5. With the previous code, that would permanently break findNextCommitOffset.

findNextCommitOffset would start at 5, and the first message in the ackedMsgs list is 4. The code breaks out of the loop over ackedMsgs, since the else block in L496 is hit, causing the function to return null. When null is returned to commitOffsetsForAckedTuples, that topic partition is skipped when committing, which means that ackedMsgs doesn't get cleared. On the next call to findNextCommitOffset the same thing happens again. The end result is that the spout never commits anything for that topic partition again.

I agree that we should never be in the case where 4 is acked after committedOffset has become larger than 4, which is why I think the else block in L496 should probably log at a higher level than debug.
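To make the stall concrete, here is a small, self-contained model of the scan being described, using plain long offsets instead of KafkaSpoutMessageId (names and structure are illustrative, not the actual spout code):

```java
import java.util.SortedSet;
import java.util.TreeSet;

// Simplified model of the old findNextCommitOffset loop. Returns the highest
// offset that can be committed, or null if nothing is committable.
class OffsetScanSketch {
    static Long findNextCommitOffset(SortedSet<Long> ackedOffsets, long committedOffset) {
        long nextCommitOffset = committedOffset;
        Long result = null;
        for (long currOffset : ackedOffsets) {
            if (currOffset == nextCommitOffset + 1) {       // contiguous: can be committed
                nextCommitOffset = currOffset;
                result = currOffset;
            } else if (currOffset > nextCommitOffset + 1) { // gap: wait for the missing acks
                break;
            } else {                                        // currOffset <= committedOffset
                break; // old behavior: abort the scan; the fix logs and skips this ack instead
            }
        }
        return result;
    }

    public static void main(String[] args) {
        SortedSet<Long> acked = new TreeSet<>();
        acked.add(4L);                                      // offset 4 acked again after commit
        System.out.println(findNextCommitOffset(acked, 5)); // null, and ackedMsgs is never
                                                            // cleared, so nothing commits again
    }
}
```

With committedOffset = 5 and a stray ack for offset 4, the else branch is hit on the first element and null is returned on every call, matching the permanent stall described above.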

jfenc91 commented Sep 24, 2016

@srdo @hmcl I tracked down one source of the "double acking"

It looks like I am being switched to reading from an out of sync replica. Looking at my offsets:
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG OWNER
bla_5_2 bla_5 102 237919 50979 -186940 consumer-7_/123.456.789.101

When examining my other partition logs, there is no way that this log-end-offset should be at 50k. It should be around 200k-300k. When the connector sends the request for offset 237920 it is out of range, and the consumer resets the offset to whatever the start offset is: https://github.com/apache/kafka/blob/0.10.0.1/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L580-L589 . For me this happened to be UNCOMMITTED_EARLIEST, so I got messages starting at position 1.

This is a really unfortunate situation, especially since my "ACKS_CONFIG" was set to all. We should probably make an effort to handle it better so data loss/disruption is minimized. I kind of wonder if this ever happened with Kafka 0.8.

srdo commented Oct 7, 2016

This PR LGTM at this point. I think we should raise a separate issue for better unclean leader election support; when that gets implemented, the double acking test should probably be replaced with one that forces an unclean leader election between two test brokers.

final TopicPartition tp = new TopicPartition(record.topic(), record.partition());
final KafkaSpoutMessageId msgId = new KafkaSpoutMessageId(record);

if (acked.containsKey(tp) && acked.get(tp).contains(msgId)) { // has been acked
LOG.trace("Tuple for record [{}] has already been acked. Skipping", record);
return false;
Contributor:
Why are we adding returns here? Having one return makes the method look cleaner. This is an if-else condition and we are returning false at the end.

Contributor Author:
I removed the return statements @harshach

jfenc91 commented Oct 27, 2016

Hey, trying not to let this thread die. Could we get some help here from a committer? @HeartSaVioR @revans2 Thanks!!

revans2 commented Oct 27, 2016

@srdo @hmcl have all of your concerns been addressed?

srdo commented Oct 27, 2016

Yes. I think supporting unclean leader election in a nice way is a separate issue to this.

revans2 commented Oct 27, 2016

I looked through the code and I didn't see anything that concerned me, and the Travis test failure appears to be unrelated, so I am +1 pending feedback from @hmcl.

@harshach you also reviewed this; do you still have any concerns?

hmcl commented Oct 27, 2016

@revans2 I will finish my review by lunch time PST

revans2 commented Oct 27, 2016

@hmcl take your time. This is a critical piece of Storm code and I really would like to get it right.

revans2 commented Nov 3, 2016

@hmcl it has been a week, any hope of finishing the review?

revans2 commented Nov 3, 2016

@knusbaum I noticed that you opened up #1753 and #1752. Would you mind taking a look at this and see if there are going to be any conflicts?

hmcl commented Nov 3, 2016

@revans2 apologies for the delay... I started it and then something got in the way... I will finish by tomorrow for sure. Will do my best to get this merged in asap...

hmcl commented Nov 7, 2016

@revans2 reviewing this at this moment.

@hmcl left a comment:

The main thing that I would suggest changing, unless there is a particular reason for it, is the while loop on L276.

The proposed offset fixes seem correct to me. I am still a bit puzzled by the duplicate acks, but the suggested fix seems to do no harm, so I am OK with it.

There is quite some code duplication in the test class that would be good to address as well.

@jfenc91 Thanks for the fixes and apologies for the review taking a bit longer.



import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST;

public class SingleTopicKafkaSpoutConfiguration {
public static final String stream = "test_stream";
Contributor:
stream is a constant. Can you please make it upper case.


public class SingleTopicKafkaSpoutConfiguration {
public static final String stream = "test_stream";
public static final String topic = "test";
Contributor:
topic is a constant. Can you please make it upper case.



}
}
}

// ======== emit =========
private void emit() {
emitTupleIfNotEmitted(waitingToEmit.next());
waitingToEmit.remove();
while(!emitTupleIfNotEmitted(waitingToEmit.next()) && waitingToEmit.hasNext()) {
Contributor:
@jfenc91 I believe that there is no need for this while loop here. I ran all your test cases without it and they all pass. Unless there is a particular reason to have this loop in here, I would suggest that you remove it.

Contributor:
This is a performance improvement. emitTupleIfNotEmitted does what it says: it may or may not emit a tuple. If it does not emit a tuple (because it has already been emitted), then the spout will exit nextTuple() having emitted nothing. In that case Storm will sleep for 1 ms, even though waitingToEmit is not necessarily empty.
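In other words, the loop amounts to the following shape (simplified from the diff above; not the exact patch):

```java
// Keep consuming waitingToEmit until a tuple is actually emitted, so nextTuple()
// only comes back empty-handed (and Storm only sleeps) when nothing is left to emit.
private void emit() {
    while (waitingToEmit.hasNext() && !emitTupleIfNotEmitted(waitingToEmit.next())) {
        // emitTupleIfNotEmitted returned false (record already acked or emitted); try the next one
    }
}
```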

public static final String stream = "test_stream";
public static final String topic = "test";

static public Config getConfig() {
Contributor:
Nitpick: In all of the methods declared in this class, can you put the method modifiers in the order public static? That is the standard order recommended by the JLS, and I also believe it makes the code a bit easier to read.

import static org.mockito.Mockito.*;
import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.*;

public class SingleTopicKafkaSpoutTest {
Contributor:
The blocks of code B1, B2, B3, B4 have pretty much the same initialization code, with the exception of perhaps one or two values that can be parameterized. Can you please create an initialization or setup method with this code and avoid all of the code duplication?

I didn't look in particular detail at other parts of this code, but it may also be easy to do the same thing for the multiple play and verify blocks of code in this class. If that's the case, that would be ideal.
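For illustration, a sketch of the kind of shared setup being requested, based on the snippets visible in this diff (field and method names are otherwise assumptions, not the final test code):

```java
// Hypothetical shared initialization for SingleTopicKafkaSpoutTest.
private TopologyContext topologyContext;
private SpoutOutputCollector collector;
private Map conf;
private KafkaSpout<String, String> spout;

@Before
public void setUp() {
    topologyContext = mock(TopologyContext.class);
    collector = mock(SpoutOutputCollector.class);
    conf = mock(Map.class);
    spout = new KafkaSpout<>(getKafkaSpoutConfig(getKafkaSpoutStreams(), kafkaPort));
    spout.open(conf, topologyContext, collector);
    spout.activate();
}
```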

Contributor Author (@jfenc91, Nov 14, 2016):
I addressed the initialization part. I think that abstracting the different test cases away algorithmically would add too much complexity. It is nice to have test cases that are easy to understand.

hmcl commented Nov 8, 2016

+1 overall. It would be ideal to clean up the test code according to my earlier suggestion. Thanks for the work @jfenc91

 - Added unit tests to storm-kafka-client
 - Fixed bug in kafka-spout-client that resulted in tuples not being replayed when a failure occurred immediately after the last commit
 - Modified the kafka spout to continue processing and not halt upon receiving a double ack
jfenc91 commented Nov 14, 2016

@hmcl thanks for the review. I believe I made all the requested changes here.

hmcl commented Nov 14, 2016

+1

@jfenc91 thanks for the extra cleanup.

@revans2 as far as I am concerned this patch is ready to merge. Thanks for your clarifying comments.

@harshach left a comment:
+1

@asfgit asfgit merged commit 71f8e86 into apache:master Nov 19, 2016
@harshach commented:
Thanks for the patch @jfenc91 and for being patient with reviews.
Merged into master. I would like to merge this into the 1.x-branch as well. There are a few issues with SingleTopicKafkaSpoutTest.java as it's using Java 8 syntax. Can you please open another PR for the 1.x-branch?

apiwoni commented Oct 19, 2017

We are on Apache Storm 1.0.4 and I'm trying to convince people to upgrade because I suspect we are experiencing this bug.
Here's what I see based on the nextTuple cycle:

LOG: Topic partitions with entries ready to be retried [[TOPIC-0]]
LOG: topic-partition [TOPIC-0] has non-continuous offset [219355]. It will be processed in a subsequent batch.
org.apache.kafka.clients.consumer.KafkaConsumer#seekToEnd[ where current committed offset in Kafka = 219351]
Polled [0] records from Kafka. [6253] uncommitted offsets across all topic partitions

I would expect org.apache.storm.kafka.spout.KafkaSpout#pollKafkaBroker to return a few records after seekToEnd?

srdo commented Oct 19, 2017

@apiwoni You may be able to upgrade just the spout if you don't want to do a full cluster upgrade. I think the core APIs haven't changed since 1.0.0.

Are you using topic compaction in Kafka by any chance?

apiwoni commented Oct 20, 2017

@srdo We are not using topic compaction.

srdo commented Oct 21, 2017

@apiwoni Okay, just wanted to know because 1.0.x doesn't handle topic compaction very well, and I thought it might be related.

Your log looks a lot like this issue. I'm not sure why you expect the poll to return records after seekToEnd? seekToEnd seeks to the end of the log in Kafka, and it doesn't actually do the seek until poll or position is called on the consumer. The result is that the consumer's position is at the last offset in the log, which means it won't emit anything until new messages are added to Kafka.

This issue will probably cause the spout to stop emitting entirely, since the failed offsets keep triggering the seekToEnd, which makes the spout skip past all new messages every time it polls.

I verified this behavior with a small example application with a single consumer that just calls seekToEnd and poll in a loop. It never emits any messages.
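Roughly, the example being described boils down to this (a minimal sketch; broker address and topic name are placeholders, not the actual test application):

```java
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

// Seeking to the end before every poll skips everything already in the log, so poll
// only ever returns records produced between the seek and the poll.
public class SeekToEndDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
        props.put("group.id", "seek-to-end-demo");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            TopicPartition tp = new TopicPartition("TOPIC", 0);
            consumer.assign(Collections.singletonList(tp));
            while (true) {
                consumer.seekToEnd(Collections.singletonList(tp)); // jump past existing records
                ConsumerRecords<String, String> records = consumer.poll(200);
                System.out.println("polled " + records.count() + " records"); // almost always 0
            }
        }
    }
}
```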

You should upgrade the spout. Whether you want to upgrade the rest of Storm is up to you, but be aware of https://issues.apache.org/jira/browse/STORM-2682 before deciding to jump to 1.1.1.

apiwoni commented Oct 23, 2017

@srdo I'm not sure why you expect the poll to return records after seekToEnd? seekToEnd seeks to the end of the log in Kafka, and it doesn't actually do the seek until poll or position is called on the consumer.

Based on Kafka spout loop and logs here's what I see and expect:

  1. org.apache.storm.kafka.spout.KafkaSpout#pollKafkaBroker
     1.1 org.apache.storm.kafka.spout.KafkaSpout#doSeekRetriableTopicPartitions
         1.1.1 org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff#retriableTopicPartitions
               // Log message indicates a messageId is ready to be retried:
               Topic partitions with entries ready to be retried [[TOPIC-0]]
         1.1.2 org.apache.storm.kafka.spout.KafkaSpout.OffsetEntry#findNextCommitOffset(TOPIC-0)
               // Returns null because of a non-continuous offset:
               topic-partition [TOPIC-0] has non-continuous offset [219355]. It will be processed in a subsequent batch.
         1.1.3 org.apache.kafka.clients.consumer.KafkaConsumer#seekToEnd(TOPIC-0)
               // This should seek to the last committed offset for the retriable partition, if it exists, else to the last offset
               1.1.3.1 org.apache.kafka.clients.consumer.internals.SubscriptionState#needOffsetReset(OffsetResetStrategy.LATEST)
     1.2 org.apache.kafka.clients.consumer.KafkaConsumer#poll
         // I think poll should return the records to be retried here, because seekToEnd should go to the last committed offset of 219351

Are you sure that after calling seekToEnd another call was made to commit offsets, and that the subsequent call to seekToEnd did not go to the last committed offset, as the comment in KafkaSpout states, but to the last offset? I'm asking because seekToEnd only sets the offset to consume from, not the current committed offset.
seekToEnd calls org.apache.kafka.clients.consumer.internals.SubscriptionState#needOffsetReset(OffsetResetStrategy.LATEST), and my understanding of OffsetResetStrategy.LATEST is that if the current offset for the consumer partition doesn't exist on the broker (in the __consumer_offsets topic or ZooKeeper) it should indeed be set to the last offset, but if the current offset already exists, that is where seekToEnd should go.
I have verified this behavior using kafka-console-consumer.sh, but I do not know if it calls seekToEnd, which actually utilizes this strategy. I know this strategy's behavior is not explicitly spelled out in the docs, but that's how it should behave.

I think something happened to the maintenance of the current offset by the Kafka broker, and seekToEnd resulted in the default behavior for OffsetResetStrategy.LATEST, which is to seek to the last offset. This seems to work most of the time.

srdo commented Oct 23, 2017

@apiwoni Your expectation for 1.1.3 is wrong. seekToEnd doesn't seek to the last committed offset; it seeks to the last offset. One of the fixes made in this PR is to make doSeekRetriableTopicPartitions seek to the last committed offset instead of the end of the partition when findNextCommitOffset returns null.

Here's the null case code from 1.0.4 https://github.com/apache/storm/blob/v1.0.4/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java#L283. Note that the comment in that line is wrong, and it's actually seeking to the end of the partition.

Here's the code for the same case after this PR https://github.com/apache/storm/pull/1679/files#diff-7d7cbc8f5444fa7ada7962033fc31c5eR269. Note that it now matches the comment.

Your understanding of OffsetResetStrategy.LATEST is also slightly off. The OffsetResetStrategy describes the behavior of the KafkaConsumer when the consumer requests an offset that is out of range. Offsets are out of range if they are smaller than the beginning of the log (e.g. if parts of the partition were deleted), or if they are larger than the offset of the largest message in the partition. For example if you have a partition with offsets 10-500 present and the consumer tries to seek to offset 0, you will trigger the OffsetResetStrategy. The strategy has no effect on seekToEnd. seekToEnd always seeks to the last offset on the partition, barring some additions made in 0.11.0.0 that have to do with transactional writes.
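For example, the reset strategy is configured with auto.offset.reset and only kicks in on an out-of-range fetch; seekToEnd ignores it entirely (a sketch, reusing the consumer setup from the previous snippet and assuming the partition's oldest retained offset is greater than 0):

```java
// Continuing from a consumer configured with auto.offset.reset=latest and
// assigned to TopicPartition("TOPIC", 0) whose oldest retained offset is > 0.
consumer.seek(tp, 0L);   // requested offset is below the log start, so the next fetch
consumer.poll(200);      // is out of range and OffsetResetStrategy.LATEST moves the
                         // position to the log end

consumer.seekToEnd(Collections.singletonList(tp)); // always goes to the log end,
consumer.poll(200);                                 // regardless of auto.offset.reset
```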

I think what happened in your case is what I described above, you hit one of the bugs solved by this PR, and as a result your consumer jumped to the end of the partition, which caused it to emit no messages.
