
KAFKA-4473: RecordCollector should handle retriable exceptions more strictly #2249

Closed
wants to merge 2 commits into apache:trunk from dguy:kafka-4473

Conversation

@dguy (Contributor) commented Dec 13, 2016

The RecordCollectorImpl currently drops messages on the floor if an exception is non-null in the producer callback. This will result in message loss and violates at-least-once processing.
Rather than just log an error in the callback, save the exception in a field. On subsequent calls to send, flush, close, first check for the existence of an exception and throw a StreamsException if it is non-null. Also, in the callback, if an exception has already occurred, the offsets map should not be updated.
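A minimal sketch of the approach described above, assuming the standard producer Callback API; the class and method names are illustrative, not the actual RecordCollectorImpl code:

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.errors.StreamsException;

class RecordCollectorSketch {
    private final Producer<byte[], byte[]> producer;
    private final Map<TopicPartition, Long> offsets = new HashMap<>();
    // written by the producer's IO thread, read by the stream thread
    private volatile Exception sendException;

    RecordCollectorSketch(final Producer<byte[], byte[]> producer) {
        this.producer = producer;
    }

    void send(final ProducerRecord<byte[], byte[]> record) {
        checkForException();  // rethrow a failure from an earlier send before sending more
        producer.send(record, (metadata, exception) -> {
            if (exception == null) {
                if (sendException == null) {
                    offsets.put(new TopicPartition(metadata.topic(), metadata.partition()),
                                metadata.offset());
                }
            } else {
                // cannot throw here: this callback runs on the producer's IO thread,
                // so an exception would never reach the stream thread; record it instead
                sendException = exception;
            }
        });
    }

    // flush() and close() call the corresponding producer method and then
    // checkForException(), as shown in the diff hunks further down.

    private void checkForException() {
        if (sendException != null) {
            throw new StreamsException("error sending record to Kafka", sendException);
        }
    }
}
```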

@dguy (Contributor Author) commented Dec 13, 2016

@asfbot commented Dec 13, 2016

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/97/
Test PASSed (JDK 7 and Scala 2.10).

@asfbot commented Dec 13, 2016

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/98/
Test PASSed (JDK 8 and Scala 2.12).

@asfbot commented Dec 13, 2016

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/99/
Test PASSed (JDK 8 and Scala 2.11).

TopicPartition tp = new TopicPartition(metadata.topic(), metadata.partition());
offsets.put(tp, metadata.offset());
} else {
sendException = exception;
Member:

I don't see why we cannot just throw a StreamsException directly?

Contributor Author:

The callback happens on the IO thread of the producer

Member:

I see.

@Override
public void flush() {
log.debug("{} Flushing producer", logPrefix);
this.producer.flush();
checkForException();
@mjsax (Member) commented Dec 14, 2016:

Should we check first and then flush?

Contributor Author:

I did have it checking before and after the flush, but decided to flush and then check. The reason is that we probably want to close the producer, and that will also cause the messages to be flushed.

Member:

Just for clarification: if we call flush, we know that there are no "dangling callbacks" for onCompletion()?

Contributor Author:

Yep - they have all completed by the time flush returns.
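For reference, KafkaProducer.flush() blocks until every record handed to send() so far has completed (successfully or with an error), and close() likewise waits for outstanding records, which is why checking after the producer call is sufficient. Continuing the illustrative sketch from the description above:

```java
void flush() {
    // producer.flush() blocks until all in-flight sends have completed,
    // so every onCompletion callback has already fired when we check.
    producer.flush();
    checkForException();   // surfaces any failure recorded by a callback
}

void close() {
    // close() also drains outstanding records before shutting the producer down,
    // so checking afterwards covers those sends as well.
    producer.close();
    checkForException();
}
```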

@mjsax (Member) left a comment:

LGTM

@guozhangwang (Contributor) left a comment:

One minor comment, otherwise LGTM. I have a couple of other general comments which could be tackled in a separate PR if you agree with them.

@@ -79,9 +81,17 @@ public RecordCollectorImpl(Producer<byte[], byte[]> producer, String streamTaskI
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception == null) {
if (sendException != null) {
log.warn("{} not updating offset for topic {} partition {} due to previous exception",
Contributor:

In a batch of records, if the first record fails, all the remaining records' callbacks will add a warn entry and hence swamp the log file. I feel it is better to just modify line 95 to something like "error sending to topic and partition, will not update the offset of this partition anymore, and this exception will eventually be thrown to the user".
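A hedged sketch of that suggestion, reusing the illustrative fields from the earlier sketch (offsets, sendException) plus the log/logPrefix seen in the diff hunks; the message wording is only an example, not the final code:

```java
producer.send(record, (metadata, exception) -> {
    if (exception == null) {
        if (sendException == null) {
            offsets.put(new TopicPartition(metadata.topic(), metadata.partition()),
                        metadata.offset());
        }
        // else: a previous send already failed; skip the offset update quietly
        // instead of warning once per remaining record in the batch
    } else if (sendException == null) {
        // remember and log only the first failure; it is rethrown to the user
        // on the next call to send/flush/close
        sendException = exception;
        log.error("{} Error sending record to topic {}; no further offsets will be recorded " +
                  "for this partition and the exception will eventually be thrown to the user",
                  logPrefix, record.topic(), exception);
    }
});
```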

@@ -110,6 +127,7 @@ public void flush() {
@Override
public void close() {
producer.close();
checkForException();
Contributor:

This is not introduced in this PR, but I realized this function is actually never called. Is it a bug?

Contributor Author:

Hmm, yes, it is only called from the test I wrote. The producer is closed during shutdown in StreamThread, which I don't think would matter on its own, but it is semi-related to your comment above: during shutdown and suspend we should only commit offsets after we've flushed and/or closed the producer - otherwise we run the risk of violating at-least-once.

@Override
public void flush() {
log.debug("{} Flushing producer", logPrefix);
this.producer.flush();
checkForException();
Contributor:

This is not introduced in this PR and we can tackle it in a separate PR if my reasoning is correct: in #1970 we did this ordering:

1. Commit tasks BUT only commit their consumers, do not flush states or flush producers.
2. Close tasks.
3. Flush stores: stateMgr.flush(processorContext).
4. Flush producers.
5. Close state manager: closeAllStateManagers().
6. Close producer / consumer / restore consumer.

I do not remember if there are any reasons to do step 1 first, but it definitely voids the at-least-once guarantees if we have a failure after step 1 and before any of the other steps, right?

Contributor Author:

That is correct. I've raised: https://issues.apache.org/jira/browse/KAFKA-4561
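A hedged sketch of the ordering both comments point towards; the helper and parameter names here are hypothetical, not the actual StreamThread code. The idea is to flush the producer and surface any send failure before committing consumer offsets, so a failed send can never hide behind an already-committed offset:

```java
import java.util.Map;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

final class CommitOrderingSketch {

    // hypothetical shape of the record collector from the sketches above
    interface RecordCollector {
        void flush();   // flushes the producer and rethrows any recorded send failure
    }

    static void commitOnSuspend(final RecordCollector collector,
                                final Consumer<byte[], byte[]> consumer,
                                final Map<TopicPartition, OffsetAndMetadata> consumedOffsets) {
        collector.flush();                    // blocks until all callbacks have fired; throws on failure
        consumer.commitSync(consumedOffsets); // safe: everything behind these offsets was produced
    }
}
```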

@dguy (Contributor Author) commented Dec 20, 2016

@guozhangwang updated the comment based on your feedback - thanks. I also raised another JIRA to track the other issue.

@asfbot commented Dec 20, 2016

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/261/
Test PASSed (JDK 8 and Scala 2.11).

@asfbot commented Dec 20, 2016

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/259/
Test FAILed (JDK 7 and Scala 2.10).

@asfbot commented Dec 20, 2016

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/260/
Test PASSed (JDK 8 and Scala 2.12).

@asfgit closed this in 0321bf5 on Dec 20, 2016
@guozhangwang (Contributor):

Merged to trunk.

@dguy deleted the kafka-4473 branch on January 13, 2017 at 08:39
soenkeliebau pushed a commit to soenkeliebau/kafka that referenced this pull request Feb 7, 2017
KAFKA-4473: RecordCollector should handle retriable exceptions more strictly

The `RecordCollectorImpl` currently drops messages on the floor if an exception is non-null in the producer callback. This will result in message loss and violates at-least-once processing.
Rather than just log an error in the callback, save the exception in a field. On subsequent calls to `send`, `flush`, `close`, first check for the existence of an exception and throw a `StreamsException` if it is non-null. Also, in the callback, if an exception has already occurred, the `offsets` map should not be updated.

Author: Damian Guy <damian.guy@gmail.com>

Reviewers: Guozhang Wang <wangguoz@gmail.com>

Closes apache#2249 from dguy/kafka-4473