Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-16007 Merge batch records during ZK migration #15007

Merged
merged 10 commits into from Dec 16, 2023
Merged
@@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kafka.metadata.migration;

import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/**
* A record batch consumer that merges incoming batches into batches of a minimum a given size. It does so
* by buffering the records into an array that is later flushed to a downstream consumer. Batches consumed
* by this class will not be broken apart, only combined with other batches to reach the minimum batch size.
* </p>
* Note that {@link #flush()} must be called after the last batch has been accepted in order to flush any
* buffered records.
*/
public class BufferingBatchConsumer<T> implements Consumer<List<T>> {

private final Consumer<List<T>> delegateConsumer;
private final int minBatchSize;
private List<T> bufferedBatch;

BufferingBatchConsumer(Consumer<List<T>> delegateConsumer, int minBatchSize) {
this.delegateConsumer = delegateConsumer;
this.minBatchSize = minBatchSize;
this.bufferedBatch = new ArrayList<>(minBatchSize);
}

@Override
public void accept(List<T> batch) {
bufferedBatch.addAll(batch);
if (bufferedBatch.size() >= minBatchSize) {
flush();
}
}

public void flush() {
if (!bufferedBatch.isEmpty()) {
delegateConsumer.accept(bufferedBatch);
bufferedBatch = new ArrayList<>(minBatchSize);
}
}
}
Expand Up @@ -34,6 +34,7 @@
import org.apache.kafka.queue.KafkaEventQueue;
import org.apache.kafka.raft.LeaderAndEpoch;
import org.apache.kafka.raft.OffsetAndEpoch;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.fault.FaultHandler;
import org.apache.kafka.server.util.Deadline;
import org.apache.kafka.server.util.FutureUtils;
Expand Down Expand Up @@ -87,7 +88,9 @@ public long nextPollTimeMs() {
* amount of time. A large value is selected to avoid timeouts in the common case, but prevent us from
* blocking indefinitely.
*/
private final static int METADATA_COMMIT_MAX_WAIT_MS = 300_000;
final static int METADATA_COMMIT_MAX_WAIT_MS = 300_000;

final static int MIGRATION_MIN_BATCH_SIZE = 1_000;

private final Time time;
private final Logger log;
Expand Down Expand Up @@ -645,6 +648,29 @@ public void run() throws Exception {
}
}

private BufferingBatchConsumer<ApiMessageAndVersion> buildMigrationBatchConsumer(
MigrationManifest.Builder manifestBuilder
) {
return new BufferingBatchConsumer<>(batch -> {
try {
if (log.isTraceEnabled()) {
batch.forEach(apiMessageAndVersion ->
log.trace(recordRedactor.toLoggableString(apiMessageAndVersion.message())));
}
CompletableFuture<?> future = zkRecordConsumer.acceptBatch(batch);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does the consumer here have any expectation on atomicity of the records? I am trying to figure out how the batching applies at the raft layer. Would you expect the batches to be preserved in the log?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this consumer only combines batches, any semantics relying on batch boundaries should be ok. Anyways, batches are irrelevant during the migration since we're using transactions at the controller layer

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To answer your question more directly

Does the consumer here have any expectation on atomicity of the records?

No. The eventual consumer of these batches is QuorumController#MigrationRecordConsumer which simply sends them along to Raft as a non-atomic batch. It doesn't care about batch boundaries or alignment.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Related question, what happens if KRaft loses leadership in the middle of this consumer loop?

long batchStart = time.nanoseconds();
FutureUtils.waitWithLogging(KRaftMigrationDriver.this.log, "",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In general, Kafka should avoid blocking on a CompletableFuture. This can be avoided by using CompletableFuture::thenCompose or better yet concurrent.Flow since the CompletableFuture doesn't return an interesting value.

I looked at ZkMigrationClient. If you wanted to use Flow. You would replace the use of Consumer with Flow.Subscriber. ZkMigrationClient would be come a Flow.Publisher.

Flow has support for pipelining and back-pressure. For example, you would make initial Subscription.request 1000 and request more data as the zkRecordConsumer processes more batches.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, the Flow API looks really cool. I'll check that out. Does look like it's Java 9+ only, but I'll keep it in mind for future stuff (I think we'll be bumping up to Java 11 for 4.0)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's fair. I keep forgetting that we still need to support Java 8. Looking forward to 4.x.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is an existing issue but Time.waitForFuture doesn't look correct. It is comparing nano times. In the JVM you can't compare nano times because they can overflow. It is recommended to instead compare elapse time: https://docs.oracle.com/en/java/javase/21/docs/api/java.base/java/lang/System.html#nanoTime()

This will cause this code to block forever when it overflows.

"the metadata layer to commit " + batch.size() + " migration records",
future, Deadline.fromDelay(time, METADATA_COMMIT_MAX_WAIT_MS, TimeUnit.MILLISECONDS), time);
long batchEnd = time.nanoseconds();
manifestBuilder.acceptBatch(batch, batchEnd - batchStart);
} catch (Throwable e) {
// This will cause readAllMetadata to throw since this batch consumer is called directly from readAllMetadata
throw new RuntimeException(e);
}
}, MIGRATION_MIN_BATCH_SIZE);
}

class MigrateMetadataEvent extends MigrationEvent {
@Override
public void run() throws Exception {
Expand All @@ -664,23 +690,12 @@ public void run() throws Exception {
super.handleException(t);
}
try {
zkMigrationClient.readAllMetadata(batch -> {
try {
log.info("Migrating {} records from ZK", batch.size());
if (log.isTraceEnabled()) {
batch.forEach(apiMessageAndVersion ->
log.trace(recordRedactor.toLoggableString(apiMessageAndVersion.message())));
}
CompletableFuture<?> future = zkRecordConsumer.acceptBatch(batch);
FutureUtils.waitWithLogging(KRaftMigrationDriver.this.log, "",
"the metadata layer to commit migration record batch",
future, Deadline.fromDelay(time, METADATA_COMMIT_MAX_WAIT_MS, TimeUnit.MILLISECONDS), time);
manifestBuilder.acceptBatch(batch);
} catch (Throwable e) {
// This will cause readAllMetadata to throw since this batch consumer is called directly from readAllMetadata
throw new RuntimeException(e);
}
}, brokersInMetadata::add);
BufferingBatchConsumer<ApiMessageAndVersion> migrationBatchConsumer = buildMigrationBatchConsumer(manifestBuilder);
zkMigrationClient.readAllMetadata(
migrationBatchConsumer,
brokersInMetadata::add
);
migrationBatchConsumer.flush();
CompletableFuture<OffsetAndEpoch> completeMigrationFuture = zkRecordConsumer.completeMigration();
OffsetAndEpoch offsetAndEpochAfterMigration = FutureUtils.waitWithLogging(
KRaftMigrationDriver.this.log, "",
Expand Down
Expand Up @@ -41,15 +41,17 @@ public static class Builder {
private final Map<MetadataRecordType, Integer> counts = new HashMap<>();
private int batches = 0;
private int total = 0;
private long batchDurationsNs = 0;
private long endTimeNanos = 0;

Builder(Time time) {
this.time = time;
this.startTimeNanos = time.nanoseconds();
}

public void acceptBatch(List<ApiMessageAndVersion> recordBatch) {
public void acceptBatch(List<ApiMessageAndVersion> recordBatch, long durationNs) {
batches++;
batchDurationsNs += durationNs;
Copy link
Member

@jsancio jsancio Dec 15, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay. This is measuring how much time the ZK migration spent in the controller, writing and committing the batches to KRaft, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yup, that's correct.

recordBatch.forEach(apiMessageAndVersion -> {
MetadataRecordType type = MetadataRecordType.fromId(apiMessageAndVersion.message().apiKey());
counts.merge(type, 1, Integer::sum);
Expand All @@ -62,23 +64,26 @@ public MigrationManifest build() {
endTimeNanos = time.nanoseconds();
}
Map<MetadataRecordType, Integer> orderedCounts = new TreeMap<>(counts);
return new MigrationManifest(total, batches, endTimeNanos - startTimeNanos, orderedCounts);
return new MigrationManifest(total, batches, batchDurationsNs, endTimeNanos - startTimeNanos, orderedCounts);
}
}

private final int totalRecords;
private final int totalBatches;
private final long totalBatchDurationsNs;
private final long durationNanos;
private final Map<MetadataRecordType, Integer> recordTypeCounts;

MigrationManifest(
int totalRecords,
int totalBatches,
long totalBatchDurationsNs,
long durationNanos,
Map<MetadataRecordType, Integer> recordTypeCounts
) {
this.totalRecords = totalRecords;
this.totalBatches = totalBatches;
this.totalBatchDurationsNs = totalBatchDurationsNs;
this.durationNanos = durationNanos;
this.recordTypeCounts = Collections.unmodifiableMap(recordTypeCounts);
}
Expand All @@ -91,24 +96,34 @@ public long durationMs() {
return TimeUnit.NANOSECONDS.toMillis(durationNanos);
}

public double avgBatchDurationMs() {
if (totalBatches == 0) {
return -1;
}
return 1.0 * TimeUnit.NANOSECONDS.toMillis(totalBatchDurationsNs) / totalBatches;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
MigrationManifest that = (MigrationManifest) o;
return totalRecords == that.totalRecords &&
totalBatches == that.totalBatches &&
totalBatchDurationsNs == that.totalBatchDurationsNs &&
durationNanos == that.durationNanos &&
recordTypeCounts.equals(that.recordTypeCounts);
}

@Override
public int hashCode() {
return Objects.hash(totalRecords, totalBatches, durationNanos, recordTypeCounts);
return Objects.hash(totalRecords, totalBatches, totalBatchDurationsNs, durationNanos, recordTypeCounts);
}

public String toString() {
return String.format("%d records were generated in %d ms across %d batches. The record types were %s",
totalRecords, durationMs(), totalBatches, recordTypeCounts);
return String.format(
"%d records were generated in %d ms across %d batches. The average time spent waiting on a " +
"batch was %.2f ms. The record types were %s",
totalRecords, durationMs(), totalBatches, avgBatchDurationMs(), recordTypeCounts);
}
}
@@ -0,0 +1,120 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kafka.metadata.migration;

import org.junit.jupiter.api.Test;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

import static org.junit.jupiter.api.Assertions.assertEquals;

public class BufferingBatchConsumerTest {

@Test
public void testEmptyBatches() {
List<List<Integer>> batches = new ArrayList<>();
BufferingBatchConsumer<Integer> consumer = new BufferingBatchConsumer<>(batches::add, 4);
consumer.accept(Collections.emptyList());
consumer.accept(Collections.emptyList());
assertEquals(batches.size(), 0);
consumer.flush();
assertEquals(batches.size(), 0);
}

@Test
public void testOneBatchSameAsMinSize() {
List<List<Integer>> batches = new ArrayList<>();
BufferingBatchConsumer<Integer> consumer = new BufferingBatchConsumer<>(batches::add, 4);
consumer.accept(Arrays.asList(1, 2, 3, 4));
assertEquals(batches.size(), 1);
assertEquals(batches.get(0), Arrays.asList(1, 2, 3, 4));
consumer.flush();
assertEquals(batches.size(), 1);
assertEquals(batches.get(0), Arrays.asList(1, 2, 3, 4));
}

@Test
public void testOneBatchSmallerThanMinSize() {
List<List<Integer>> batches = new ArrayList<>();
BufferingBatchConsumer<Integer> consumer = new BufferingBatchConsumer<>(batches::add, 4);
consumer.accept(Arrays.asList(1, 2, 3));
assertEquals(batches.size(), 0);
consumer.flush();
assertEquals(batches.size(), 1);
assertEquals(batches.get(0), Arrays.asList(1, 2, 3));
}

@Test
public void testOneBatchLargerThanMinSize() {
List<List<Integer>> batches = new ArrayList<>();
BufferingBatchConsumer<Integer> consumer = new BufferingBatchConsumer<>(batches::add, 4);
consumer.accept(Arrays.asList(1, 2, 3, 4, 5));
assertEquals(batches.size(), 1);
assertEquals(batches.get(0), Arrays.asList(1, 2, 3, 4, 5));
consumer.flush();
assertEquals(batches.size(), 1);
assertEquals(batches.get(0), Arrays.asList(1, 2, 3, 4, 5));
}

@Test
public void testMultiBatchSameAsMinSize() {
List<List<Integer>> batches = new ArrayList<>();
BufferingBatchConsumer<Integer> consumer = new BufferingBatchConsumer<>(batches::add, 6);
consumer.accept(Arrays.asList(1, 2));
consumer.accept(Arrays.asList(3, 4));
consumer.accept(Arrays.asList(5, 6));
assertEquals(batches.size(), 1);
assertEquals(batches.get(0), Arrays.asList(1, 2, 3, 4, 5, 6));
consumer.flush();
assertEquals(batches.size(), 1);
assertEquals(batches.get(0), Arrays.asList(1, 2, 3, 4, 5, 6));
}

@Test
public void testMultiBatchSmallerThanMinSize() {
List<List<Integer>> batches = new ArrayList<>();
BufferingBatchConsumer<Integer> consumer = new BufferingBatchConsumer<>(batches::add, 6);
consumer.accept(Arrays.asList(1, 2));
consumer.accept(Arrays.asList(3, 4));
consumer.accept(Collections.singletonList(5));
assertEquals(batches.size(), 0);
consumer.flush();
assertEquals(batches.size(), 1);
assertEquals(batches.get(0), Arrays.asList(1, 2, 3, 4, 5));
}

@Test
public void testMultiBatchLargerThanMinSize() {
List<List<Integer>> batches = new ArrayList<>();
BufferingBatchConsumer<Integer> consumer = new BufferingBatchConsumer<>(batches::add, 6);
consumer.accept(Arrays.asList(1, 2));
consumer.accept(Arrays.asList(3, 4));
consumer.accept(Arrays.asList(5, 6));
consumer.accept(Arrays.asList(7, 8));
consumer.accept(Arrays.asList(9, 10));
assertEquals(batches.size(), 1);
assertEquals(batches.get(0), Arrays.asList(1, 2, 3, 4, 5, 6));
consumer.flush();
assertEquals(batches.size(), 2);
assertEquals(batches.get(0), Arrays.asList(1, 2, 3, 4, 5, 6));
assertEquals(batches.get(1), Arrays.asList(7, 8, 9, 10));
}
}