
KAFKA-16007 Merge batch records during ZK migration #15007

Merged
merged 10 commits into apache:trunk on Dec 16, 2023

Conversation


@mumrah mumrah commented Dec 13, 2023

To avoid creating lots of small KRaft batches during the ZK migration, this patch adds a mechanism to merge batches until they contain at least 1000 records. This reduces the number of batches sent to Raft, which in turn reduces the amount of time spent blocking.

Since migrations use metadata transactions, the batch boundaries of migrated records are not significant. Even so, this implementation does not break up existing batches; it only combines them into larger batches to meet the minimum size.
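
For illustration, a minimal sketch of this batch-merging consumer, assuming a class along these lines; the identifiers follow the snippets quoted in the review below (delegateConsumer, bufferedBatch, minBatchSize), but this is a simplification, not the exact class added by the PR:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Simplified sketch of the batch-merging idea; not the exact class added by this PR.
public class BufferingBatchConsumer<T> implements Consumer<List<T>> {
    private final Consumer<List<T>> delegateConsumer;
    private final List<T> bufferedBatch;
    private final int minBatchSize;

    public BufferingBatchConsumer(Consumer<List<T>> delegateConsumer, int minBatchSize) {
        this.delegateConsumer = delegateConsumer;
        this.bufferedBatch = new ArrayList<>(minBatchSize);
        this.minBatchSize = minBatchSize;
    }

    @Override
    public void accept(List<T> batch) {
        // Incoming batches are never split, only appended to the buffer.
        bufferedBatch.addAll(batch);
        if (bufferedBatch.size() >= minBatchSize) {
            delegateConsumer.accept(new ArrayList<>(bufferedBatch));
            bufferedBatch.clear();
        }
    }

    // Called at the end of the migration to emit any remaining buffered records.
    public void flush() {
        if (!bufferedBatch.isEmpty()) {
            delegateConsumer.accept(new ArrayList<>(bufferedBatch));
            bufferedBatch.clear();
        }
    }
}
```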


mumrah commented Dec 14, 2023

Some supporting evidence.

I modified ZkMigrationIntegrationTest to create 1000 partitions in three ways (10 topics, 100 topics, 1000 topics). The number of batches generated and the overall migration time scaled roughly linearly with the number of topics:

| Case | Records | Batches | Time Per Batch (ms) | Total Time (ms) |
| --- | --- | --- | --- | --- |
| 10 topics x 100 partitions (trunk) | 1020 | 12 | 32.33 | 536 |
| 100 topics x 10 partitions (trunk) | 1110 | 102 | 30.30 | 3317 |
| 1000 topics x 1 partition (trunk) | 2010 | 1002 | 30.69 | 32243 |
| 1000 topics x 1 partition (this PR) | 2010 | 3 | 30.69 | 415 |

As we can see, the batch size doesn't really affect the time waited on each batch. The biggest gain comes from making fewer batches overall: in KRaftMigrationDriver we wait for each batch to be committed by the controller, which limits throughput, so the total time is roughly the number of batches times the per-batch wait (for example, 1002 batches × ~30 ms ≈ 30 s, matching the trunk case with one partition per topic). The ~30 ms we're seeing per batch is probably the round-trip time in the Raft layer.

@mumrah mumrah changed the title KAFKA-16007 Re-batch records during the migration KAFKA-16007 Merge batch records during the migration Dec 14, 2023
@mumrah mumrah changed the title KAFKA-16007 Merge batch records during the migration KAFKA-16007 Merge batch records during ZK migration Dec 14, 2023
batch.forEach(apiMessageAndVersion ->
log.trace(recordRedactor.toLoggableString(apiMessageAndVersion.message())));
}
CompletableFuture<?> future = zkRecordConsumer.acceptBatch(batch);
Contributor

Does the consumer here have any expectation on atomicity of the records? I am trying to figure out how the batching applies at the Raft layer. Would you expect the batches to be preserved in the log?

Contributor Author

Since this consumer only combines batches, any semantics relying on batch boundaries should be okay. In any case, batch boundaries are irrelevant during the migration since we're using transactions at the controller layer.

Contributor Author

To answer your question more directly

Does the consumer here have any expectation on atomicity of the records?

No. The eventual consumer of these batches is QuorumController#MigrationRecordConsumer, which simply sends them along to Raft as non-atomic batches. It doesn't care about batch boundaries or alignment.

Member

Related question, what happens if KRaft loses leadership in the middle of this consumer loop?

return String.format("%d records were generated in %d ms across %d batches. The record types were %s",
totalRecords, durationMs(), totalBatches, recordTypeCounts);
return String.format(
"%d records were generated in %d ms across %d batches. The average batch size was %.2f " +
Contributor

The "average batch size" might be a little ambiguous. Maybe we could say "record/batch" or something like that? Wondering if size in bytes is interesting also, but perhaps we can get that from the raft metrics.

Contributor Author

Actually, after this patch we probably expect this value to be around 1000, so maybe it's not that useful to print out here.

Size is interesting, but yea we can infer that from the Raft metrics.

Member

@jsancio jsancio left a comment

Thanks for the improvement.

}
CompletableFuture<?> future = zkRecordConsumer.acceptBatch(batch);
long batchStart = time.nanoseconds();
FutureUtils.waitWithLogging(KRaftMigrationDriver.this.log, "",
Member

In general, Kafka should avoid blocking on a CompletableFuture. This can be avoided by using CompletableFuture::thenCompose, or better yet java.util.concurrent.Flow, since the CompletableFuture doesn't return an interesting value.

I looked at ZkMigrationClient. If you wanted to use Flow, you would replace the use of Consumer with Flow.Subscriber, and ZkMigrationClient would become a Flow.Publisher.

Flow has support for pipelining and back-pressure. For example, you would make the initial Subscription.request 1000 and request more data as the zkRecordConsumer processes more batches.
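
As a rough illustration of that suggestion, here is a sketch of a Flow.Subscriber that requests batches with back-pressure (Java 9+). The class name RecordBatchSubscriber and the acceptBatchAsync placeholder are hypothetical, not code from this PR:

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;

// Hypothetical sketch of the suggested Flow-based approach; not code from this PR.
// The publisher (e.g. the migration client) would emit record batches, and the
// subscriber applies back-pressure by only requesting more once a batch is accepted.
class RecordBatchSubscriber<T> implements Flow.Subscriber<List<T>> {
    private final int initialRequest;
    private Flow.Subscription subscription;

    RecordBatchSubscriber(int initialRequest) {
        this.initialRequest = initialRequest;
    }

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        this.subscription = subscription;
        subscription.request(initialRequest); // e.g. request 1000 batches up front
    }

    @Override
    public void onNext(List<T> batch) {
        // Hand the batch off asynchronously and request more once it has been
        // accepted, instead of blocking on a CompletableFuture.
        acceptBatchAsync(batch).thenRun(() -> subscription.request(1));
    }

    @Override
    public void onError(Throwable throwable) { /* abort the migration transaction */ }

    @Override
    public void onComplete() { /* complete the migration transaction */ }

    private CompletableFuture<Void> acceptBatchAsync(List<T> batch) {
        // Placeholder standing in for something like zkRecordConsumer.acceptBatch(batch).
        return CompletableFuture.completedFuture(null);
    }
}
```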

Contributor Author

Thanks, the Flow API looks really cool. I'll check it out. It does look like it's Java 9+ only, but I'll keep it in mind for future work (I think we'll be bumping up to Java 11 for 4.0).

Member

That's fair. I keep forgetting that we still need to support Java 8. Looking forward to 4.x.

Member

This is an existing issue, but Time.waitForFuture doesn't look correct. It compares absolute nano times, which you can't do in the JVM because the values can overflow; the recommendation is to compare elapsed time instead: https://docs.oracle.com/en/java/javase/21/docs/api/java.base/java/lang/System.html#nanoTime()

This will cause the code to block forever when the value overflows.
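
A small sketch of the distinction (illustrative only; not Kafka's actual Time.waitForFuture code):

```java
public class NanoTimeCheck {
    public static void main(String[] args) {
        long timeoutNs = 1_000_000L; // 1 ms, arbitrary for the example
        long deadlineNs = System.nanoTime() + timeoutNs;

        // Fragile: comparing absolute nanoTime values breaks when the counter overflows.
        boolean expiredAbsolute = System.nanoTime() > deadlineNs;

        // Recommended by the System.nanoTime() javadoc: compare elapsed time via
        // subtraction, which behaves correctly across overflow.
        boolean expiredElapsed = System.nanoTime() - deadlineNs > 0;

        System.out.println(expiredAbsolute + " " + expiredElapsed);
    }
}
```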

migrationBatchConsumer,
brokersInMetadata::add
);
migrationBatchConsumer.close();
Member

If zkMigrationClient.readAllMetadata throws, migrationBatchConsumer.close is not called. Is this okay because zkRecordConsumer.abortMigration is called in the catch?



mumrah commented Dec 15, 2023

Thanks for taking a look @jsancio! I'll answer some related questions here. If an error occurs inside the readAllMetadata call in KRaftMigrationDriver, the catch block explicitly aborts the transaction. If the controller crashes during this call, or at any time before committing the EndTransactionRecord, then the next active controller will abort the partial transaction. The same applies when a controller doesn't crash but loses leadership during the migration (for example, from a timeout).

The logic for detecting and aborting partial transactions is in ActivationRecordsGenerator.

Member

@jsancio jsancio left a comment

Thanks. Should we fix the blocking issue in this PR?

Comment on lines 48 to 49
delegateConsumer.accept(new ArrayList<>(bufferedBatch));
bufferedBatch.clear();
Member

How about this implementation?

            delegateConsumer.accept(bufferedBatch);
            bufferedBatch = new ArrayList<>(minBatchSize);

Similarly in flush. There seems to be some code duplication between these two methods.
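
Continuing the earlier sketch, one way those two methods could share that logic, assuming bufferedBatch is a non-final field; the helper name is hypothetical and this is not necessarily the PR's final code:

```java
// Hypothetical refactor sketch; assumes bufferedBatch is a non-final field of the sketch above.
private void emitBufferedBatch() {
    if (!bufferedBatch.isEmpty()) {
        delegateConsumer.accept(bufferedBatch);
        // Hand off the current buffer and start a fresh one instead of copying and clearing.
        bufferedBatch = new ArrayList<>(minBatchSize);
    }
}

@Override
public void accept(List<T> batch) {
    bufferedBatch.addAll(batch);
    if (bufferedBatch.size() >= minBatchSize) {
        emitBufferedBatch();
    }
}

public void flush() {
    emitBufferedBatch();
}
```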


batches++;
batchDurationsNs += durationNs;
Member

@jsancio jsancio Dec 15, 2023

Okay. This is measuring how much time the ZK migration spent in the controller, writing and committing the batches to KRaft, right?

Contributor Author

Yup, that's correct.


mumrah commented Dec 15, 2023

I filed https://issues.apache.org/jira/browse/KAFKA-16020 for the nanos issue. Good catch!

@mumrah mumrah requested a review from jsancio December 15, 2023 22:10
Member

@jsancio jsancio left a comment

LGTM, if the tests are green.


mumrah commented Dec 16, 2023

Test failures look unrelated.

@mumrah mumrah merged commit 7f763d3 into apache:trunk Dec 16, 2023
1 check was pending
mumrah added a commit that referenced this pull request Dec 16, 2023
To avoid creating lots of small KRaft batches during the ZK migration, this patch adds a mechanism to merge batches into sizes of at least 1000. This has the effect of reducing the number of batches sent to Raft which reduces the amount of time spent blocking.

Since migrations use metadata transactions, the batch boundaries for migrated records are not significant. Even in light of that, this implementation does not break up existing batches. It will only combine them into a larger batch to meet the minimum size.

Reviewers: José Armando García Sancio <jsancio@apache.org>
mumrah added a commit that referenced this pull request Dec 19, 2023
gaurav-narula pushed a commit to gaurav-narula/kafka that referenced this pull request Jan 24, 2024
anurag-harness pushed a commit to anurag-harness/kafka that referenced this pull request Feb 9, 2024
anurag-harness added a commit to anurag-harness/kafka that referenced this pull request Feb 9, 2024
yyu1993 pushed a commit to yyu1993/kafka that referenced this pull request Feb 15, 2024
clolov pushed a commit to clolov/kafka that referenced this pull request Apr 5, 2024