Skip to content

feat: Added IcebergArrowInputSourceReader #19510

Open
Shekharrajak wants to merge 7 commits into
apache:masterfrom
Shekharrajak:feature/iceberg-arrow-reader
Open

feat: Added IcebergArrowInputSourceReader #19510
Shekharrajak wants to merge 7 commits into
apache:masterfrom
Shekharrajak:feature/iceberg-arrow-reader

Conversation

@Shekharrajak
Copy link
Copy Markdown
Contributor

Fixes #19498

Description

  • Added IcebergArrowInputSourceReader using iceberg-arrow vectorized API
  • Returns the live Table object so the Arrow reader can drive scan planning
  • Two new optional JSON properties on the input spec. useArrowReader=false keeps existing behaviour byte-for-byte; useArrowReader=true switches to the Arrow path. arrowBatchSize defaults to 1024.

Release note

Apache Iceberg ingestion now supports an opt-in vectorized reader path backed by iceberg-arrow. Enable by setting "useArrowReader": true on the iceberg input source. The Arrow path automatically applies V2 delete files (positional and equality), handles schema evolution, pushes column projection and predicates into the scan planner, and is 2x–3x faster than the existing path.
Future Iceberg spec features (V3 deletion vectors, row lineage, V4+) become available on Iceberg version bumps with no Druid code changes. Default remains the existing path; both coexist.

This PR has:

  • been self-reviewed.
  • added documentation for new or modified features or behaviors.
  • a release note entry in the PR description.
  • added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
  • added or updated version, license, or notice information in licenses.yaml
  • added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader.
  • added unit tests or modified existing tests to cover new code paths, ensuring the threshold for code coverage is met.
  • added integration tests.
  • been tested in a test Druid cluster.

@Shekharrajak
Copy link
Copy Markdown
Contributor Author

Benchmark :

| numRows  | numCols | icebergArrowInputSourceReader    | icebergInputSourceReader         | Speedup |
|         |         |  ms/op         throughput        |  ms/op         throughput        | (vs baseline) |
+---------+---------+----------------------------------+----------------------------------+---------+
| 100,000 |   5     |   17.70   5,651,074 rows/s       |   51.35   1,947,542 rows/s       |  2.90x  |
| 100,000 |  15     |   44.11   2,266,881 rows/s       |   95.81   1,043,680 rows/s       |  2.17x  |
| 500,000 |   5     |   74.92   6,673,450 rows/s       |  187.73   2,663,420 rows/s       |  2.51x  |
| 500,000 |  15     |  197.97   2,525,637 rows/s       |  420.94   1,187,817 rows/s       |  2.13x  |
================================================================================
Summary:
  Best speedup:    2.90x  (numRows=100,000, numCols=5)
  Worst speedup:   2.13x  (numRows=500,000, numCols=15)
  Geomean speedup: 2.41x

@Shekharrajak
Copy link
Copy Markdown
Contributor Author

Looking into benchmark module which will be used throughout the arrow and datafusion integration to quickly check the improvements.

…t line breaking, DateTimes.utc, Maps.newHashMapWithExpectedSize)
private final boolean useArrowReader;

@JsonProperty
private final int arrowBatchSize;
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With a batch of 1024 values in a contiguous buffer -> emit SIMD instructions that process 8-16 rows per CPU cycle.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Decompression amortization -> batch-read helps in decompress once and consume many row, since parquet compressed pages .

@Shekharrajak
Copy link
Copy Markdown
Contributor Author

ingestion time at 100k × 5

Read

Arrow implementation : 17ms
Current implementaiton : 53ms

Index + Persist (= total − read)

~330ms
Similar time on both implementation

total time

Arrow implementation: 347 ms
current implementation: 410 ms

Index+persist does substantially more work per row than read. So even though Arrow makes read ~3x faster, that gain is dwarfed when amortized over the much-larger indexing cost.

Benchmark added https://github.com/Shekharrajak/druid/pull/1/changes

@Shekharrajak
Copy link
Copy Markdown
Contributor Author

looks like network error https://github.com/apache/druid/actions/runs/26333466916/job/77523245546?pr=19510 - please trigger the CI check again.

Copy link
Copy Markdown
Member

@FrankChen021 FrankChen021 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Severity Findings
P0 0
P1 2
P2 1
P3 0
Total 3

Reviewed 7 of 7 changed files.


This is an automated review by Codex GPT-5.5

File temporaryDirectory
)
{
if (useArrowReader) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[P1] Arrow path bypasses residual FAIL handling

The useArrowReader branch returns an IcebergArrowInputSourceReader before retrieveIcebergDatafiles() runs, so it skips the residual detection that enforces residualFilterMode=FAIL. IcebergArrowInputSourceReaderTest documents that a non-partition equality filter returns all rows from the file, so a spec with useArrowReader=true and residualFilterMode=FAIL will silently ingest residual rows instead of rejecting the ingestion. Please run the same residual check for the Arrow scan or otherwise honor the fail mode before reading.

// Push column projection into the scan planner — only requested columns are read from disk.
final List<String> configuredDims = schema.getDimensionsSpec().getDimensionNames();
final String timestampColumn = schema.getTimestampSpec().getTimestampColumn();
if (!configuredDims.isEmpty()) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[P1] Projection omits required non-dimension columns

When fixed dimensions are configured, this projection selects only those dimension names plus the timestamp column. Druid carries required transform inputs and aggregator source fields through InputRowSchema.getColumnsFilter(), not DimensionsSpec alone. For example, dimensions=[name] with an aggregator over value will scan only ts/name, so value is missing from the event and downstream metrics/transforms read null. Please drive projection from the full columnsFilter/required fields, or avoid pruning when those required columns cannot be represented safely.

File temporaryDirectory
)
{
if (useArrowReader) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[P2] Parallel ingestion bypasses the Arrow reader

This special case only affects direct reader() calls. The input source still reports itself as splittable, and Druid's parallel task runners call createSplits() and then withSplit(); withSplit() returns the delegate input source, so subtasks read raw files through the old delegate path instead of IcebergArrowInputSourceReader. With useArrowReader=true, behavior therefore changes based on maxNumConcurrentSubTasks and can lose the Arrow path's Iceberg semantics. Please make the Arrow mode non-splittable or return split-aware Iceberg input sources that preserve useArrowReader.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Iceberg source: adopt Iceberg's native Arrow reader stack for forward-compatibility with Iceberg spec evolution and performance improvements

2 participants