KAFKA-5308: TC should handle UNSUPPORTED_FOR_MESSAGE_FORMAT in WriteTxnMarker response #3152

Closed
wants to merge 8 commits into from

@dguy dguy commented May 26, 2017

Return UNSUPPORTED_FOR_MESSAGE_FORMAT from handleWriteTxnMarkers for any partition whose topic uses a message format that does not support transaction markers.
On the transaction coordinator, remove any TopicPartitions that fail with this error from the set waiting for markers.
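
A minimal sketch of the broker-side split this description refers to, written in Java for illustration only (the real handler is in the Scala class KafkaApis and is not reproduced here). The names `MarkerPartitioner`, `Split`, `splitBySupport`, and the `magicFor` lookup are hypothetical. The one Kafka-specific assumption is that transaction markers need message format v2, i.e. a magic value of at least 2.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.ToIntFunction;

final class MarkerPartitioner {
    // Assumption: markers can only be written with magic value 2 or higher.
    static final int MIN_MAGIC_FOR_MARKERS = 2;
    static final String UNSUPPORTED = "UNSUPPORTED_FOR_MESSAGE_FORMAT";

    /** Partitions that can take a marker, plus a per-partition error for the rest. */
    static final class Split {
        final List<String> writable = new ArrayList<>();
        final Map<String, String> errors = new LinkedHashMap<>();
    }

    // magicFor is a hypothetical stand-in for the replica manager's magic lookup.
    static Split splitBySupport(List<String> partitions, ToIntFunction<String> magicFor) {
        Split split = new Split();
        for (String partition : partitions) {
            if (magicFor.applyAsInt(partition) >= MIN_MAGIC_FOR_MARKERS) {
                split.writable.add(partition);
            } else {
                // Old-format partition: report an error instead of appending a marker.
                split.errors.put(partition, UNSUPPORTED);
            }
        }
        return split;
    }
}
```

Passing the magic lookup in as a function keeps the sketch testable without a broker: a stub that returns a lower magic value for one partition is enough to exercise the error path.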

dguy added 2 commits May 26, 2017 10:49
…ic is not

the correct message format.
Remove such topicpartitions from those waiting for markers

dguy commented May 26, 2017

@hachikuji @guozhangwang

asfbot commented May 26, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/4424/
Test FAILed (JDK 8 and Scala 2.12).

asfbot commented May 26, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.11/4438/
Test PASSed (JDK 7 and Scala 2.11).

@guozhangwang

This test failure seems new? kafka.server.LogRecoveryTest.testHWCheckpointWithFailuresSingleLogSegment

@guozhangwang guozhangwang left a comment

Left some minor comments, otherwise LGTM!

case Errors.UNSUPPORTED_FOR_MESSAGE_FORMAT =>
// The producer would have failed to send data to the failed topic so we can safely remove the partition
// from the set waiting for markers
warn(s"Sending $transactionalId's transaction marker from partition $topicPartition has failed with " +

I'd suggest we log this at info instead of warn: when the topic gets deleted, the TC just logs at info while giving up on sending markers. I think a deleted topic and a topic on a lower message format version are of similar alert importance.

@@ -1528,6 +1537,11 @@ class KafkaApis(val requestChannel: RequestChannel,
partition -> MemoryRecords.withEndTransactionMarker(producerId, marker.producerEpoch, endTxnMarker)
}.toMap

if (partitionsWithIncorrectMessageFormat.nonEmpty)
errors.put(producerId, partitionsWithIncorrectMessageFormat.map{partition =>

nit: add space before / after {.

@@ -187,6 +187,12 @@ class TransactionMarkerRequestCompletionHandlerTest {
verifyRetriesPartitionOnError(Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND)
}

@Test

Could we add a unit test for the broker-side handling of writeTxnMarker, with the replica manager mocked to return a smaller magic number?

Contributor Author

@guozhangwang there are currently no unit tests for this, just like most of KafkaApis. I've been trying to write one, but it isn't as simple as just mocking the ReplicaManager.

Contributor Author

@guozhangwang I managed to add some tests for this.

@@ -155,6 +155,14 @@ class TransactionMarkerRequestCompletionHandler(brokerId: Int,
txnMarkerChannelManager.removeMarkersForTxnId(transactionalId)
abortSending = true

case Errors.UNSUPPORTED_FOR_MESSAGE_FORMAT =>

@hachikuji hachikuji May 27, 2017

Can we handle UNSUPPORTED_VERSION here as well? Given your other patch, which checks the inter-broker protocol version before handling the transactional request, it is possible (if unlikely) that we could hit that error in the middle of a rolling upgrade.
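
To make the suggestion concrete, here is a rough Java sketch of per-partition error handling on the coordinator side. It is not the actual TransactionMarkerRequestCompletionHandler (which is Scala); `MarkerResponseSketch`, `MarkerError`, and `handle` are hypothetical names. Treating UNSUPPORTED_VERSION the same way as UNSUPPORTED_FOR_MESSAGE_FORMAT reflects the reviewer's proposal above and is an assumption here, not confirmed merged behaviour.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

final class MarkerResponseSketch {
    // Hypothetical subset of error codes, named after the ones mentioned in this PR.
    enum MarkerError {
        NONE,
        NOT_ENOUGH_REPLICAS_AFTER_APPEND,
        UNSUPPORTED_FOR_MESSAGE_FORMAT,
        UNSUPPORTED_VERSION
    }

    /**
     * Updates the set of partitions still waiting for a marker and
     * returns the partitions whose marker write should be retried.
     */
    static Set<String> handle(Set<String> waiting, Map<String, MarkerError> results) {
        Set<String> retry = new HashSet<>();
        for (Map.Entry<String, MarkerError> entry : results.entrySet()) {
            String partition = entry.getKey();
            switch (entry.getValue()) {
                case NONE:
                    // Marker written: nothing left to wait for.
                    waiting.remove(partition);
                    break;
                case UNSUPPORTED_FOR_MESSAGE_FORMAT:
                case UNSUPPORTED_VERSION:
                    // The producer could not have written transactional data here,
                    // so stop waiting instead of retrying (assumed for UNSUPPORTED_VERSION).
                    waiting.remove(partition);
                    break;
                case NOT_ENOUGH_REPLICAS_AFTER_APPEND:
                    // Retriable: keep waiting and send the marker again.
                    retry.add(partition);
                    break;
            }
        }
        return retry;
    }
}
```

Grouping the two unsupported cases in one branch keeps the "give up on this partition" path in a single place, which is where a log-level decision would also live.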

asfbot commented May 30, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.11/4539/
Test PASSed (JDK 7 and Scala 2.11).

asfbot commented May 30, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/4524/
Test PASSed (JDK 8 and Scala 2.12).

asfbot commented May 30, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/4526/
Test PASSed (JDK 8 and Scala 2.12).

asfbot commented May 30, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.11/4541/
Test PASSed (JDK 7 and Scala 2.11).

asfbot commented May 30, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/4536/
Test FAILed (JDK 8 and Scala 2.12).

asfbot commented May 30, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/4538/
Test PASSed (JDK 8 and Scala 2.12).

asfbot commented May 30, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.11/4553/
Test PASSed (JDK 7 and Scala 2.11).

@guozhangwang guozhangwang left a comment

One minor comment.

import java.nio.ByteBuffer;
import java.nio.channels.GatheringByteChannel;

public class ByteBufferChannel implements GatheringByteChannel {

Does this need to be in common.requests? Could we put it under common.network?

I'll go ahead and change this when I merge. Assuming @dguy wouldn't mind the different location.

Sounds good. Actually let me make the change-and-merge.

Actually I just realized that this is only extracted as a test util class, and hence it is okay to just leave it as is.
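
For readers unfamiliar with this kind of test utility, the following is a minimal sketch of an in-memory GatheringByteChannel. It is not the ByteBufferChannel class from this PR; `InMemoryChannel` and its `written()` accessor are hypothetical, and the fixed-capacity buffer is a simplifying assumption.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.GatheringByteChannel;

final class InMemoryChannel implements GatheringByteChannel {
    private final ByteBuffer buf;
    private boolean closed = false;

    InMemoryChannel(int capacity) {
        this.buf = ByteBuffer.allocate(capacity);
    }

    @Override
    public long write(ByteBuffer[] srcs, int offset, int length) throws IOException {
        long total = 0;
        for (int i = offset; i < offset + length; i++) {
            total += write(srcs[i]);
        }
        return total;
    }

    @Override
    public long write(ByteBuffer[] srcs) throws IOException {
        return write(srcs, 0, srcs.length);
    }

    @Override
    public int write(ByteBuffer src) throws IOException {
        if (closed) {
            throw new ClosedChannelException();
        }
        int n = src.remaining();
        // Throws BufferOverflowException if the fixed capacity is exceeded.
        buf.put(src);
        return n;
    }

    @Override
    public boolean isOpen() {
        return !closed;
    }

    @Override
    public void close() {
        closed = true;
    }

    /** Read-only style view of everything written so far, positioned at the start. */
    ByteBuffer written() {
        ByteBuffer view = buf.duplicate();
        view.flip();
        return view;
    }
}
```

A test can serialize a request or response into such a channel and then inspect the bytes through `written()` without opening a socket.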

@hachikuji hachikuji left a comment

Thanks for the patch. LGTM

asfgit pushed a commit that referenced this pull request May 31, 2017
…xnMarker response

Return UNSUPPORTED_FOR_MESSAGE_FORMAT from handleWriteTxnMarkers for any partition whose topic uses a message format that does not support transaction markers.
On the transaction coordinator, remove any TopicPartitions that fail with this error from the set waiting for markers.

Author: Damian Guy <damian.guy@gmail.com>

Reviewers: Jason Gustafson <jason@confluent.io>, Guozhang Wang <wangguoz@gmail.com>

Closes #3152 from dguy/kafka-5308

(cherry picked from commit 2cc8f48)
Signed-off-by: Guozhang Wang <wangguoz@gmail.com>

@guozhangwang guozhangwang left a comment

Merged to trunk and 0.11.0.

@asfgit asfgit closed this in 2cc8f48 May 31, 2017
@dguy dguy deleted the kafka-5308 branch August 16, 2017 13:23