From c3b551569f327bae238b609ea9936755cb210218 Mon Sep 17 00:00:00 2001 From: Jason Gustafson Date: Mon, 19 Jun 2017 15:09:48 -0700 Subject: [PATCH 1/3] MINOR: MemoryRecordsBuilder.sizeInBytes should consider initial position --- .../producer/internals/ProducerBatch.java | 8 ++++++- .../producer/internals/RecordAccumulator.java | 2 +- .../clients/producer/internals/Sender.java | 4 ++-- .../common/record/MemoryRecordsBuilder.java | 12 +++++++--- .../internals/RecordAccumulatorTest.java | 2 +- .../record/MemoryRecordsBuilderTest.java | 24 +++++++++++++++++++ 6 files changed, 44 insertions(+), 8 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java index fcdda8d28825f..d7297990e39e7 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java @@ -362,8 +362,14 @@ public MemoryRecords records() { return recordsBuilder.build(); } + public int estimatedSizeInBytes() { + return recordsBuilder.estimatedSizeInBytes(); + } + public int sizeInBytes() { - return recordsBuilder.sizeInBytes(); + if (!recordsBuilder.isClosed()) + throw new IllegalArgumentException("Cannot get an exact size of the batch until it has been closed"); + return records().sizeInBytes(); } public double compressionRatio() { diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java index ea6624e7588ca..73b8997c4fa74 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java @@ -449,7 +449,7 @@ public Map> drain(Cluster cluster, boolean backoff = first.attempts() > 0 && first.waitedTimeMs(now) < retryBackoffMs; // Only drain the batch if it is not during backoff period. if (!backoff) { - if (size + first.sizeInBytes() > maxSize && !ready.isEmpty()) { + if (size + first.estimatedSizeInBytes() > maxSize && !ready.isEmpty()) { // there is a rare case that a single batch size is larger than the request size due // to compression; in this case we will still eventually send this batch in a single // request diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java index 27eb2447d9b2c..062fde98d62c0 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java @@ -817,7 +817,7 @@ public void updateProduceRequestMetrics(Map> batche // per-topic bytes send rate String topicByteRateName = "topic." + topic + ".bytes"; Sensor topicByteRate = Utils.notNull(this.metrics.getSensor(topicByteRateName)); - topicByteRate.record(batch.sizeInBytes()); + topicByteRate.record(batch.estimatedSizeInBytes()); // per-topic compression rate String topicCompressionRateName = "topic." + topic + ".compression-rate"; @@ -825,7 +825,7 @@ public void updateProduceRequestMetrics(Map> batche topicCompressionRate.record(batch.compressionRatio()); // global metrics - this.batchSizeSensor.record(batch.sizeInBytes(), now); + this.batchSizeSensor.record(batch.estimatedSizeInBytes(), now); this.queueTimeSensor.record(batch.queueTimeMs(), now); this.compressionRateSensor.record(batch.compressionRatio()); this.maxRecordSizeSensor.record(batch.maxRecordSize, now); diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java index 9676c83dd08cd..ff7696c4788fa 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java +++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java @@ -55,7 +55,9 @@ public class MemoryRecordsBuilder { private final int partitionLeaderEpoch; private final int writeLimit; - private volatile float estimatedCompressionRatio; + // Use a conservative estimate of the compression ratio. The producer overrides this using statistics + // from previous batches before appending any records. + private volatile float estimatedCompressionRatio = 1.0F; private boolean appendStreamIsClosed = false; private boolean isTransactional; @@ -662,7 +664,7 @@ private void ensureOpenForRecordBatchWrite() { */ private int estimatedBytesWritten() { if (compressionType == CompressionType.NONE) { - return buffer().position(); + return buffer().position() - initialPosition; } else { // estimate the written bytes to the underlying byte buffer based on uncompressed written bytes return (int) (writtenUncompressed * estimatedCompressionRatio * COMPRESSION_RATE_ESTIMATION_FACTOR); @@ -723,7 +725,11 @@ public boolean isFull() { return appendStreamIsClosed || (this.numRecords > 0 && this.writeLimit <= estimatedBytesWritten()); } - public int sizeInBytes() { + /** + * Get an estimate of the number of bytes written to the underlying buffer. The returned value + * is exactly correct if the record set is not compressed or if the builder has been closed. + */ + public int estimatedSizeInBytes() { return builtRecords != null ? builtRecords.sizeInBytes() : estimatedBytesWritten(); } diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java index 5bfbdf0aa7f74..99e7557827148 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java @@ -777,7 +777,7 @@ private BatchDrainedResult completeOrSplitBatches(RecordAccumulator accum, int b for (ProducerBatch batch : batchList) { batchDrained = true; numBatches++; - if (batch.sizeInBytes() > batchSize + DefaultRecordBatch.RECORD_BATCH_OVERHEAD) { + if (batch.estimatedSizeInBytes() > batchSize + DefaultRecordBatch.RECORD_BATCH_OVERHEAD) { accum.splitAndReenqueue(batch); // release the resource of the original big batch. numSplit++; diff --git a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java index a1af545cfa1f3..cc2bf797226f9 100644 --- a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java +++ b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java @@ -230,6 +230,30 @@ public void testCompressionRateV0() { } } + @Test + public void testEstimatedSizeInBytes() { + ByteBuffer buffer = ByteBuffer.allocate(1024); + buffer.position(bufferOffset); + + MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, compressionType, + TimestampType.CREATE_TIME, 0L, 0L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, + false, false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity()); + + int previousEstimate = 0; + for (int i = 0; i < 10; i++) { + builder.append(new SimpleRecord(i, ("" + i).getBytes())); + int currentEstimate = builder.estimatedSizeInBytes(); + assertTrue(currentEstimate > previousEstimate); + previousEstimate = currentEstimate; + } + + int bytesWrittenBeforeClose = builder.estimatedSizeInBytes(); + MemoryRecords records = builder.build(); + assertEquals(records.sizeInBytes(), builder.estimatedSizeInBytes()); + if (compressionType == CompressionType.NONE) + assertEquals(records.sizeInBytes(), bytesWrittenBeforeClose); + } + @Test public void testCompressionRateV1() { ByteBuffer buffer = ByteBuffer.allocate(1024); From b1ce031e2c43d308d922c4023335afac5d193b65 Mon Sep 17 00:00:00 2001 From: Jason Gustafson Date: Tue, 20 Jun 2017 09:59:36 -0700 Subject: [PATCH 2/3] A few cleanups --- .../producer/internals/ProducerBatch.java | 6 ------ .../producer/internals/RecordAccumulator.java | 2 +- .../common/record/MemoryRecordsBuilder.java | 17 ++++++++++------- 3 files changed, 11 insertions(+), 14 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java index d7297990e39e7..53563baa8cf10 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java @@ -366,12 +366,6 @@ public int estimatedSizeInBytes() { return recordsBuilder.estimatedSizeInBytes(); } - public int sizeInBytes() { - if (!recordsBuilder.isClosed()) - throw new IllegalArgumentException("Cannot get an exact size of the batch until it has been closed"); - return records().sizeInBytes(); - } - public double compressionRatio() { return recordsBuilder.compressionRatio(); } diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java index 73b8997c4fa74..9b9aa028a8f57 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java @@ -483,7 +483,7 @@ public Map> drain(Cluster cluster, batch.setProducerState(producerIdAndEpoch, sequenceNumber, isTransactional); } batch.close(); - size += batch.sizeInBytes(); + size += batch.records().sizeInBytes(); ready.add(batch); batch.drained(now); } diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java index ff7696c4788fa..5fe1e7d1d1fc3 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java +++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java @@ -57,14 +57,15 @@ public class MemoryRecordsBuilder { // Use a conservative estimate of the compression ratio. The producer overrides this using statistics // from previous batches before appending any records. - private volatile float estimatedCompressionRatio = 1.0F; + private float estimatedCompressionRatio = 1.0F; private boolean appendStreamIsClosed = false; private boolean isTransactional; private long producerId; private short producerEpoch; private int baseSequence; - private long writtenUncompressed = 0; + private int writtenUncompressed = 0; + private int batchHeaderSize; private int numRecords = 0; private float actualCompressionRatio = 1; private long maxTimestamp = RecordBatch.NO_TIMESTAMP; @@ -113,17 +114,19 @@ public MemoryRecordsBuilder(ByteBufferOutputStream bufferStream, this.isControlBatch = isControlBatch; this.partitionLeaderEpoch = partitionLeaderEpoch; this.writeLimit = writeLimit; - this.initialPosition = bufferStream.position(); if (magic > RecordBatch.MAGIC_VALUE_V1) { - bufferStream.position(initialPosition + DefaultRecordBatch.RECORDS_OFFSET); + batchHeaderSize = DefaultRecordBatch.RECORDS_OFFSET; } else if (compressionType != CompressionType.NONE) { // for compressed records, leave space for the header and the shallow message metadata // and move the starting position to the value payload offset - bufferStream.position(initialPosition + Records.LOG_OVERHEAD + LegacyRecord.recordOverhead(magic)); + batchHeaderSize = Records.LOG_OVERHEAD + LegacyRecord.recordOverhead(magic); + } else { + batchHeaderSize = 0; } + bufferStream.position(initialPosition + batchHeaderSize); this.bufferStream = bufferStream; this.appendStream = new DataOutputStream(compressionType.wrapForOutput(this.bufferStream, magic)); } @@ -664,10 +667,10 @@ private void ensureOpenForRecordBatchWrite() { */ private int estimatedBytesWritten() { if (compressionType == CompressionType.NONE) { - return buffer().position() - initialPosition; + return batchHeaderSize + writtenUncompressed; } else { // estimate the written bytes to the underlying byte buffer based on uncompressed written bytes - return (int) (writtenUncompressed * estimatedCompressionRatio * COMPRESSION_RATE_ESTIMATION_FACTOR); + return batchHeaderSize + (int) (writtenUncompressed * estimatedCompressionRatio * COMPRESSION_RATE_ESTIMATION_FACTOR); } } From 883c5934cfd8f94882c578b521b63ba45e0eaddd Mon Sep 17 00:00:00 2001 From: Jason Gustafson Date: Tue, 20 Jun 2017 13:00:00 -0700 Subject: [PATCH 3/3] Add comment on `writtenUncompressed` --- .../org/apache/kafka/common/record/MemoryRecordsBuilder.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java index 5fe1e7d1d1fc3..68b0bf045aa68 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java +++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java @@ -64,7 +64,7 @@ public class MemoryRecordsBuilder { private long producerId; private short producerEpoch; private int baseSequence; - private int writtenUncompressed = 0; + private int writtenUncompressed = 0; // Number of bytes (excluding the header) written before compression private int batchHeaderSize; private int numRecords = 0; private float actualCompressionRatio = 1;