Merged
@@ -21,10 +21,10 @@
 import org.apache.kafka.common.compress.Compression;
 import org.apache.kafka.common.errors.CoordinatorLoadInProgressException;
 import org.apache.kafka.common.errors.NotCoordinatorException;
-import org.apache.kafka.common.errors.RecordTooLargeException;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.record.AbstractRecords;
+import org.apache.kafka.common.record.CompressionType;
 import org.apache.kafka.common.record.ControlRecordType;
 import org.apache.kafka.common.record.EndTransactionMarker;
 import org.apache.kafka.common.record.MemoryRecords;
@@ -829,11 +829,11 @@ private void flushCurrentBatch() {
     }

     /**
-     * Flushes the current batch if it is transactional or if it has passed the append linger time.
+     * Flushes the current batch if it is transactional, if it has passed the append linger time, or if it is full.
      */
     private void maybeFlushCurrentBatch(long currentTimeMs) {
         if (currentBatch != null) {
-            if (currentBatch.builder.isTransactional() || (currentBatch.appendTimeMs - currentTimeMs) >= appendLingerMs) {
+            if (currentBatch.builder.isTransactional() || (currentBatch.appendTimeMs - currentTimeMs) >= appendLingerMs || !currentBatch.builder.hasRoomFor(0)) {
Member:
@majialoong and I were discussing the condition (currentBatch.appendTimeMs - currentTimeMs) >= appendLingerMs. The correct version seems to be (currentTimeMs - currentBatch.appendTimeMs) >= appendLingerMs. Or we can remove it, since a lingerTimeoutTask already exists

WDYT?
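For context, a minimal, self-contained illustration of the point above; the timestamps are hypothetical and not taken from the PR:

// Hypothetical values: a batch opened at t=100 and checked at t=115 with a
// 10 ms linger should be flushed.
public class LingerCheckDemo {
    public static void main(String[] args) {
        long appendTimeMs = 100L;   // when the current batch was opened
        long currentTimeMs = 115L;  // "now", as passed to maybeFlushCurrentBatch
        long appendLingerMs = 10L;  // configured linger time

        // As written in the PR: appendTimeMs - currentTimeMs is -15, which can
        // never reach a positive linger value, so this branch never fires.
        System.out.println((appendTimeMs - currentTimeMs) >= appendLingerMs); // false

        // As suggested: 15 ms have elapsed, which exceeds the 10 ms linger.
        System.out.println((currentTimeMs - appendTimeMs) >= appendLingerMs); // true
    }
}

Note that with appendLingerMs = 0 both forms evaluate to true when the check runs in the same millisecond the batch was opened, which may be why the inverted condition went unnoticed.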

                 flushCurrentBatch();
             }
         }
@@ -911,6 +911,24 @@ public void run() {
         }
     }

+    /**
+     * Completes the given event once all pending writes are completed.
+     *
+     * @param event The event to complete once all pending
+     *              writes are completed.
+     */
+    private void waitForPendingWrites(DeferredEvent event) {
+        if (currentBatch != null && currentBatch.builder.numRecords() > 0) {
+            currentBatch.deferredEvents.add(event);
+        } else {
+            if (coordinator.lastCommittedOffset() < coordinator.lastWrittenOffset()) {
+                deferredEventQueue.add(coordinator.lastWrittenOffset(), DeferredEventCollection.of(log, event));
+            } else {
+                event.complete(null);
+            }
+        }
+    }
+
     /**
      * Appends records to the log and replays them to the state machine.
      *
@@ -940,17 +958,8 @@ private void append(

         if (records.isEmpty()) {
             // If the records are empty, it was a read operation after all. In this case,
-            // the response can be returned directly iff there are no pending write operations;
-            // otherwise, the read needs to wait on the last write operation to be completed.
-            if (currentBatch != null && currentBatch.builder.numRecords() > 0) {
-                currentBatch.deferredEvents.add(event);
-            } else {
-                if (coordinator.lastCommittedOffset() < coordinator.lastWrittenOffset()) {
-                    deferredEventQueue.add(coordinator.lastWrittenOffset(), DeferredEventCollection.of(log, event));
-                } else {
-                    event.complete(null);
-                }
-            }
+            // the response can be returned once any pending write operations complete.
+            waitForPendingWrites(event);
         } else {
             // If the records are not empty, first, they are applied to the state machine,
             // second, they are appended to the opened batch.
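A rough sketch of the three cases the new waitForPendingWrites helper distinguishes, using simplified stand-in types rather than the actual CoordinatorRuntime internals:

import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

// Stand-in sketch, not Kafka's API: DeferredEvent is reduced to a callback and
// the offsets are plain fields instead of coordinator state.
public class PendingWritesSketch {
    interface DeferredEvent { void complete(Throwable t); }

    int recordsInCurrentBatch;   // stand-in for currentBatch.builder.numRecords()
    long lastWrittenOffset;      // last offset appended to the log
    long lastCommittedOffset;    // last offset known to be committed

    final List<DeferredEvent> batchDeferredEvents = new ArrayList<>();
    final NavigableMap<Long, DeferredEvent> deferredEventQueue = new TreeMap<>();

    void waitForPendingWrites(DeferredEvent event) {
        if (recordsInCurrentBatch > 0) {
            // Case 1: an open batch holds unflushed records; the event completes
            // only once that batch is written and committed.
            batchDeferredEvents.add(event);
        } else if (lastCommittedOffset < lastWrittenOffset) {
            // Case 2: everything is flushed but not yet committed; park the event
            // until the commit offset catches up to the last written offset.
            deferredEventQueue.put(lastWrittenOffset, event);
        } else {
            // Case 3: nothing is in flight; complete the read immediately.
            event.complete(null);
        }
    }
}

This is what gives reads their read-after-write semantics here: a read that follows a write on the same coordinator is not answered until that write is committed.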
@@ -984,27 +993,18 @@
             }

             if (isAtomic) {
-                // Compute the estimated size of the records.
-                int estimatedSize = AbstractRecords.estimateSizeInBytes(
+                // Compute the size of the records.
+                int estimatedSizeUpperBound = AbstractRecords.estimateSizeInBytes(
                     currentBatch.builder.magic(),
-                    compression.type(),
+                    CompressionType.NONE,
                     recordsToAppend
                 );

-                // Check if the current batch has enough space. We check this before
-                // replaying the records in order to avoid having to revert back
-                // changes if the records do not fit within a batch.
-                if (estimatedSize > currentBatch.builder.maxAllowedBytes()) {
-                    throw new RecordTooLargeException("Message batch size is " + estimatedSize +
-                        " bytes in append to partition " + tp + " which exceeds the maximum " +
-                        "configured size of " + currentBatch.maxBatchSize + ".");
-                }
-
-                if (!currentBatch.builder.hasRoomFor(estimatedSize)) {
-                    // Otherwise, we write the current batch, allocate a new one and re-verify
-                    // whether the records fit in it.
-                    // If flushing fails, we don't catch the exception in order to let
-                    // the caller fail the current operation.
+                if (!currentBatch.builder.hasRoomFor(estimatedSizeUpperBound)) {
+                    // Start a new batch when the total uncompressed data size would exceed
+                    // the max batch size. We still allow atomic writes with an uncompressed size
+                    // larger than the max batch size as long as they compress down to under the max
+                    // batch size. These large writes go into a batch by themselves.
                     flushCurrentBatch();
                     maybeAllocateNewBatch(
                         producerId,
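The design choice in this hunk, restated: estimating with CompressionType.NONE yields an upper bound on the batch size, since compression can only shrink the payload. That is why the up-front RecordTooLargeException is dropped; an atomic write whose uncompressed estimate does not fit is instead given a fresh batch of its own and only fails if its actual compressed output is still too large. A sketch with made-up sizes:

// Hypothetical sizes to illustrate the decision; in the PR the estimate comes
// from AbstractRecords.estimateSizeInBytes(..., CompressionType.NONE, ...).
public class AtomicBatchSizingDemo {
    public static void main(String[] args) {
        int roomLeftInCurrentBatch = 200_000;   // space left in the open batch
        int uncompressedEstimate = 1_500_000;   // upper bound for the atomic write

        if (uncompressedEstimate > roomLeftInCurrentBatch) {
            // Flush the open batch and retry in a new, empty batch dedicated to
            // this write. The write is not rejected up front: if it compresses
            // to under the max batch size it is still accepted, and only the
            // actual compressed output can turn out too large.
            System.out.println("flush current batch, then append to a fresh batch");
        } else {
            System.out.println("append to the current batch");
        }
    }
}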
@@ -1075,8 +1075,8 @@ private void append(
             // Add the event to the list of pending events associated with the batch.
             currentBatch.deferredEvents.add(event);

-            // Write the current batch if it is transactional or if the linger timeout
-            // has expired.
+            // Write the current batch if it is transactional, if the linger timeout
+            // has expired, or if it is full.
             // If flushing fails, we don't catch the exception in order to let
             // the caller fail the current operation.
             maybeFlushCurrentBatch(currentTimeMs);
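One reading of the new !currentBatch.builder.hasRoomFor(0) trigger in maybeFlushCurrentBatch: if the builder cannot accept even zero additional bytes, it is effectively full, so the batch is flushed immediately instead of waiting out the linger. A stand-in sketch (not Kafka's MemoryRecordsBuilder API):

// Stand-in sketch of a builder whose hasRoomFor(0) turns false once the write
// limit is reached; Kafka's real builder also accounts for headers and closing.
public class BuilderFullnessSketch {
    private final int writeLimitBytes;
    private int bytesWritten;

    BuilderFullnessSketch(int writeLimitBytes) {
        this.writeLimitBytes = writeLimitBytes;
    }

    boolean hasRoomFor(int additionalBytes) {
        return bytesWritten + additionalBytes < writeLimitBytes;
    }

    void append(int bytes) {
        bytesWritten += bytes;
    }

    public static void main(String[] args) {
        BuilderFullnessSketch builder = new BuilderFullnessSketch(100);
        builder.append(100);
        // The batch can no longer grow, so it is flushed now rather than lingering.
        System.out.println(builder.hasRoomFor(0)); // false -> flush immediately
    }
}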