Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,34 +17,37 @@

package org.apache.fluss.record;

import org.apache.fluss.memory.MemorySegment;
import org.apache.fluss.memory.AbstractPagedOutputView;

import static org.apache.fluss.utils.Preconditions.checkArgument;
import java.io.IOException;
import java.util.Arrays;

/** A writer for {@link ChangeTypeVector}. */
public class ChangeTypeVectorWriter {

private final MemorySegment segment;
private final int capacity;
private final int startPosition;
private byte[] buffer;
private int recordsCount = 0;

public ChangeTypeVectorWriter(MemorySegment segment, int startPosition) {
checkArgument(segment.size() >= startPosition, "The start position is out of bound.");
this.segment = segment;
this.capacity = segment.size() - startPosition;
this.startPosition = startPosition;
public ChangeTypeVectorWriter() {
this.buffer = new byte[64];
}

public void writeChangeType(ChangeType changeType) {
if (recordsCount > capacity) {
// TODO: support AbstractPagedOutputView to have extendable capacity
throw new IllegalStateException("The change type vector is full.");
if (recordsCount >= buffer.length) {
buffer = Arrays.copyOf(buffer, buffer.length * 2);
}
segment.put(startPosition + recordsCount, changeType.toByteValue());
buffer[recordsCount] = changeType.toByteValue();
recordsCount++;
}

/**
* Writes all buffered change-type bytes to {@code outputView}. The view handles page-boundary
* crossing transparently.
*/
public void writeTo(AbstractPagedOutputView outputView) throws IOException {
outputView.write(buffer, 0, recordsCount);
}

public int sizeInBytes() {
return recordsCount;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ private MemoryLogRecordsArrowBuilder(
"The size of first segment of pagedOutputView is too small, need at least "
+ headerSize
+ " bytes.");
this.changeTypeWriter = new ChangeTypeVectorWriter(firstSegment, headerSize);
this.changeTypeWriter = new ChangeTypeVectorWriter();
this.estimatedSizeInBytes = headerSize;
this.recordCount = 0;
this.statisticsCollector = statisticsCollector;
Expand Down Expand Up @@ -178,44 +178,32 @@ public MultiBytesView build() throws IOException {
recordCount = arrowWriter.getRecordsCount();
int changeTypeSize = changeTypeWriter.sizeInBytes();

// For V1+ with statistics, write everything sequentially to pagedOutputView:
// [header] [statistics] [changeTypes] [arrow data]
// This makes CRC computation zero-copy over contiguous memory segments.
if (magic >= LOG_MAGIC_VALUE_V1 && statisticsCollector != null && recordCount > 0) {
// Save changeType bytes before they get overwritten. The changeType data lives
// in firstSegment at offset headerSize, which is the same memory backing
// pagedOutputView — so writing statistics there would clobber it.
byte[] changeTypeBytes = new byte[changeTypeSize];
firstSegment.get(headerSize, changeTypeBytes, 0, changeTypeSize);

// Position pagedOutputView right after the header
pagedOutputView.setPosition(headerSize);
// Position pagedOutputView right after the header.
// setPosition() only works before any pages have been advanced, which is
// guaranteed here because we have not written to pagedOutputView yet.
pagedOutputView.setPosition(headerSize);

// Write statistics directly to pagedOutputView (no temp byte[])
// V1+: write statistics between header and change-type bytes.
if (magic >= LOG_MAGIC_VALUE_V1 && statisticsCollector != null && recordCount > 0) {
try {
statisticsBytesLength = statisticsCollector.writeStatistics(pagedOutputView);
} catch (Exception e) {
LOG.error("Failed to serialize statistics for record batch", e);
statisticsBytesLength = 0;
// Rewind to undo any partial writes from writeStatistics().
// This is safe because statistics data is typically small (a few hundred
// bytes) and the first page is usually 1MB+, so no page boundary has
// been crossed and setPosition() can rewind within the same page.
// Rewind to undo any partial write. Safe: statistics are small and
// never cross a page boundary on the first (typically 1 MB) page.
pagedOutputView.setPosition(headerSize);
}
}

// Write saved changeType bytes to pagedOutputView
pagedOutputView.write(changeTypeBytes);

// Write arrow data to pagedOutputView at current position.
// Use the no-position overload since pages may have advanced.
arrowWriter.serializeToOutputView(pagedOutputView);
} else {
// V0 path or no stats: layout is [header] [changeTypes] [arrow data]
// changeTypes are already in firstSegment at headerSize offset
arrowWriter.serializeToOutputView(pagedOutputView, headerSize + changeTypeSize);
// Write change-type bytes (growable buffer → transparent page crossing).
if (!appendOnly) {
changeTypeWriter.writeTo(pagedOutputView);
}

// Write Arrow data at the current pagedOutputView position.
arrowWriter.serializeToOutputView(pagedOutputView);

// Reset the statistics collector for reuse
if (statisticsCollector != null) {
statisticsCollector.reset();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -648,6 +648,39 @@ void testStatisticsWithDifferentChangeTypes() throws Exception {
readContext.close();
}

@Test
void testChangeTypeVectorOverflowsToMultiplePages() throws Exception {
    // V0 header = 48 bytes and the page size is 50, so only 2 change-type bytes fit
    // on the first page; appending 5 rows forces the vector onto a second page.
    int pageSizeInBytes = 50;
    int maxSizeInBytes = 1024;
    ArrowWriter arrowWriter =
            provider.getOrCreateWriter(
                    1L, DEFAULT_SCHEMA_ID, maxSizeInBytes, DATA1_ROW_TYPE, NO_COMPRESSION);

    MemoryLogRecordsArrowBuilder recordsBuilder =
            MemoryLogRecordsArrowBuilder.builder(
                    DEFAULT_SCHEMA_ID,
                    arrowWriter,
                    new ManagedPagedOutputView(new TestingMemorySegmentPool(pageSizeInBytes)),
                    false,
                    null);

    List<Object[]> expectedRows = new ArrayList<>();
    int rowIndex = 0;
    while (rowIndex < 5) {
        Object[] rowValues = DATA1.get(rowIndex % DATA1.size());
        recordsBuilder.append(ChangeType.INSERT, DataTestUtils.row(rowValues));
        expectedRows.add(rowValues);
        rowIndex++;
    }
    recordsBuilder.close();

    MemoryLogRecords records = MemoryLogRecords.pointToBytesView(recordsBuilder.build());

    // Verify all rows and change types round-trip correctly.
    assertLogRecordsEquals(
            DATA1_ROW_TYPE, records, expectedRows, ChangeType.INSERT, TEST_SCHEMA_GETTER);
}

private static List<ArrowCompressionInfo> compressionInfos() {
return Arrays.asList(
new ArrowCompressionInfo(ArrowCompressionType.LZ4_FRAME, -1),
Expand Down
Loading