diff --git a/fluss-common/src/main/java/org/apache/fluss/record/ChangeTypeVectorWriter.java b/fluss-common/src/main/java/org/apache/fluss/record/ChangeTypeVectorWriter.java index eab21e8083..244942a7fc 100644 --- a/fluss-common/src/main/java/org/apache/fluss/record/ChangeTypeVectorWriter.java +++ b/fluss-common/src/main/java/org/apache/fluss/record/ChangeTypeVectorWriter.java @@ -17,34 +17,37 @@ package org.apache.fluss.record; -import org.apache.fluss.memory.MemorySegment; +import org.apache.fluss.memory.AbstractPagedOutputView; -import static org.apache.fluss.utils.Preconditions.checkArgument; +import java.io.IOException; +import java.util.Arrays; /** A writer for {@link ChangeTypeVector}. */ public class ChangeTypeVectorWriter { - private final MemorySegment segment; - private final int capacity; - private final int startPosition; + private byte[] buffer; private int recordsCount = 0; - public ChangeTypeVectorWriter(MemorySegment segment, int startPosition) { - checkArgument(segment.size() >= startPosition, "The start position is out of bound."); - this.segment = segment; - this.capacity = segment.size() - startPosition; - this.startPosition = startPosition; + public ChangeTypeVectorWriter() { + this.buffer = new byte[64]; } public void writeChangeType(ChangeType changeType) { - if (recordsCount > capacity) { - // TODO: support AbstractPagedOutputView to have extendable capacity - throw new IllegalStateException("The change type vector is full."); + if (recordsCount >= buffer.length) { + buffer = Arrays.copyOf(buffer, buffer.length * 2); } - segment.put(startPosition + recordsCount, changeType.toByteValue()); + buffer[recordsCount] = changeType.toByteValue(); recordsCount++; } + /** + * Writes all buffered change-type bytes to {@code outputView}. The view handles page-boundary + * crossing transparently. + */ + public void writeTo(AbstractPagedOutputView outputView) throws IOException { + outputView.write(buffer, 0, recordsCount); + } + public int sizeInBytes() { return recordsCount; } diff --git a/fluss-common/src/main/java/org/apache/fluss/record/MemoryLogRecordsArrowBuilder.java b/fluss-common/src/main/java/org/apache/fluss/record/MemoryLogRecordsArrowBuilder.java index 3276771d0b..a540eb2fe2 100644 --- a/fluss-common/src/main/java/org/apache/fluss/record/MemoryLogRecordsArrowBuilder.java +++ b/fluss-common/src/main/java/org/apache/fluss/record/MemoryLogRecordsArrowBuilder.java @@ -107,7 +107,7 @@ private MemoryLogRecordsArrowBuilder( "The size of first segment of pagedOutputView is too small, need at least " + headerSize + " bytes."); - this.changeTypeWriter = new ChangeTypeVectorWriter(firstSegment, headerSize); + this.changeTypeWriter = new ChangeTypeVectorWriter(); this.estimatedSizeInBytes = headerSize; this.recordCount = 0; this.statisticsCollector = statisticsCollector; @@ -178,44 +178,32 @@ public MultiBytesView build() throws IOException { recordCount = arrowWriter.getRecordsCount(); int changeTypeSize = changeTypeWriter.sizeInBytes(); - // For V1+ with statistics, write everything sequentially to pagedOutputView: - // [header] [statistics] [changeTypes] [arrow data] - // This makes CRC computation zero-copy over contiguous memory segments. - if (magic >= LOG_MAGIC_VALUE_V1 && statisticsCollector != null && recordCount > 0) { - // Save changeType bytes before they get overwritten. The changeType data lives - // in firstSegment at offset headerSize, which is the same memory backing - // pagedOutputView — so writing statistics there would clobber it. - byte[] changeTypeBytes = new byte[changeTypeSize]; - firstSegment.get(headerSize, changeTypeBytes, 0, changeTypeSize); - - // Position pagedOutputView right after the header - pagedOutputView.setPosition(headerSize); + // Position pagedOutputView right after the header. + // setPosition() only works before any pages have been advanced, which is + // guaranteed here because we have not written to pagedOutputView yet. + pagedOutputView.setPosition(headerSize); - // Write statistics directly to pagedOutputView (no temp byte[]) + // V1+: write statistics between header and change-type bytes. + if (magic >= LOG_MAGIC_VALUE_V1 && statisticsCollector != null && recordCount > 0) { try { statisticsBytesLength = statisticsCollector.writeStatistics(pagedOutputView); } catch (Exception e) { LOG.error("Failed to serialize statistics for record batch", e); statisticsBytesLength = 0; - // Rewind to undo any partial writes from writeStatistics(). - // This is safe because statistics data is typically small (a few hundred - // bytes) and the first page is usually 1MB+, so no page boundary has - // been crossed and setPosition() can rewind within the same page. + // Rewind to undo any partial write. Safe: statistics are small and + // never cross a page boundary on the first (typically 1 MB) page. pagedOutputView.setPosition(headerSize); } + } - // Write saved changeType bytes to pagedOutputView - pagedOutputView.write(changeTypeBytes); - - // Write arrow data to pagedOutputView at current position. - // Use the no-position overload since pages may have advanced. - arrowWriter.serializeToOutputView(pagedOutputView); - } else { - // V0 path or no stats: layout is [header] [changeTypes] [arrow data] - // changeTypes are already in firstSegment at headerSize offset - arrowWriter.serializeToOutputView(pagedOutputView, headerSize + changeTypeSize); + // Write change-type bytes (growable buffer → transparent page crossing). + if (!appendOnly) { + changeTypeWriter.writeTo(pagedOutputView); } + // Write Arrow data at the current pagedOutputView position. + arrowWriter.serializeToOutputView(pagedOutputView); + // Reset the statistics collector for reuse if (statisticsCollector != null) { statisticsCollector.reset(); diff --git a/fluss-common/src/test/java/org/apache/fluss/record/MemoryLogRecordsArrowBuilderTest.java b/fluss-common/src/test/java/org/apache/fluss/record/MemoryLogRecordsArrowBuilderTest.java index 63b3f52dbb..49ee958eb5 100644 --- a/fluss-common/src/test/java/org/apache/fluss/record/MemoryLogRecordsArrowBuilderTest.java +++ b/fluss-common/src/test/java/org/apache/fluss/record/MemoryLogRecordsArrowBuilderTest.java @@ -648,6 +648,39 @@ void testStatisticsWithDifferentChangeTypes() throws Exception { readContext.close(); } + @Test + void testChangeTypeVectorOverflowsToMultiplePages() throws Exception { + // V0 header = 48 bytes; page size = 50 → 2 CT bytes fit on first page. + // 5 rows → 5 CT bytes → overflow to a second page. + int pageSizeInBytes = 50; + int maxSizeInBytes = 1024; + ArrowWriter writer = + provider.getOrCreateWriter( + 1L, DEFAULT_SCHEMA_ID, maxSizeInBytes, DATA1_ROW_TYPE, NO_COMPRESSION); + + MemoryLogRecordsArrowBuilder builder = + MemoryLogRecordsArrowBuilder.builder( + DEFAULT_SCHEMA_ID, + writer, + new ManagedPagedOutputView(new TestingMemorySegmentPool(pageSizeInBytes)), + false, + null); + + List appended = new ArrayList<>(); + for (int i = 0; i < 5; i++) { + Object[] row = DATA1.get(i % DATA1.size()); + builder.append(ChangeType.INSERT, DataTestUtils.row(row)); + appended.add(row); + } + builder.close(); + + MemoryLogRecords records = MemoryLogRecords.pointToBytesView(builder.build()); + + // Verify all rows and change types round-trip correctly + assertLogRecordsEquals( + DATA1_ROW_TYPE, records, appended, ChangeType.INSERT, TEST_SCHEMA_GETTER); + } + private static List compressionInfos() { return Arrays.asList( new ArrowCompressionInfo(ArrowCompressionType.LZ4_FRAME, -1),