diff --git a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java index 52d3f27f3cf7c..965f8074f8014 100644 --- a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java +++ b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java @@ -21,10 +21,10 @@ import org.apache.kafka.common.compress.Compression; import org.apache.kafka.common.errors.CoordinatorLoadInProgressException; import org.apache.kafka.common.errors.NotCoordinatorException; -import org.apache.kafka.common.errors.RecordTooLargeException; import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.record.AbstractRecords; +import org.apache.kafka.common.record.CompressionType; import org.apache.kafka.common.record.ControlRecordType; import org.apache.kafka.common.record.EndTransactionMarker; import org.apache.kafka.common.record.MemoryRecords; @@ -829,11 +829,11 @@ private void flushCurrentBatch() { } /** - * Flushes the current batch if it is transactional or if it has passed the append linger time. + * Flushes the current batch if it is transactional, if it has passed the append linger time, or if it is full. */ private void maybeFlushCurrentBatch(long currentTimeMs) { if (currentBatch != null) { - if (currentBatch.builder.isTransactional() || (currentBatch.appendTimeMs - currentTimeMs) >= appendLingerMs) { + if (currentBatch.builder.isTransactional() || (currentBatch.appendTimeMs - currentTimeMs) >= appendLingerMs || !currentBatch.builder.hasRoomFor(0)) { flushCurrentBatch(); } } @@ -911,6 +911,24 @@ public void run() { } } + /** + * Completes the given event once all pending writes are completed. + * + * @param event The event to complete once all pending + * writes are completed. + */ + private void waitForPendingWrites(DeferredEvent event) { + if (currentBatch != null && currentBatch.builder.numRecords() > 0) { + currentBatch.deferredEvents.add(event); + } else { + if (coordinator.lastCommittedOffset() < coordinator.lastWrittenOffset()) { + deferredEventQueue.add(coordinator.lastWrittenOffset(), DeferredEventCollection.of(log, event)); + } else { + event.complete(null); + } + } + } + /** * Appends records to the log and replay them to the state machine. * @@ -940,17 +958,8 @@ private void append( if (records.isEmpty()) { // If the records are empty, it was a read operation after all. In this case, - // the response can be returned directly iff there are no pending write operations; - // otherwise, the read needs to wait on the last write operation to be completed. - if (currentBatch != null && currentBatch.builder.numRecords() > 0) { - currentBatch.deferredEvents.add(event); - } else { - if (coordinator.lastCommittedOffset() < coordinator.lastWrittenOffset()) { - deferredEventQueue.add(coordinator.lastWrittenOffset(), DeferredEventCollection.of(log, event)); - } else { - event.complete(null); - } - } + // the response can be returned once any pending write operations complete. + waitForPendingWrites(event); } else { // If the records are not empty, first, they are applied to the state machine, // second, they are appended to the opened batch. @@ -984,27 +993,18 @@ private void append( } if (isAtomic) { - // Compute the estimated size of the records. 
- int estimatedSize = AbstractRecords.estimateSizeInBytes( + // Compute an upper bound on the size of the records, assuming no compression. + int estimatedSizeUpperBound = AbstractRecords.estimateSizeInBytes( currentBatch.builder.magic(), - compression.type(), + CompressionType.NONE, recordsToAppend ); - // Check if the current batch has enough space. We check this before - // replaying the records in order to avoid having to revert back - // changes if the records do not fit within a batch. - if (estimatedSize > currentBatch.builder.maxAllowedBytes()) { - throw new RecordTooLargeException("Message batch size is " + estimatedSize + - " bytes in append to partition " + tp + " which exceeds the maximum " + - "configured size of " + currentBatch.maxBatchSize + "."); - } - - if (!currentBatch.builder.hasRoomFor(estimatedSize)) { - // Otherwise, we write the current batch, allocate a new one and re-verify - // whether the records fit in it. - // If flushing fails, we don't catch the exception in order to let - // the caller fail the current operation. + if (!currentBatch.builder.hasRoomFor(estimatedSizeUpperBound)) { + // Start a new batch when the total uncompressed data size would exceed + // the max batch size. We still allow atomic writes with an uncompressed size + // larger than the max batch size as long as they compress down to under the max + // batch size. These large writes go into a batch by themselves. flushCurrentBatch(); maybeAllocateNewBatch( producerId, @@ -1075,8 +1075,8 @@ private void append( // Add the event to the list of pending events associated with the batch. currentBatch.deferredEvents.add(event); - // Write the current batch if it is transactional or if the linger timeout - // has expired. + // Write the current batch if it is transactional, if the linger timeout + // has expired, or if it is full. // If flushing fails, we don't catch the exception in order to let // the caller fail the current operation.
maybeFlushCurrentBatch(currentTimeMs); diff --git a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java index 4a040df6712c0..dfbbdf048bc20 100644 --- a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java +++ b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java @@ -18,6 +18,7 @@ import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.compress.Compression; import org.apache.kafka.common.config.TopicConfig; import org.apache.kafka.common.errors.NotCoordinatorException; import org.apache.kafka.common.errors.NotEnoughReplicasException; @@ -51,6 +52,7 @@ import java.util.List; import java.util.Map; import java.util.OptionalInt; +import java.util.Random; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; @@ -4292,6 +4294,268 @@ public void testRecordFlushTime() throws Exception { assertEquals("response3", write3.get(5, TimeUnit.SECONDS)); } + @Test + public void testCompressibleRecordTriggersFlushAndSucceeds() throws Exception { + MockTimer timer = new MockTimer(); + MockPartitionWriter writer = new MockPartitionWriter(); + Compression compression = Compression.gzip().build(); + + CoordinatorRuntime<MockCoordinatorShard, String> runtime = + new CoordinatorRuntime.Builder<MockCoordinatorShard, String>() + .withTime(timer.time()) + .withTimer(timer) + .withDefaultWriteTimeOut(Duration.ofMillis(20)) + .withLoader(new MockCoordinatorLoader()) + .withEventProcessor(new DirectEventProcessor()) + .withPartitionWriter(writer) + .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier()) + .withCoordinatorRuntimeMetrics(mock(CoordinatorRuntimeMetrics.class)) + .withCoordinatorMetrics(mock(CoordinatorMetrics.class)) + .withCompression(compression) + .withSerializer(new StringSerializer()) + .withAppendLingerMs(10) + .withExecutorService(mock(ExecutorService.class)) + .build(); + + // Schedule the loading. + runtime.scheduleLoadOperation(TP, 10); + + // Verify the initial state. + CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP); + assertNull(ctx.currentBatch); + + // Get the max batch size. + int maxBatchSize = writer.config(TP).maxMessageSize(); + + // Create 2 records with a quarter of the max batch size each. + List<String> records = Stream.of('1', '2').map(c -> { + char[] payload = new char[maxBatchSize / 4]; + Arrays.fill(payload, c); + return new String(payload); + }).collect(Collectors.toList()); + + // Write #1 with the small records; the batch will be about half full. + long firstBatchTimestamp = timer.time().milliseconds(); + CompletableFuture<String> write1 = runtime.scheduleWriteOperation("write#1", TP, Duration.ofMillis(50), + state -> new CoordinatorResult<>(records, "response1") + ); + + // A batch has been created. + assertNotNull(ctx.currentBatch); + + // Verify the state. The batch is not flushed yet. + assertEquals(List.of(), writer.entries(TP)); + + // Create a record of highly compressible data. + List<String> largeRecord = List.of("a".repeat((int) (0.75 * maxBatchSize))); + + // Write #2 with the large record. This record is too large to go into the previous batch + // uncompressed but fits in a new buffer, so we should flush the previous batch and allocate + // a new one.
+ CompletableFuture<String> write2 = runtime.scheduleWriteOperation("write#2", TP, Duration.ofMillis(50), + state -> new CoordinatorResult<>(largeRecord, "response2") + ); + + // Verify the state. The first batch has been flushed but the second is still pending. + assertEquals(2L, ctx.coordinator.lastWrittenOffset()); + assertEquals(0L, ctx.coordinator.lastCommittedOffset()); + assertEquals(List.of( + new MockCoordinatorShard.RecordAndMetadata(0, records.get(0)), + new MockCoordinatorShard.RecordAndMetadata(1, records.get(1)), + new MockCoordinatorShard.RecordAndMetadata(2, largeRecord.get(0)) + ), ctx.coordinator.coordinator().fullRecords()); + assertEquals(List.of( + records(firstBatchTimestamp, compression, records) + ), writer.entries(TP)); + + // Advance past the linger time. + timer.advanceClock(11); + + // Commit and verify that both writes are completed. + writer.commit(TP); + assertTrue(write1.isDone()); + assertTrue(write2.isDone()); + assertEquals(3L, ctx.coordinator.lastCommittedOffset()); + assertEquals("response1", write1.get(5, TimeUnit.SECONDS)); + assertEquals("response2", write2.get(5, TimeUnit.SECONDS)); + } + + @Test + public void testLargeCompressibleRecordTriggersFlushAndSucceeds() throws Exception { + MockTimer timer = new MockTimer(); + MockPartitionWriter writer = new MockPartitionWriter(); + Compression compression = Compression.gzip().build(); + + CoordinatorRuntime<MockCoordinatorShard, String> runtime = + new CoordinatorRuntime.Builder<MockCoordinatorShard, String>() + .withTime(timer.time()) + .withTimer(timer) + .withDefaultWriteTimeOut(Duration.ofMillis(20)) + .withLoader(new MockCoordinatorLoader()) + .withEventProcessor(new DirectEventProcessor()) + .withPartitionWriter(writer) + .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier()) + .withCoordinatorRuntimeMetrics(mock(CoordinatorRuntimeMetrics.class)) + .withCoordinatorMetrics(mock(CoordinatorMetrics.class)) + .withCompression(compression) + .withSerializer(new StringSerializer()) + .withAppendLingerMs(10) + .withExecutorService(mock(ExecutorService.class)) + .build(); + + // Schedule the loading. + runtime.scheduleLoadOperation(TP, 10); + + // Verify the initial state. + CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP); + assertNull(ctx.currentBatch); + + // Get the max batch size. + int maxBatchSize = writer.config(TP).maxMessageSize(); + + // Create 2 records with a quarter of the max batch size each. + List<String> records = Stream.of('1', '2').map(c -> { + char[] payload = new char[maxBatchSize / 4]; + Arrays.fill(payload, c); + return new String(payload); + }).collect(Collectors.toList()); + + // Write #1 with the small records; the batch will be about half full. + long firstBatchTimestamp = timer.time().milliseconds(); + CompletableFuture<String> write1 = runtime.scheduleWriteOperation("write#1", TP, Duration.ofMillis(50), + state -> new CoordinatorResult<>(records, "response1") + ); + + // A batch has been created. + assertNotNull(ctx.currentBatch); + + // Verify the state. The batch is not flushed yet. + assertEquals(List.of(), writer.entries(TP)); + + // Create a large record of highly compressible data. + List<String> largeRecord = List.of("a".repeat(3 * maxBatchSize)); + + // Write #2 with the large record. This record is too large to go into the previous batch + // uncompressed but will fit in the new buffer once compressed, so we should flush the + // previous batch and successfully allocate a new batch for this record. The new batch + // will also trigger an immediate flush.
+ long secondBatchTimestamp = timer.time().milliseconds(); + CompletableFuture<String> write2 = runtime.scheduleWriteOperation("write#2", TP, Duration.ofMillis(50), + state -> new CoordinatorResult<>(largeRecord, "response2") + ); + + // Verify the state. + assertEquals(3L, ctx.coordinator.lastWrittenOffset()); + assertEquals(0L, ctx.coordinator.lastCommittedOffset()); + assertEquals(List.of( + new MockCoordinatorShard.RecordAndMetadata(0, records.get(0)), + new MockCoordinatorShard.RecordAndMetadata(1, records.get(1)), + new MockCoordinatorShard.RecordAndMetadata(2, largeRecord.get(0)) + ), ctx.coordinator.coordinator().fullRecords()); + assertEquals(List.of( + records(firstBatchTimestamp, compression, records), + records(secondBatchTimestamp, compression, largeRecord) + ), writer.entries(TP)); + + // Commit and verify that writes are completed. + writer.commit(TP); + assertTrue(write1.isDone()); + assertTrue(write2.isDone()); + assertEquals(3L, ctx.coordinator.lastCommittedOffset()); + assertEquals("response1", write1.get(5, TimeUnit.SECONDS)); + assertEquals("response2", write2.get(5, TimeUnit.SECONDS)); + } + + @Test + public void testLargeUncompressibleRecordTriggersFlushAndFails() throws Exception { + MockTimer timer = new MockTimer(); + MockPartitionWriter writer = new MockPartitionWriter(); + Compression compression = Compression.gzip().build(); + + CoordinatorRuntime<MockCoordinatorShard, String> runtime = + new CoordinatorRuntime.Builder<MockCoordinatorShard, String>() + .withTime(timer.time()) + .withTimer(timer) + .withDefaultWriteTimeOut(Duration.ofMillis(20)) + .withLoader(new MockCoordinatorLoader()) + .withEventProcessor(new DirectEventProcessor()) + .withPartitionWriter(writer) + .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier()) + .withCoordinatorRuntimeMetrics(mock(CoordinatorRuntimeMetrics.class)) + .withCoordinatorMetrics(mock(CoordinatorMetrics.class)) + .withCompression(compression) + .withSerializer(new StringSerializer()) + .withAppendLingerMs(10) + .withExecutorService(mock(ExecutorService.class)) + .build(); + + // Schedule the loading. + runtime.scheduleLoadOperation(TP, 10); + + // Verify the initial state. + CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP); + assertNull(ctx.currentBatch); + + // Get the max batch size. + int maxBatchSize = writer.config(TP).maxMessageSize(); + + // Create 2 records with a quarter of the max batch size each. + List<String> records = Stream.of('1', '2').map(c -> { + char[] payload = new char[maxBatchSize / 4]; + Arrays.fill(payload, c); + return new String(payload); + }).collect(Collectors.toList()); + + // Write #1 with the small records; the batch will be about half full. + long firstBatchTimestamp = timer.time().milliseconds(); + CompletableFuture<String> write1 = runtime.scheduleWriteOperation("write#1", TP, Duration.ofMillis(50), + state -> new CoordinatorResult<>(records, "response1") + ); + + // A batch has been created. + assertNotNull(ctx.currentBatch); + + // Verify the state. The batch is not flushed yet. + assertEquals(List.of(), writer.entries(TP)); + + // Create a large record of not very compressible data. + char[] payload = new char[3 * maxBatchSize]; + Random offset = new Random(); + for (int i = 0; i < payload.length; i++) { + payload[i] = (char) ('a' + ((char) offset.nextInt() % 26)); + } + List<String> largeRecord = List.of(new String(payload)); + + // Write #2 with the large record. This record is too large to go into the previous batch + // and is not compressible, so the previous batch should be flushed.
It is also too large to fit in a new batch, + // so the write should fail with a RecordTooLargeException. + CompletableFuture<String> write2 = runtime.scheduleWriteOperation("write#2", TP, Duration.ofMillis(50), + state -> new CoordinatorResult<>(largeRecord, "response2") + ); + + // Check that write2 fails with a RecordTooLargeException. + assertFutureThrows(RecordTooLargeException.class, write2); + + // Verify the state. The first batch was flushed and the largeRecord + // write failed. + assertEquals(2L, ctx.coordinator.lastWrittenOffset()); + assertEquals(0L, ctx.coordinator.lastCommittedOffset()); + assertEquals(List.of( + new MockCoordinatorShard.RecordAndMetadata(0, records.get(0)), + new MockCoordinatorShard.RecordAndMetadata(1, records.get(1)) + ), ctx.coordinator.coordinator().fullRecords()); + assertEquals(List.of( + records(firstBatchTimestamp, compression, records) + ), writer.entries(TP)); + + // Commit and verify that writes are completed. + writer.commit(TP); + assertTrue(write1.isDone()); + assertTrue(write2.isDone()); + assertEquals(2L, ctx.coordinator.lastCommittedOffset()); + assertEquals("response1", write1.get(5, TimeUnit.SECONDS)); + } + @Test public void testRecordEventPurgatoryTime() throws Exception { Duration writeTimeout = Duration.ofMillis(1000); diff --git a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/TestUtil.java b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/TestUtil.java index c3eda174671f6..8e39f9db8f88e 100644 --- a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/TestUtil.java +++ b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/TestUtil.java @@ -44,14 +44,31 @@ public class TestUtil { public static MemoryRecords records( long timestamp, + Compression compression, String... records ) { - return records(timestamp, Arrays.stream(records).toList()); + return records(timestamp, compression, Arrays.stream(records).toList()); + } + + + public static MemoryRecords records( + long timestamp, + String... records + ) { + return records(timestamp, Compression.NONE, Arrays.stream(records).toList()); } public static MemoryRecords records( long timestamp, List<String> records + ) { + return records(timestamp, Compression.NONE, records); + } + + public static MemoryRecords records( + long timestamp, + Compression compression, + List<String> records ) { if (records.isEmpty()) return MemoryRecords.EMPTY; @@ -62,7 +79,7 @@ public static MemoryRecords records( int sizeEstimate = AbstractRecords.estimateSizeInBytes( RecordVersion.current().value, - CompressionType.NONE, + compression.type(), simpleRecords ); @@ -71,7 +88,7 @@ public static MemoryRecords records( MemoryRecordsBuilder builder = MemoryRecords.builder( buffer, RecordVersion.current().value, - Compression.NONE, + compression, TimestampType.CREATE_TIME, 0L, timestamp,