KAFKA-16007 Merge batch records during ZK migration (#15007)
To avoid creating lots of small KRaft batches during the ZK migration, this patch adds a mechanism that merges small batches into batches of at least 1,000 records. This reduces the number of batches sent to Raft, which in turn reduces the amount of time spent blocking.

Since migrations use metadata transactions, the batch boundaries for migrated records are not significant. Even so, this implementation does not break up existing batches; it only combines them into larger batches to meet the minimum size.

Reviewers: José Armando García Sancio <jsancio@apache.org>
mumrah committed Dec 16, 2023
1 parent c215995 commit 0293906
Showing 8 changed files with 363 additions and 44 deletions.
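
To make the batching effect concrete before diving into the diff, here is a minimal sketch (not part of this commit) that drives the new BufferingBatchConsumer introduced below. It assumes a class in the same org.apache.kafka.metadata.migration package, since the consumer's constructor is package-private, and the batch sizes are hypothetical:

package org.apache.kafka.metadata.migration;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical illustration: fifty ZK-derived batches of 100 records each are
// merged into batches of at least 1,000 records, so the downstream consumer
// (standing in for the Raft write path) is invoked 5 times rather than 50.
public class MergeIllustration {
    public static void main(String[] args) {
        List<List<Integer>> raftWrites = new ArrayList<>();
        BufferingBatchConsumer<Integer> merger =
            new BufferingBatchConsumer<>(raftWrites::add, 1000);
        for (int i = 0; i < 50; i++) {
            merger.accept(Collections.nCopies(100, i)); // one small migrated batch
        }
        merger.flush(); // emit any remainder smaller than the minimum
        System.out.println(raftWrites.size()); // prints 5
    }
}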
BufferingBatchConsumer.java (new file)
@@ -0,0 +1,58 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.kafka.metadata.migration;

import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/**
 * A record batch consumer that merges incoming batches into batches of at least a given size. It does so
 * by buffering the records into an array that is later flushed to a downstream consumer. Batches consumed
 * by this class will not be broken apart, only combined with other batches to reach the minimum batch size.
 * <p>
 * Note that {@link #flush()} must be called after the last batch has been accepted in order to flush any
 * buffered records.
 */
public class BufferingBatchConsumer<T> implements Consumer<List<T>> {

    private final Consumer<List<T>> delegateConsumer;
    private final int minBatchSize;
    private List<T> bufferedBatch;

    BufferingBatchConsumer(Consumer<List<T>> delegateConsumer, int minBatchSize) {
        this.delegateConsumer = delegateConsumer;
        this.minBatchSize = minBatchSize;
        this.bufferedBatch = new ArrayList<>(minBatchSize);
    }

    @Override
    public void accept(List<T> batch) {
        bufferedBatch.addAll(batch);
        if (bufferedBatch.size() >= minBatchSize) {
            flush();
        }
    }

    public void flush() {
        if (!bufferedBatch.isEmpty()) {
            delegateConsumer.accept(bufferedBatch);
            bufferedBatch = new ArrayList<>(minBatchSize);
        }
    }
}
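
A quick usage sketch of the class above (hypothetical, not part of this commit; it assumes the same package, since the constructor is package-private). A batch that fills the buffer to the minimum size is emitted immediately, while a trailing remainder is only emitted by flush():

package org.apache.kafka.metadata.migration;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class FlushSemanticsExample {
    public static void main(String[] args) {
        List<List<Integer>> out = new ArrayList<>();
        BufferingBatchConsumer<Integer> consumer = new BufferingBatchConsumer<>(out::add, 4);
        consumer.accept(Arrays.asList(1, 2, 3, 4, 5)); // buffer reaches 5 >= 4: emitted as one batch
        consumer.accept(Arrays.asList(6, 7));          // below the minimum: stays buffered
        consumer.flush();                              // emits the remainder [6, 7]
        System.out.println(out);                       // [[1, 2, 3, 4, 5], [6, 7]]
    }
}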
KRaftMigrationDriver.java
@@ -34,6 +34,7 @@
import org.apache.kafka.queue.KafkaEventQueue;
import org.apache.kafka.raft.LeaderAndEpoch;
import org.apache.kafka.raft.OffsetAndEpoch;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.fault.FaultHandler;
import org.apache.kafka.server.util.Deadline;
import org.apache.kafka.server.util.FutureUtils;
@@ -87,7 +88,9 @@ public long nextPollTimeMs() {
     * amount of time. A large value is selected to avoid timeouts in the common case, while still
     * preventing us from blocking indefinitely.
     */
-    private final static int METADATA_COMMIT_MAX_WAIT_MS = 300_000;
+    final static int METADATA_COMMIT_MAX_WAIT_MS = 300_000;
+
+    final static int MIGRATION_MIN_BATCH_SIZE = 1_000;

    private final Time time;
    private final Logger log;
@@ -645,6 +648,29 @@ public void run() throws Exception {
        }
    }

+    private BufferingBatchConsumer<ApiMessageAndVersion> buildMigrationBatchConsumer(
+        MigrationManifest.Builder manifestBuilder
+    ) {
+        return new BufferingBatchConsumer<>(batch -> {
+            try {
+                if (log.isTraceEnabled()) {
+                    batch.forEach(apiMessageAndVersion ->
+                        log.trace(recordRedactor.toLoggableString(apiMessageAndVersion.message())));
+                }
+                CompletableFuture<?> future = zkRecordConsumer.acceptBatch(batch);
+                long batchStart = time.nanoseconds();
+                FutureUtils.waitWithLogging(KRaftMigrationDriver.this.log, "",
+                    "the metadata layer to commit " + batch.size() + " migration records",
+                    future, Deadline.fromDelay(time, METADATA_COMMIT_MAX_WAIT_MS, TimeUnit.MILLISECONDS), time);
+                long batchEnd = time.nanoseconds();
+                manifestBuilder.acceptBatch(batch, batchEnd - batchStart);
+            } catch (Throwable e) {
+                // This will cause readAllMetadata to throw since this batch consumer is called directly from readAllMetadata
+                throw new RuntimeException(e);
+            }
+        }, MIGRATION_MIN_BATCH_SIZE);
+    }
+
    class MigrateMetadataEvent extends MigrationEvent {
        @Override
        public void run() throws Exception {
@@ -664,23 +690,12 @@ public void run() throws Exception {
                super.handleException(t);
            }
            try {
-                zkMigrationClient.readAllMetadata(batch -> {
-                    try {
-                        log.info("Migrating {} records from ZK", batch.size());
-                        if (log.isTraceEnabled()) {
-                            batch.forEach(apiMessageAndVersion ->
-                                log.trace(recordRedactor.toLoggableString(apiMessageAndVersion.message())));
-                        }
-                        CompletableFuture<?> future = zkRecordConsumer.acceptBatch(batch);
-                        FutureUtils.waitWithLogging(KRaftMigrationDriver.this.log, "",
-                            "the metadata layer to commit migration record batch",
-                            future, Deadline.fromDelay(time, METADATA_COMMIT_MAX_WAIT_MS, TimeUnit.MILLISECONDS), time);
-                        manifestBuilder.acceptBatch(batch);
-                    } catch (Throwable e) {
-                        // This will cause readAllMetadata to throw since this batch consumer is called directly from readAllMetadata
-                        throw new RuntimeException(e);
-                    }
-                }, brokersInMetadata::add);
+                BufferingBatchConsumer<ApiMessageAndVersion> migrationBatchConsumer = buildMigrationBatchConsumer(manifestBuilder);
+                zkMigrationClient.readAllMetadata(
+                    migrationBatchConsumer,
+                    brokersInMetadata::add
+                );
+                migrationBatchConsumer.flush();
                CompletableFuture<OffsetAndEpoch> completeMigrationFuture = zkRecordConsumer.completeMigration();
                OffsetAndEpoch offsetAndEpochAfterMigration = FutureUtils.waitWithLogging(
                    KRaftMigrationDriver.this.log, "",
MigrationManifest.java
@@ -41,15 +41,17 @@ public static class Builder {
        private final Map<MetadataRecordType, Integer> counts = new HashMap<>();
        private int batches = 0;
        private int total = 0;
+        private long batchDurationsNs = 0;
        private long endTimeNanos = 0;

        Builder(Time time) {
            this.time = time;
            this.startTimeNanos = time.nanoseconds();
        }

-        public void acceptBatch(List<ApiMessageAndVersion> recordBatch) {
+        public void acceptBatch(List<ApiMessageAndVersion> recordBatch, long durationNs) {
            batches++;
+            batchDurationsNs += durationNs;
            recordBatch.forEach(apiMessageAndVersion -> {
                MetadataRecordType type = MetadataRecordType.fromId(apiMessageAndVersion.message().apiKey());
                counts.merge(type, 1, Integer::sum);
@@ -62,23 +64,26 @@ public MigrationManifest build() {
                endTimeNanos = time.nanoseconds();
            }
            Map<MetadataRecordType, Integer> orderedCounts = new TreeMap<>(counts);
-            return new MigrationManifest(total, batches, endTimeNanos - startTimeNanos, orderedCounts);
+            return new MigrationManifest(total, batches, batchDurationsNs, endTimeNanos - startTimeNanos, orderedCounts);
        }
    }

    private final int totalRecords;
    private final int totalBatches;
+    private final long totalBatchDurationsNs;
    private final long durationNanos;
    private final Map<MetadataRecordType, Integer> recordTypeCounts;

    MigrationManifest(
        int totalRecords,
        int totalBatches,
+        long totalBatchDurationsNs,
        long durationNanos,
        Map<MetadataRecordType, Integer> recordTypeCounts
    ) {
        this.totalRecords = totalRecords;
        this.totalBatches = totalBatches;
+        this.totalBatchDurationsNs = totalBatchDurationsNs;
        this.durationNanos = durationNanos;
        this.recordTypeCounts = Collections.unmodifiableMap(recordTypeCounts);
    }
@@ -91,24 +96,34 @@ public long durationMs() {
        return TimeUnit.NANOSECONDS.toMillis(durationNanos);
    }

+    public double avgBatchDurationMs() {
+        if (totalBatches == 0) {
+            return -1;
+        }
+        return 1.0 * TimeUnit.NANOSECONDS.toMillis(totalBatchDurationsNs) / totalBatches;
+    }
+
    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        MigrationManifest that = (MigrationManifest) o;
        return totalRecords == that.totalRecords &&
            totalBatches == that.totalBatches &&
+            totalBatchDurationsNs == that.totalBatchDurationsNs &&
            durationNanos == that.durationNanos &&
            recordTypeCounts.equals(that.recordTypeCounts);
    }

    @Override
    public int hashCode() {
-        return Objects.hash(totalRecords, totalBatches, durationNanos, recordTypeCounts);
+        return Objects.hash(totalRecords, totalBatches, totalBatchDurationsNs, durationNanos, recordTypeCounts);
    }

    public String toString() {
-        return String.format("%d records were generated in %d ms across %d batches. The record types were %s",
-            totalRecords, durationMs(), totalBatches, recordTypeCounts);
+        return String.format(
+            "%d records were generated in %d ms across %d batches. The average time spent waiting on a " +
+            "batch was %.2f ms. The record types were %s",
+            totalRecords, durationMs(), totalBatches, avgBatchDurationMs(), recordTypeCounts);
    }
}
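
As a worked example of the new average (hypothetical numbers, mirroring the arithmetic in avgBatchDurationMs() above; the fragment assumes java.util.concurrent.TimeUnit is imported):

// Three batch commits spend a combined 1.5 s waiting on the metadata layer.
long totalBatchDurationsNs = 1_500_000_000L;
int totalBatches = 3;
double avgMs = 1.0 * TimeUnit.NANOSECONDS.toMillis(totalBatchDurationsNs) / totalBatches;
// avgMs == 500.0, which toString() renders as "... waiting on a batch was 500.00 ms ...".
// With zero batches, avgBatchDurationMs() returns -1 rather than dividing by zero.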
BufferingBatchConsumerTest.java (new file)
@@ -0,0 +1,120 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.kafka.metadata.migration;

import org.junit.jupiter.api.Test;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

import static org.junit.jupiter.api.Assertions.assertEquals;

public class BufferingBatchConsumerTest {

    @Test
    public void testEmptyBatches() {
        List<List<Integer>> batches = new ArrayList<>();
        BufferingBatchConsumer<Integer> consumer = new BufferingBatchConsumer<>(batches::add, 4);
        consumer.accept(Collections.emptyList());
        consumer.accept(Collections.emptyList());
        assertEquals(0, batches.size());
        consumer.flush();
        assertEquals(0, batches.size());
    }

    @Test
    public void testOneBatchSameAsMinSize() {
        List<List<Integer>> batches = new ArrayList<>();
        BufferingBatchConsumer<Integer> consumer = new BufferingBatchConsumer<>(batches::add, 4);
        consumer.accept(Arrays.asList(1, 2, 3, 4));
        assertEquals(1, batches.size());
        assertEquals(Arrays.asList(1, 2, 3, 4), batches.get(0));
        consumer.flush();
        assertEquals(1, batches.size());
        assertEquals(Arrays.asList(1, 2, 3, 4), batches.get(0));
    }

    @Test
    public void testOneBatchSmallerThanMinSize() {
        List<List<Integer>> batches = new ArrayList<>();
        BufferingBatchConsumer<Integer> consumer = new BufferingBatchConsumer<>(batches::add, 4);
        consumer.accept(Arrays.asList(1, 2, 3));
        assertEquals(0, batches.size());
        consumer.flush();
        assertEquals(1, batches.size());
        assertEquals(Arrays.asList(1, 2, 3), batches.get(0));
    }

    @Test
    public void testOneBatchLargerThanMinSize() {
        List<List<Integer>> batches = new ArrayList<>();
        BufferingBatchConsumer<Integer> consumer = new BufferingBatchConsumer<>(batches::add, 4);
        consumer.accept(Arrays.asList(1, 2, 3, 4, 5));
        assertEquals(1, batches.size());
        assertEquals(Arrays.asList(1, 2, 3, 4, 5), batches.get(0));
        consumer.flush();
        assertEquals(1, batches.size());
        assertEquals(Arrays.asList(1, 2, 3, 4, 5), batches.get(0));
    }

    @Test
    public void testMultiBatchSameAsMinSize() {
        List<List<Integer>> batches = new ArrayList<>();
        BufferingBatchConsumer<Integer> consumer = new BufferingBatchConsumer<>(batches::add, 6);
        consumer.accept(Arrays.asList(1, 2));
        consumer.accept(Arrays.asList(3, 4));
        consumer.accept(Arrays.asList(5, 6));
        assertEquals(1, batches.size());
        assertEquals(Arrays.asList(1, 2, 3, 4, 5, 6), batches.get(0));
        consumer.flush();
        assertEquals(1, batches.size());
        assertEquals(Arrays.asList(1, 2, 3, 4, 5, 6), batches.get(0));
    }

    @Test
    public void testMultiBatchSmallerThanMinSize() {
        List<List<Integer>> batches = new ArrayList<>();
        BufferingBatchConsumer<Integer> consumer = new BufferingBatchConsumer<>(batches::add, 6);
        consumer.accept(Arrays.asList(1, 2));
        consumer.accept(Arrays.asList(3, 4));
        consumer.accept(Collections.singletonList(5));
        assertEquals(0, batches.size());
        consumer.flush();
        assertEquals(1, batches.size());
        assertEquals(Arrays.asList(1, 2, 3, 4, 5), batches.get(0));
    }

    @Test
    public void testMultiBatchLargerThanMinSize() {
        List<List<Integer>> batches = new ArrayList<>();
        BufferingBatchConsumer<Integer> consumer = new BufferingBatchConsumer<>(batches::add, 6);
        consumer.accept(Arrays.asList(1, 2));
        consumer.accept(Arrays.asList(3, 4));
        consumer.accept(Arrays.asList(5, 6));
        consumer.accept(Arrays.asList(7, 8));
        consumer.accept(Arrays.asList(9, 10));
        assertEquals(1, batches.size());
        assertEquals(Arrays.asList(1, 2, 3, 4, 5, 6), batches.get(0));
        consumer.flush();
        assertEquals(2, batches.size());
        assertEquals(Arrays.asList(1, 2, 3, 4, 5, 6), batches.get(0));
        assertEquals(Arrays.asList(7, 8, 9, 10), batches.get(1));
    }
}
