[format] Add row-oriented file format with O(1) row-number lookups #7934

Merged
JingsongLi merged 6 commits into apache:master from JingsongLi:row_format
May 22, 2026

@JingsongLi

@JingsongLi JingsongLi commented May 22, 2026

Introduce a new .row file format optimized for fast point lookups by row number, designed for data evolution tables. The format stores data in ZSTD-compressed blocks, with a block index that enables binary search by row number.
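
To illustrate how a block index can resolve a row number to a block, here is a minimal sketch. It assumes the index can be decoded into a sorted array holding the first row number of each block, with the first block starting at row 0; the class `BlockIndexSketch` and its method names are hypothetical and are not taken from the PR.

```java
import java.util.Arrays;

/** Hypothetical sketch: maps a row number to the block that contains it. */
final class BlockIndexSketch {
    // firstRow[i] is the row number of the first row in block i (ascending, firstRow[0] == 0).
    private final long[] firstRow;
    private final long totalRows;

    BlockIndexSketch(long[] firstRow, long totalRows) {
        this.firstRow = firstRow.clone();
        this.totalRows = totalRows;
    }

    /** Returns the index of the block holding rowNumber. */
    int findBlock(long rowNumber) {
        if (rowNumber < 0 || rowNumber >= totalRows) {
            throw new IndexOutOfBoundsException("row " + rowNumber);
        }
        int pos = Arrays.binarySearch(firstRow, rowNumber);
        // Exact hit: rowNumber is the first row of a block. Otherwise binarySearch
        // returns -(insertionPoint) - 1, and the containing block is the one
        // just before the insertion point.
        return pos >= 0 ? pos : -pos - 2;
    }

    public static void main(String[] args) {
        BlockIndexSketch index = new BlockIndexSketch(new long[] {0, 100, 250}, 400);
        System.out.println(index.findBlock(99));  // prints 0
        System.out.println(index.findBlock(100)); // prints 1
        System.out.println(index.findBlock(399)); // prints 2
    }
}
```

Once the block is known, the offset of the row inside the decompressed block is `rowNumber - firstRow[block]` under the same assumption.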

Key components:

  • RowFormatWriter/Reader: block-level write and read with projection and selection (RoaringBitmap) pushdown
  • BlockPrefetcher: concurrent IO with range coalescing (merges adjacent blocks within 256KB gap, up to 2MB per range) and prefetch sliding window
  • InputStreamPool: lazy stream pool that opens streams on demand for concurrent reads
  • RowBlockWriter/Reader: compact row serialization supporting all Paimon types including nested ARRAY, MAP, ROW, and VARIANT
  • RowBlockIndex: delta+zigzag+varint encoded block metadata
  • Documentation: rowformat.md specification and fileformat.md updates
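
The range coalescing described for BlockPrefetcher can be sketched roughly as follows. This is one reading of the description, not the PR's code: two byte ranges are merged when the gap between them is at most 256KB and the merged range stays within 2MB. The class `RangeCoalescerSketch`, the `Range` type, and the constant names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of gap-based range coalescing for block reads. */
final class RangeCoalescerSketch {
    static final long MAX_GAP = 256L * 1024;          // assumed: 256KB gap limit
    static final long MAX_RANGE = 2L * 1024 * 1024;   // assumed: 2MB cap per merged range

    /** A byte range [offset, offset + length). */
    static final class Range {
        final long offset;
        final long length;

        Range(long offset, long length) {
            this.offset = offset;
            this.length = length;
        }

        long end() {
            return offset + length;
        }
    }

    /** Merges ranges that are already sorted by offset. */
    static List<Range> coalesce(List<Range> sorted) {
        List<Range> out = new ArrayList<>();
        Range current = null;
        for (Range next : sorted) {
            if (current != null) {
                long mergedEnd = Math.max(current.end(), next.end());
                boolean closeEnough = next.offset - current.end() <= MAX_GAP;
                boolean smallEnough = mergedEnd - current.offset <= MAX_RANGE;
                if (closeEnough && smallEnough) {
                    // Extend the current range to cover the next block and the gap.
                    current = new Range(current.offset, mergedEnd - current.offset);
                    continue;
                }
                out.add(current);
            }
            current = next;
        }
        if (current != null) {
            out.add(current);
        }
        return out;
    }
}
```

Reading the gap bytes is wasted IO, so the gap limit trades a little extra transfer for fewer requests, while the size cap keeps any single request bounded.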

Introduce a new .row file format optimized for fast point lookups by row
number, designed for deletion vector applications and changelog
materialization. The format stores data in ZSTD-compressed blocks with a
block index enabling binary search by row number.


Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@leaves12138

Thanks for the PR. I found one correctness blocker and a couple of contract/resource issues in the current head (46f7352).

  1. Nested ROW projection can return wrong data.

    RowFileFormat.createReaderFactory() only computes a top-level field-id mapping and RowFileRecordIterator applies it with ProjectedRow, which also only projects top-level fields. This does not preserve nested projection semantics. For example, if the stored schema is r ROW<a INT, b INT> and the projected schema is r ROW<b INT>, reading the projected row currently returns a as the first nested field instead of b.

    I reproduced it with a small contract test: write GenericRow.of(GenericRow.of(1, 100)) with schema ROW<a,b>, read with projected schema ROW<b>, and row.getRow(0, 1).getInt(0) returns 1 instead of 100.

    This is risky for schema evolution / nested field pruning. Please either implement nested projection correctly in the row format reader, or make sure this format reads the full nested type and lets the existing schema-evolution mapping layer apply the nested projection.

  2. validateDataFields() currently lets unsupported types pass.

    The method is empty and says all Paimon types are supported, but RowBlockWriter / RowBlockReader do not handle at least BLOB and VECTOR. As a result, table/schema validation can succeed and the failure is deferred to write/read time. I reproduced this with VECTOR: format.validateDataFields(new RowType(... VECTOR ...)) does not throw.

    Please either add real support for these type roots or reject unsupported types recursively in validateDataFields(), and add regression coverage.

  3. The prefetcher uses an unbounded static cached thread pool.

    BlockPrefetcher uses Executors.newCachedThreadPool(). Each reader only prefetches a few ranges, but with many concurrent splits this can still create an unbounded number of threads. Also, when a reader is closed while prefetch tasks are in flight, InputStreamPool.close() only closes streams currently in the available queue; streams borrowed by cancelled/in-flight tasks may be returned after close and not closed.

    Please consider using a bounded daemon executor (or an executor owned/configured by the reader/runtime) and make InputStreamPool close-safe for streams returned after close.
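
For item 3, the two suggestions could look roughly like the sketch below, which uses only JDK classes. It is an assumption about one possible shape, not the PR's fix: `CloseSafePoolSketch`, `giveBack`, and `boundedDaemonExecutor` are hypothetical names, and the thread-name prefix is made up.

```java
import java.io.Closeable;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical pool that also closes resources handed back after close(). */
final class CloseSafePoolSketch<T extends Closeable> implements Closeable {
    private final ConcurrentLinkedQueue<T> available = new ConcurrentLinkedQueue<>();
    private volatile boolean closed;

    /** Returns an idle resource, or null if the caller should open a new one. */
    T borrow() {
        if (closed) {
            throw new IllegalStateException("pool is closed");
        }
        return available.poll();
    }

    /** Hands a resource back; closes it right away if the pool is already closed. */
    void giveBack(T resource) {
        if (closed) {
            closeQuietlyUnchecked(resource);
            return;
        }
        available.add(resource);
        // close() may have run between the check and the add, so drain again.
        if (closed) {
            drain();
        }
    }

    @Override
    public void close() {
        closed = true;
        drain();
    }

    private void drain() {
        T resource;
        while ((resource = available.poll()) != null) {
            closeQuietlyUnchecked(resource);
        }
    }

    private static void closeQuietlyUnchecked(Closeable resource) {
        try {
            resource.close();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Fixed-size executor with daemon threads that time out when idle. */
    static ExecutorService boundedDaemonExecutor(int maxThreads) {
        AtomicInteger counter = new AtomicInteger();
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                maxThreads, maxThreads, 60L, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(),
                runnable -> {
                    Thread thread = new Thread(runnable, "row-prefetch-" + counter.incrementAndGet());
                    thread.setDaemon(true);
                    return thread;
                });
        executor.allowCoreThreadTimeOut(true);
        return executor;
    }
}
```

With an unbounded work queue and equal core and maximum sizes, the executor never grows past `maxThreads`; extra prefetch tasks wait in the queue instead of spawning threads.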

The existing new tests pass for me:

mvn -pl paimon-format -DskipITs -Dcheckstyle.skip -Drat.skip=true -Dspotless.check.skip=true -Dtest='org.apache.paimon.format.row.*Test' test

But the two temporary contract tests above expose the nested projection and validation issues.
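
As a rough illustration of the recursive rejection suggested for validateDataFields(), the sketch below walks a small stand-in type tree and fails on the first unsupported root. The `Root` enum and `Type` class are hypothetical simplifications, not Paimon's DataType API, and the unsupported set reflects only the two roots named in this review.

```java
import java.util.Arrays;
import java.util.EnumSet;
import java.util.List;
import java.util.Set;

/** Hypothetical sketch: reject unsupported type roots at any nesting depth. */
final class TypeValidationSketch {
    enum Root { INT, STRING, ARRAY, MAP, ROW, VARIANT, BLOB, VECTOR }

    /** Stand-in for a data type: a root plus its child types, if any. */
    static final class Type {
        final Root root;
        final List<Type> children;

        Type(Root root, Type... children) {
            this.root = root;
            this.children = Arrays.asList(children);
        }
    }

    // Assumption: only the roots called out in the review are unsupported.
    static final Set<Root> UNSUPPORTED = EnumSet.of(Root.BLOB, Root.VECTOR);

    /** Throws if the type, or anything nested inside it, is unsupported. */
    static void validate(Type type) {
        if (UNSUPPORTED.contains(type.root)) {
            throw new UnsupportedOperationException(
                    "Row format does not support type root " + type.root);
        }
        for (Type child : type.children) {
            validate(child);
        }
    }
}
```

Failing here moves the error to schema validation time instead of deferring it to the first write or read.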

@leaves12138

I re-reviewed the latest head (0ca020c63c05) and found one remaining correctness blocker around nested projection through collection types.

NestedProjectedRow now handles nested ROW projection when the projected field itself is a ROW, but getArray() and getMap() still return the underlying values as-is. At the same time, FormatReaderMapping.pruneDataType() can recursively prune through ARRAY and MAP, so the row format can be asked to read a projected type such as ARRAY<ROW<b INT>> from stored data ARRAY<ROW<a INT, b INT>>.

A minimal reproducer is:

RowType elementType = new RowType(Arrays.asList(
    new DataField(10, "a", new IntType()),
    new DataField(11, "b", new IntType())));
RowType dataSchema = new RowType(Collections.singletonList(
    new DataField(0, "arr", new ArrayType(elementType))));

RowType projectedElementType = new RowType(Collections.singletonList(
    new DataField(11, "b", new IntType())));
RowType projectedSchema = new RowType(Collections.singletonList(
    new DataField(0, "arr", new ArrayType(projectedElementType))));

// Write: arr = [ROW<a=1, b=100>]
// Read with projectedSchema:
InternalArray array = row.getArray(0);
assertThat(array.getRow(0, 1).getInt(0)).isEqualTo(100);

This currently returns 1 instead of 100, because the array element row is still the stored full row and no recursive projection is applied inside the array. The same problem should also apply to MAP values/keys if they contain pruned ROWs.

Could you either make nested projection preserve the same semantics through ARRAY/MAP, or avoid recursively pruning collection element/value types for this format until those projected wrappers are supported?

@leaves12138

Thanks for the update. The direct ARRAY<ROW<...>> and MAP<..., ROW<...>> cases now look covered, and the existing targeted tests pass for me. I found two remaining edge cases:

  1. Nested projection still does not propagate through nested collection levels. NestedProjectedRow.create only registers an array projection when the immediate array element type is ROW, and ProjectedInternalArray#getArray/#getMap delegates to the underlying array as-is. So a type like ARRAY<ARRAY<ROW<a INT, b INT>>> projected as ARRAY<ARRAY<ROW<b INT>>> still returns a when reading the inner row:
RowType elementType = new RowType(Arrays.asList(
    new DataField(10, "a", new IntType()),
    new DataField(11, "b", new IntType())));
RowType dataSchema = new RowType(Collections.singletonList(
    new DataField(0, "arr", new ArrayType(new ArrayType(elementType)))));

RowType projectedElementType = new RowType(Collections.singletonList(
    new DataField(11, "b", new IntType())));
RowType projectedSchema = new RowType(Collections.singletonList(
    new DataField(0, "arr", new ArrayType(new ArrayType(projectedElementType)))));

// arr = [[ROW<a=1, b=100>]]
assertThat(row.getArray(0).getArray(0).getRow(0, 1).getInt(0)).isEqualTo(100);

This currently returns 1. The same pattern should apply to MAP values/keys that contain nested collections of pruned rows, because FormatReaderMapping.pruneDataType() recurses through ARRAY and MAP at arbitrary depth.

  2. Projecting a MULTISET field can fail before reading. The branch for DataTypeRoot.MAP || DataTypeRoot.MULTISET casts the type to MapType, but MultisetType is not a MapType. For example, reading only the ms field from ROW<id INT, ms MULTISET<STRING>> throws:
java.lang.ClassCastException: org.apache.paimon.types.MultisetType cannot be cast to org.apache.paimon.types.MapType
    at org.apache.paimon.utils.NestedProjectedRow.create(NestedProjectedRow.java:158)
    at org.apache.paimon.format.row.RowFileFormat.createReaderFactory(RowFileFormat.java:55)

Could you make the collection projection logic recursive for nested ARRAY/MAP cases, and handle MULTISET separately (or just leave it as an unprojected map-like value when its element type is not being pruned)?
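
To make the requested recursion concrete, here is a simplified sketch that projects by field id through ROW and ARRAY at any depth. It uses a stand-in model (rows as `Object[]`, arrays as `List`) rather than Paimon's InternalRow/InternalArray or NestedProjectedRow, it copies values eagerly instead of wrapping them lazily, and it leaves out MAP and MULTISET; every class and method name here is hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Hypothetical sketch: field-id projection that recurses through ROW and ARRAY. */
final class NestedProjectionSketch {
    enum Kind { INT, ARRAY, ROW }

    static final class Field {
        final int id;
        final Type type;

        Field(int id, Type type) {
            this.id = id;
            this.type = type;
        }
    }

    static final class Type {
        final Kind kind;
        final Type element;       // set for ARRAY only
        final List<Field> fields; // set for ROW only

        private Type(Kind kind, Type element, List<Field> fields) {
            this.kind = kind;
            this.element = element;
            this.fields = fields;
        }

        static Type intType() {
            return new Type(Kind.INT, null, Collections.<Field>emptyList());
        }

        static Type array(Type element) {
            return new Type(Kind.ARRAY, element, Collections.<Field>emptyList());
        }

        static Type row(Field... fields) {
            return new Type(Kind.ROW, null, Arrays.asList(fields));
        }
    }

    /** Rewrites a stored value so that it matches the projected type. */
    static Object project(Object value, Type stored, Type projected) {
        if (value == null) {
            return null;
        }
        switch (stored.kind) {
            case ARRAY: {
                List<Object> out = new ArrayList<>();
                for (Object element : (List<?>) value) {
                    out.add(project(element, stored.element, projected.element));
                }
                return out;
            }
            case ROW: {
                Object[] in = (Object[]) value;
                Object[] out = new Object[projected.fields.size()];
                for (int i = 0; i < out.length; i++) {
                    Field wanted = projected.fields.get(i);
                    int source = indexOfId(stored, wanted.id);
                    out[i] = project(in[source], stored.fields.get(source).type, wanted.type);
                }
                return out;
            }
            default:
                return value; // primitives pass through unchanged
        }
    }

    private static int indexOfId(Type row, int id) {
        for (int i = 0; i < row.fields.size(); i++) {
            if (row.fields.get(i).id == id) {
                return i;
            }
        }
        throw new IllegalArgumentException("No stored field with id " + id);
    }
}
```

Because the recursion is driven by the stored and projected types together, the ARRAY<ARRAY<ROW<a INT, b INT>>> case above resolves field id 11 inside the innermost row regardless of how many collection levels wrap it.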

@leaves12138

Thanks for the update. I rechecked the latest head (06276b0a79bf). The existing row-format targeted tests pass, and the extra repros for nested collection projection (ARRAY<ARRAY<ROW<...>>>, MAP<..., ARRAY<ROW<...>>>) and MULTISET projection also pass now. No further comments from my side.

@leaves12138 leaves12138 left a comment

I rechecked the latest head (06276b0a79bf). The existing row-format targeted tests pass, and the additional nested collection / MULTISET projection edge cases also pass. The previous correctness concerns are addressed.

@JingsongLi JingsongLi merged commit 9699471 into apache:master May 22, 2026
12 of 13 checks passed