
KAFKA-5340: Batch splitting should preserve magic and transactional flag #3162

Closed · wants to merge 7 commits

Conversation

hachikuji (Contributor)

No description provided.

@hachikuji (Contributor Author)

@becketqin Maybe you can take a look at this?

@asfbot

asfbot commented May 29, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.11/4520/
Test PASSed (JDK 7 and Scala 2.11).

@asfbot

asfbot commented May 29, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/4505/
Test PASSed (JDK 8 and Scala 2.12).

@@ -238,7 +238,7 @@ public RecordsInfo info() {
}
}

-    public void setProducerState(long producerId, short producerEpoch, int baseSequence) {
+    public void setProducerState(long producerId, short producerEpoch, int baseSequence, boolean isTransactional) {
hachikuji (Contributor Author):

To clarify, the reason I had to change this is that we need to close the record builder in order to split it. At that point, we don't have a producerId yet, so if isTransactional is set to true, then MemoryRecordsBuilder.close() will raise an exception. To get around that, this change ensures that we always have a producerId when we set isTransactional.

Reviewer (Contributor):

Is it worth adding a comment about this?

hachikuji (Contributor Author):

Yes, I think so.

Reviewer (Contributor):

I think this is a good general improvement, to set all this data right at the very end.
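The rationale in this thread (the producer id, epoch, and transactional flag all set together at the very end) can be sketched as follows. This is a simplified stand-in, not the real MemoryRecordsBuilder; everything except the setProducerState signature is illustrative:

```java
// Toy sketch: deferring the transactional flag until setProducerState means a
// builder can never claim to be transactional while its producer id is unset,
// so closing it early (e.g. during batch splitting) cannot fail that check.
public class BuilderSketch {
    static final long NO_PRODUCER_ID = -1L;

    private long producerId = NO_PRODUCER_ID;
    private short producerEpoch = -1;
    private boolean isTransactional = false;

    // The flag travels with the producer id and epoch.
    public void setProducerState(long producerId, short producerEpoch,
                                 int baseSequence, boolean isTransactional) {
        if (isTransactional && producerId == NO_PRODUCER_ID)
            throw new IllegalStateException("Transactional batches must have a valid producer id");
        this.producerId = producerId;
        this.producerEpoch = producerEpoch;
        this.isTransactional = isTransactional;
    }

    public void close() {
        if (isTransactional && producerId == NO_PRODUCER_ID)
            throw new IllegalStateException("Cannot close a transactional batch without a producer id");
    }

    public static void main(String[] args) {
        BuilderSketch builder = new BuilderSketch();
        builder.close(); // fine: no flag yet, so splitting can close the builder
        builder.setProducerState(1000L, (short) 0, 0, true);
        builder.close(); // fine: id and flag were set together
        System.out.println("ok");
    }
}
```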

Thunk thunk = thunkIter.next();
if (batch == null) {
batch = createBatchOffAccumulatorForRecord(record, splitBatchSize);
while (recordBatchIter.hasNext()) {
Reviewer (Contributor):

Not sure I understand this logic: we expect memoryRecords.batches() to only have one batch but here we are expecting it to have many?

hachikuji (Contributor Author):

If the message format is v0 or v1, then we could have multiple batches (each record is a batch of size 1). Obviously the batch splitting is intended for batches with multiple records, but it felt a little awkward and unnecessary to restrict this function to only magic >= 2 or compression != NONE.
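The point above can be illustrated with a toy stand-in (the real iteration is over RecordBatch objects from memoryRecords.batches(); the names here are assumptions):

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

public class BatchIterationSketch {
    // v0/v1 with no compression: each record is written as its own batch,
    // so "one logical batch" of N records surfaces as N singleton batches.
    static List<List<String>> oldFormatBatches(List<String> records) {
        List<List<String>> batches = new ArrayList<>();
        for (String record : records)
            batches.add(Collections.singletonList(record));
        return batches;
    }

    public static void main(String[] args) {
        Iterator<List<String>> recordBatchIter =
                oldFormatBatches(List.of("a", "b", "c")).iterator();
        int batchCount = 0;
        while (recordBatchIter.hasNext()) { // hence a loop, not a single next()
            recordBatchIter.next();
            batchCount++;
        }
        System.out.println(batchCount);     // 3 batches for 3 records
    }
}
```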

@asfbot

asfbot commented May 31, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/4611/
Test FAILed (JDK 8 and Scala 2.12).

@asfbot

asfbot commented May 31, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.11/4626/
Test PASSed (JDK 7 and Scala 2.11).

@hachikuji (Contributor Author)

@guozhangwang Comments addressed. Please take another look.

@asfbot

asfbot commented May 31, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/4643/
Test PASSed (JDK 8 and Scala 2.12).

@asfbot

asfbot commented May 31, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.11/4658/
Test FAILed (JDK 7 and Scala 2.11).

RecordBatch recordBatch = recordBatchIter.next();
if (recordBatch.magic() < MAGIC_VALUE_V2 && !recordBatch.isCompressed())
Reviewer (Contributor):

Should we add the check in Sender.completeBatch() as well, so that we do not call split in this case? Otherwise, if the producer was sending uncompressed messages and one of the messages in a batch was too large, it seems the producer would not fire the callback with the correct exception.

This would probably be a rare case, because a big message will typically get sent in a dedicated batch when compression is none. But it is theoretically possible if the user configured the producer batch size to be larger than the broker's maximum message size.

hachikuji (Contributor Author):

Yes, I think that makes sense. I will update the patch.

hachikuji (Contributor Author):

@becketqin Actually, in my original patch, I modified this code to handle all cases. Perhaps it would be a little simpler to revert to that behavior?

hachikuji (Contributor Author):

OK, I think I'm just going to add the check in Sender.completeBatch() and follow the behavior prior to KIP-126 if it is the old message format without compression.
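The agreed behavior can be sketched roughly as follows. The method names and return values are illustrative assumptions, not the actual Sender.completeBatch() code:

```java
// Toy sketch of the completeBatch decision discussed above: only attempt a
// split when the batch is actually splittable; otherwise fall back to the
// pre-KIP-126 behavior of failing the batch with the original error.
public class CompleteBatchSketch {
    static final byte MAGIC_VALUE_V2 = 2;

    static boolean canSplit(byte magic, boolean compressed) {
        // Splitting needs either the v2 format or compression; in v0/v1
        // without compression every record is already its own batch.
        return magic >= MAGIC_VALUE_V2 || compressed;
    }

    static String completeBatch(byte magic, boolean compressed, boolean messageTooLarge) {
        if (messageTooLarge && canSplit(magic, compressed))
            return "split-and-resend";
        if (messageTooLarge)
            return "fail-with-RecordTooLargeException"; // pre-KIP-126 behavior
        return "complete";
    }

    public static void main(String[] args) {
        System.out.println(completeBatch((byte) 2, false, true)); // split-and-resend
        System.out.println(completeBatch((byte) 1, false, true)); // fail-with-RecordTooLargeException
        System.out.println(completeBatch((byte) 1, true, true));  // split-and-resend
    }
}
```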

@@ -218,7 +226,7 @@ private ProducerBatch createBatchOffAccumulatorForRecord(Record record, int batc
record.key(), record.value(), record.headers()), batchSize);
ByteBuffer buffer = ByteBuffer.allocate(initialSize);
MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, magic(), recordsBuilder.compressionType(),
-                TimestampType.CREATE_TIME, 0L, recordsBuilder.isTransactional());
+                TimestampType.CREATE_TIME, 0L);
Reviewer (Contributor):

Just curious, it seems that the transaction is the same as the parent batch before the change. Wouldn't that preserve the transactional flag?

hachikuji (Contributor Author):

Yes, I attempted to fix this previously, but the logic was incorrect. The builder requires that the producerId and the producer epoch are both set if isTransactional is set to true. I felt like this was a useful sanity check, so I changed the logic to pass isTransactional at the same time that we set the producer id and epoch. Does that make sense?

@@ -232,6 +232,15 @@ public RuntimeException lastError() {
return lastError;
}

public synchronized boolean ensurePartitionAdded(TopicPartition tp) {
if (isInTransaction() && !partitionsInTransaction.contains(tp)) {
transitionToFatalError(new IllegalStateException("Attempted to dequeue a record batch to send " +
Reviewer (Contributor):
The reason will be displayed to describe this comment to others. Learn more.

Would this ever happen assuming we do not have bugs? If yes then we should probably throw IllegalStateException directly to indicate a bug?

hachikuji (Contributor Author):
The reason will be displayed to describe this comment to others. Learn more.

It should not happen normally, but if there is a bug in the code, the result is basically a corrupted topic, so I felt it was worth having the check. Transitioning to the fatal error state ensures that the user will see the error and that no further progress can be made. If we just threw the exception, the Sender thread would simply die silently.

Reviewer (Contributor):
The reason will be displayed to describe this comment to others. Learn more.

Makes sense. I was following the ordinary rule of using an RTE for any unexpected bug, but this is definitely better operationally.
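A rough sketch of the design point in this thread, with simplified names (not the actual TransactionManager):

```java
import java.util.HashSet;
import java.util.Set;

// Toy sketch: record the bug as a fatal error state instead of throwing from
// the Sender's I/O thread, so the user sees it on subsequent calls and no
// further progress can be made.
public class TransactionManagerSketch {
    private final Set<String> partitionsInTransaction = new HashSet<>();
    private RuntimeException lastError;

    void addPartition(String tp) {
        partitionsInTransaction.add(tp);
    }

    boolean ensurePartitionAdded(String tp) {
        if (!partitionsInTransaction.contains(tp)) {
            // Transition to a fatal state rather than throwing: a throw here
            // would just kill the background thread without surfacing the bug.
            lastError = new IllegalStateException(
                    "Attempted to dequeue a batch for unadded partition " + tp);
            return false;
        }
        return true;
    }

    RuntimeException lastError() {
        return lastError;
    }

    public static void main(String[] args) {
        TransactionManagerSketch tm = new TransactionManagerSketch();
        tm.addPartition("topic-0");
        System.out.println(tm.ensurePartitionAdded("topic-0")); // true
        System.out.println(tm.ensurePartitionAdded("topic-1")); // false, fatal error recorded
        System.out.println(tm.lastError() != null);             // true
    }
}
```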

if (transactionManager != null)
isTransactional = transactionManager.isInTransaction();
-        return MemoryRecords.builder(buffer, maxUsableMagic, compression, TimestampType.CREATE_TIME, 0L, isTransactional);
+        return MemoryRecords.builder(buffer, maxUsableMagic, compression, TimestampType.CREATE_TIME, 0L);
Reviewer (Contributor):
The reason will be displayed to describe this comment to others. Learn more.

It seems the builder() overload with isTransactional at line 377 of MemoryRecords is no longer used externally; could we remove it then?

int maxRetries = 1;
-        String topic = "testSplitBatchAndSend";
+        String topic = tp.topic();
Reviewer (Contributor):
The reason will be displayed to describe this comment to others. Learn more.

Nice cleanup!

@guozhangwang (Contributor)

LGTM.

@apurvam (Contributor) left a comment:

Left a minor comment, but otherwise this looks good to me. Thanks!

RecordBatch recordBatch = recordBatchIter.next();
if (recordBatch.magic() < MAGIC_VALUE_V2 && !recordBatch.isCompressed())
throw new IllegalArgumentException("Batch splitting cannot be used with non-compressed messages " +
Reviewer (Contributor):

This wording could be improved: "Batch splitting cannot be used with non-compressed messages, NOR with message format versions v0 and v1"

hachikuji (Contributor Author):

The rewording is not quite right. Batch splitting can be used for v0 and v1 if compression is enabled.


@asfbot

asfbot commented May 31, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/4664/
Test FAILed (JDK 8 and Scala 2.12).

@asfbot

asfbot commented May 31, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.11/4679/
Test PASSed (JDK 7 and Scala 2.11).

@asfbot

asfbot commented Jun 1, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.11/4685/
Test PASSed (JDK 7 and Scala 2.11).

@asfbot

asfbot commented Jun 1, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/4669/
Test PASSed (JDK 8 and Scala 2.12).

@hachikuji (Contributor Author)

Pushed a fix to address @becketqin's comments. If there are no further comments, I will merge later this evening.

@becketqin (Contributor)

Thanks for the patch. LGTM.

@asfbot

asfbot commented Jun 1, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.11/4699/
Test PASSed (JDK 7 and Scala 2.11).

@asfbot

asfbot commented Jun 1, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/4683/
Test PASSed (JDK 8 and Scala 2.12).

@asfgit asfgit closed this in e4a6b50 Jun 1, 2017
asfgit pushed a commit that referenced this pull request Jun 1, 2017
Author: Jason Gustafson <jason@confluent.io>

Reviewers: Apurva Mehta <apurva@confluent.io>, Jiangjie Qin <becket.qin@gmail.com>, Guozhang Wang <wangguoz@gmail.com>

Closes #3162 from hachikuji/KAFKA-5340

(cherry picked from commit e4a6b50)
Signed-off-by: Jason Gustafson <jason@confluent.io>