
KAFKA-5429: Ignore produce response if batch was previously aborted #3300

Closed
hachikuji wants to merge 3 commits.

Conversation

hachikuji (Contributor):
No description provided.

@asfbot

asfbot commented Jun 12, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.11/5159/
Test PASSed (JDK 7 and Scala 2.11).

@asfbot

asfbot commented Jun 12, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/5143/
Test PASSed (JDK 8 and Scala 2.12).

private boolean retry;

private enum FinalState { ABORTED, FAILED, SUCCEEDED };
private AtomicReference<FinalState> finalState = new AtomicReference<>(null);
Contributor:

final missing.

if (!this.finalState.compareAndSet(null, finalState)) {
if (this.finalState.get() == FinalState.ABORTED) {
log.debug("ProduceResponse returned for {} after batch had already been aborted.", topicPartition);
return;
Contributor:

So it's expected that we may call done (successfully or with an error) after aborting, and in that case we should just ignore the response?

hachikuji (Author):

Yeah, not much else we can do since we've already signaled the future and invoked callbacks. I think it would be justifiable to elevate the log level to info if that helps.

Contributor:

I'd just elevate the log level if there's user visible impact. Supposedly, this happens with transactions and the produced data won't be committed?

hachikuji (Author):

Yes, that's right. Another case might be on producer shutdown, but that might be much harder to hit.

Contributor:

The log level is a tricky one. In the producer, messages for errors are all at debug level (for instance when we transition to error state in the transaction manager). So having this higher than debug may not add much value.
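The compareAndSet guard under discussion gives the batch exactly one final state; any later completion attempt loses the race and can be detected and ignored. A minimal, self-contained sketch of that idiom (class and method names here are hypothetical, not Kafka's actual ProducerBatch API):

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch of the "complete exactly once" idiom discussed above.
public class BatchStateSketch {
    enum FinalState { ABORTED, FAILED, SUCCEEDED }

    private final AtomicReference<FinalState> finalState = new AtomicReference<>(null);

    /**
     * Try to move the batch to a final state. Returns true if this call won
     * the race; false if the batch was already completed, e.g. aborted
     * before a produce response arrived.
     */
    boolean complete(FinalState state) {
        if (!finalState.compareAndSet(null, state)) {
            if (finalState.get() == FinalState.ABORTED) {
                // Mirrors the PR: a response for an already-aborted batch is ignored.
                System.out.println("Ignoring response: batch already aborted");
            }
            return false;
        }
        return true;
    }

    FinalState state() {
        return finalState.get();
    }

    public static void main(String[] args) {
        BatchStateSketch batch = new BatchStateSketch();
        System.out.println(batch.complete(FinalState.ABORTED));   // true: abort wins the race
        System.out.println(batch.complete(FinalState.SUCCEEDED)); // false: stale response ignored
        System.out.println(batch.state());                        // ABORTED
    }
}
```

Because the null-to-final transition happens only once, callbacks and the produce future are signaled exactly once no matter how the abort and the broker response interleave.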

@@ -366,7 +384,7 @@ public void close() {
}
}

public void abort() {
public void abortRecordAppends() {
Contributor:

It may be worth adding a comment explaining how abort and abortRecordAppends differ in one of the methods and add a reference in the other one. It seems like the reason we have two separate methods is that we do one of them with the lock held and the other with no lock held (as it invokes callbacks).
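The split described above might be documented roughly as follows (a hypothetical simplified class, not the real ProducerBatch: abortRecordAppends() only closes the batch to further appends and is meant to run with the append lock held, while abort() completes the batch and would fire user callbacks, so it runs with no lock held):

```java
// Hypothetical, simplified illustration of the abort()/abortRecordAppends() split.
public class AbortSplitSketch {
    private volatile boolean appendsAborted = false;
    private volatile RuntimeException abortCause = null;

    /**
     * Closes the batch to further record appends. In the real code this is
     * invoked while holding the partition's append lock, so it must not
     * invoke user callbacks (which could block the lock holder or deadlock).
     */
    public void abortRecordAppends() {
        appendsAborted = true;
    }

    /**
     * Fully aborts the batch: records the cause and, in the real code,
     * completes the produce future and invokes user callbacks. Called
     * WITHOUT the append lock held, precisely because callbacks run here.
     */
    public void abort(RuntimeException cause) {
        this.abortCause = cause;
        // real code would also complete the produce future here
    }

    public boolean isAppendAborted() { return appendsAborted; }
    public RuntimeException abortCause() { return abortCause; }

    public static void main(String[] args) {
        AbortSplitSketch batch = new AbortSplitSketch();
        batch.abortRecordAppends();                                // step 1: under the lock
        batch.abort(new RuntimeException("transaction aborted"));  // step 2: lock released
        System.out.println(batch.isAppendAborted() + " " + batch.abortCause().getMessage());
    }
}
```

Keeping the two steps as separate methods lets the caller release the lock between closing appends and invoking callbacks, which is the locking constraint the comment asks to document.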

@apurvam (Contributor) left a comment:

LGTM!

@asfbot

asfbot commented Jun 12, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/5170/
Test PASSed (JDK 8 and Scala 2.12).

@asfbot

asfbot commented Jun 12, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.11/5186/
Test PASSed (JDK 7 and Scala 2.11).

@ijuma (Contributor) left a comment:

Thanks for the updates, a few comments/questions.

private boolean retry;

private enum FinalState { ABORTED, FAILED, SUCCEEDED };
private final AtomicReference<FinalState> finalState = new AtomicReference<>(null);
Contributor:

Nit: should this be with other final fields?

* @param exception The exception to use to complete the future and awaiting callbacks.
*/
public void abort(RuntimeException exception) {
abortRecordAppends();
Contributor:

Is this safe? It seems like we should document abort and done as thread-safe and hence we should not invoke abortRecordAppends, which is not thread-safe. Or am I missing something?

hachikuji (Author):

Ack. The code seemed weird without it, but it's not actually needed.


KafkaException exception = new KafkaException();
batch.abort(exception);

Contributor:

Should we check future.isDone here too?

@@ -55,6 +59,67 @@ public void testChecksumNullForMagicV2() {
}

@Test
public void testBatchAbort() throws Exception {
Contributor:

Probably need one like this but where we call done with an exception.
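The suggested additional test might look roughly like the following (a hypothetical sketch using plain assertions against the same atomic final-state model, rather than Kafka's actual ProducerBatch and JUnit scaffolding):

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch of the reviewer's suggestion: complete the batch with
// an exception first, then verify a later success attempt is ignored.
public class BatchDoneWithExceptionSketch {
    enum FinalState { ABORTED, FAILED, SUCCEEDED }

    public static void main(String[] args) {
        AtomicReference<FinalState> finalState = new AtomicReference<>(null);

        // done(...) is called with an exception: the batch fails.
        boolean failedFirst = finalState.compareAndSet(null, FinalState.FAILED);

        // A stale produce response then tries to mark the batch successful.
        boolean succeededLater = finalState.compareAndSet(null, FinalState.SUCCEEDED);

        if (!failedFirst || succeededLater || finalState.get() != FinalState.FAILED)
            throw new AssertionError("final state must remain FAILED");
        System.out.println("final state: " + finalState.get());
    }
}
```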

@ijuma (Contributor) left a comment:

Thanks for the updates, LGTM.

@asfbot

asfbot commented Jun 12, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/5189/
Test PASSed (JDK 8 and Scala 2.12).

@asfbot

asfbot commented Jun 12, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.11/5205/
Test PASSed (JDK 7 and Scala 2.11).

@hachikuji (Author):

Merging to trunk and 0.11.0.

asfgit pushed a commit that referenced this pull request Jun 12, 2017
Author: Jason Gustafson <jason@confluent.io>

Reviewers: Apurva Mehta <apurva@confluent.io>, Ismael Juma <ismael@juma.me.uk>

Closes #3300 from hachikuji/KAFKA-5429

(cherry picked from commit 6c92fc5)
Signed-off-by: Jason Gustafson <jason@confluent.io>
@asfgit closed this in 6c92fc5 on Jun 12, 2017.