feat(flink): add lance format for Flink append only table #18741

Open
danny0405 wants to merge 3 commits into apache:master from danny0405:lance-support
@danny0405
Contributor

Describe the issue this Pull Request addresses

Flink currently does not have an append-only Lance base-file path for tables without primary keys. Lance support needs Flink-specific RowData writer and reader plumbing, table-format validation, and COW input-format handling so pk-less append-only tables can ingest and read Lance base files.

This PR adds Lance support for Flink COPY_ON_WRITE append-only INSERT tables without primary keys. It explicitly rejects unsupported Lance combinations such as primary-key tables, MERGE_ON_READ tables, and non-INSERT write operations. This PR introduces user-visible storage-format support for Flink under those constraints.

Summary and Changelog

Flink can now write and read Lance base files for append-only COW tables without primary keys, including projected reads from Lance files.

Working tree: Add Flink Lance append-only writer/reader support

  • Added HoodieRowDataLanceWriter, HoodieFlinkLanceArrowUtils, and HoodieBloomFilterStringWriteSupport for primitive RowData-to-Arrow Lance writes.
  • Updated HoodieRowDataCreateHandle and HoodieRowDataFileWriterFactory to dispatch writers by base-file extension and create Lance writers.
  • Added HoodieRowDataLanceReader and wired it through HoodieRowDataFileReaderFactory, FlinkRowDataReaderContext, and CopyOnWriteInputFormat.
  • Updated Lance reader resource ownership so iterator close releases Arrow reader, Lance reader, allocator, and parent metadata reader resources.
  • Reordered Lance reader vectors by requested field name so projected reads like select name, uuid return columns in Flink projection order.
  • Marked Lance files unsplittable in CopyOnWriteInputFormat.
  • Added StreamerUtil.getLanceWriteConfig(...) and persisted hoodie.table.base.file.format during table initialization.
  • Updated HoodieTableFactory validation to allow Lance only for append-only COPY_ON_WRITE INSERT tables without primary keys.
  • Replaced the old Flink Lance rejection IT with ITTestHoodieDataSource#testLanceFormatAppendOnlyWriteAndRead.
  • Added catalog/table-factory tests for append-only Lance table creation and unsupported keyed, MOR, and non-INSERT Lance writes.
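
The projection reordering described in the changelog above can be pictured with a minimal sketch. It is not the PR's implementation: `NamedColumn` and `ProjectionOrderSketch` are hypothetical stand-ins (a real reader would work on Arrow field vectors), and only the error message text is taken from the review discussion.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a column vector: only the column name matters here.
final class NamedColumn {
  final String name;

  NamedColumn(String name) {
    this.name = name;
  }
}

final class ProjectionOrderSketch {
  /** Returns the batch columns rearranged to follow the requested field order. */
  static List<NamedColumn> orderByRequestedNames(List<String> requested, List<NamedColumn> batch) {
    // Index the batch columns by name so each requested field is a single lookup.
    Map<String, NamedColumn> byName = new LinkedHashMap<>();
    for (NamedColumn column : batch) {
      byName.put(column.name, column);
    }
    List<NamedColumn> ordered = new ArrayList<>(requested.size());
    for (String name : requested) {
      NamedColumn column = byName.get(name);
      if (column == null) {
        // A requested column that the file does not contain cannot be projected.
        throw new IllegalStateException("Missing Lance column in projected batch: " + name);
      }
      ordered.add(column);
    }
    return ordered;
  }
}
```

With a batch stored as (uuid, name) and a query selecting name, uuid, the sketch returns the columns in query order rather than file order.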

Impact

This enables a new Flink write/read path for Lance base files, scoped to COPY_ON_WRITE append-only tables without primary keys. Existing Parquet behavior is preserved, with writer creation now dispatched by file extension instead of always creating a Parquet writer.
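
As a rough illustration of dispatching by file extension, the sketch below picks a format from the file name. The class, the enum, and the `.parquet` / `.lance` suffixes are assumptions made for the example; the PR's `HoodieRowDataFileWriterFactory` may resolve the format differently.

```java
import java.util.Locale;

final class WriterDispatchSketch {
  // Hypothetical format enum; the real code uses HoodieFileFormat.
  enum Format { PARQUET, LANCE }

  /** Chooses a base-file format from the file name suffix (suffixes are assumed). */
  static Format formatForPath(String path) {
    String lower = path.toLowerCase(Locale.ROOT);
    if (lower.endsWith(".parquet")) {
      return Format.PARQUET;
    }
    if (lower.endsWith(".lance")) {
      return Format.LANCE;
    }
    // Fail loudly instead of silently falling back to a Parquet writer.
    throw new UnsupportedOperationException("Unsupported base file extension: " + path);
  }
}
```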

Unsupported Lance table shapes fail early with explicit validation messages. Complex/nested logical types are not supported by the Flink Lance Arrow conversion helpers in this change; unsupported types throw HoodieNotSupportedException.

Risk Level

medium

This touches storage-format writer/reader paths, Flink table validation, COW input-format reading, and table initialization. The main risks are projection correctness, resource cleanup, and accidental enablement for unsupported table modes. Mitigation includes focused compile and integration/unit coverage:

  • mvn -pl hudi-flink-datasource/hudi-flink -am -DskipTests -DskipITs -Dscala-2.12 compile
  • mvn -pl hudi-flink-datasource/hudi-flink -am -Dscala-2.12 -Dtest=TestHoodieTableFactory#testLanceFormatSupportedForAppendOnlyTables,org.apache.hudi.table.catalog.TestHoodieCatalog#testCreateAppendOnlyLanceTableWithoutPrimaryKey,ITTestHoodieDataSource#testLanceFormatAppendOnlyWriteAndRead -Dsurefire.failIfNoSpecifiedTests=false test

The focused test run passed with Tests run: 3, Failures: 0, Errors: 0.

Documentation Update

Documentation update is recommended because this adds new user-facing Flink Lance support with important constraints: only COPY_ON_WRITE append-only INSERT tables without primary keys are supported, and primitive columns are supported by the current RowData/Arrow conversion path.

Contributor's checklist

  • Read through contributor's guide
  • Enough context is provided in the sections above
  • Adequate tests were added if applicable

@github-actions github-actions bot added the size:XL (PR with lines of changes > 1000) label on May 15, 2026
Contributor

@hudi-agent hudi-agent left a comment

🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.

Thanks for working on this! The PR adds Flink Lance support for append-only COW tables and includes both writer/reader plumbing. A few correctness concerns worth double-checking in the inline comments — most notably the order of validation vs. setupConfOptions lets PRIMARY KEY DDL bypass the Lance constraint on the sink path, and a resource leak in the read context when iterator creation fails. Please take a look at any inline comments, and this should be ready for a Hudi committer or PMC member to take it from here. A few naming and simplification suggestions below.

String baseFileFormat = conf.getString(HoodieTableConfig.BASE_FILE_FORMAT.key(), null);
if (baseFileFormat != null && HoodieFileFormat.LANCE.name().equalsIgnoreCase(baseFileFormat)) {
throw new HoodieValidationException(HoodieFileFormat.LANCE_SPARK_ONLY_ERROR_MSG);
if (conf.containsKey(FlinkOptions.RECORD_KEY_FIELD.key())) {
Contributor

🤖 On the sink path, sanityCheck (and so checkBaseFileFormat) runs before setupConfOptions → setupHoodieKeyOptions, which is where PRIMARY KEY syntax is copied into FlinkOptions.RECORD_KEY_FIELD. So a Lance + operation=insert table declared with PRIMARY KEY (..) NOT ENFORCED passes this validation and only fails on the source side, leaving the user able to write but not read. Would it make sense to also check schema.getPrimaryKey().isPresent() here (and add a test using PRIMARY KEY syntax rather than the option)?

- AI-generated; verify before applying. React 👍/👎 to flag quality.
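
A minimal sketch of the suggested check, assuming the validation can see both the table options and the primary key declared in DDL. `LanceValidationSketch`, `RECORD_KEY_OPTION`, and the exception type are hypothetical placeholders, not Hudi or Flink APIs.

```java
import java.util.Map;
import java.util.Optional;

final class LanceValidationSketch {
  // Hypothetical option key standing in for FlinkOptions.RECORD_KEY_FIELD.key().
  static final String RECORD_KEY_OPTION = "record.key.field";

  /** Rejects a Lance table that is keyed either by option or by PRIMARY KEY DDL. */
  static void checkNoPrimaryKey(Map<String, String> options, Optional<String> ddlPrimaryKey) {
    boolean keyedByOption = options.containsKey(RECORD_KEY_OPTION);
    // The DDL primary key is checked directly, so the result does not depend on
    // whether it has already been copied into the options.
    boolean keyedByDdl = ddlPrimaryKey.isPresent();
    if (keyedByOption || keyedByDdl) {
      throw new IllegalArgumentException(
          "Lance base files are only supported for tables without primary keys");
    }
  }
}
```

Checking both sources makes the outcome independent of the order in which validation and option setup run.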

(HoodieRowDataLanceReader) HoodieIOFactory.getIOFactory(storage)
.getReaderFactory(HoodieRecord.HoodieRecordType.FLINK)
.getFileReader(tableConfig, filePath, HoodieFileFormat.LANCE, Option.empty());
return rowDataLanceReader.getRowDataIterator(RowDataQueryContexts.fromSchema(requiredSchema).getRowType(), requiredSchema);
Contributor

🤖 If getRowDataIterator(...) throws (e.g. loadNextBatch failure surfaces as HoodieIOException), rowDataLanceReader is leaked — the metadata LanceFileReader and metadataAllocator from the constructor are never released, since the parent reader's close() is only chained through the iterator's close(). Could you wrap this in a try/catch that closes the reader on failure (the CopyOnWriteInputFormat.getLanceRecordIterator path already does this)?

- AI-generated; verify before applying. React 👍/👎 to flag quality.
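
The close-on-failure pattern requested here might look like the sketch below. `CloseOnFailureSketch` and `openOrClose` are hypothetical names; the reader is modelled as a plain `AutoCloseable` rather than the PR's `HoodieRowDataLanceReader`.

```java
import java.util.Iterator;
import java.util.function.Function;

final class CloseOnFailureSketch {
  /** Opens an iterator from a reader and closes the reader if creation throws. */
  static <R extends AutoCloseable, T> Iterator<T> openOrClose(
      R reader, Function<R, Iterator<T>> open) {
    try {
      // On success, ownership of the reader passes to the returned iterator.
      return open.apply(reader);
    } catch (RuntimeException e) {
      try {
        reader.close();
      } catch (Exception closeFailure) {
        // Keep the original failure as the primary exception.
        e.addSuppressed(closeFailure);
      }
      throw e;
    }
  }
}
```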

.mapToObj(i -> DataTypes.FIELD(fullFieldNames[i], fullFieldTypes[i]))
.toArray(DataTypes.Field[]::new))
.bridgedTo(RowData.class);
HoodieSchema requestedSchema = HoodieSchemaConverter.convertToSchema(selectedDataType.getLogicalType());
Contributor

🤖 The Lance branch projects only selectedFields from the file and does not inject partition values from the path (which the Parquet branch does via partObjects). If hoodie.datasource.write.drop.partitioncolumns=true, the partition column won't be in the Lance file and orderVectors will throw Missing Lance column in projected batch: <partition> for any query that selects the partition column. Is this combination intended to be unsupported, or should we either inject the value here or block the config in checkBaseFileFormat?

- AI-generated; verify before applying. React 👍/👎 to flag quality.

Contributor Author

not supported yet

try {
arrowReader.close();
} catch (Exception e) {
throw new HoodieException("Failed to close Lance Arrow reader", e);
Contributor

🤖 If arrowReader.close() throws, the throw inside the outer finally for lanceReader.close() (or reader.close() inside the innermost finally) will replace it and the original exception is lost — finally-throws override exceptions from the try body. Could you switch to the addSuppressed pattern (e.g. accumulate a primary exception across all four closes)? The Spark equivalent uses log-and-swallow which avoids this entirely.

- AI-generated; verify before applying. React 👍/👎 to flag quality.
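
For reference, the addSuppressed pattern mentioned above can be sketched as follows. `CloseAllSketch` is a hypothetical helper, not code from the PR; it treats the first close failure as primary and attaches later ones to it.

```java
import java.util.List;

final class CloseAllSketch {
  /** Closes every resource in order; later failures are suppressed on the first. */
  static void closeAll(List<? extends AutoCloseable> resources) throws Exception {
    Exception primary = null;
    for (AutoCloseable resource : resources) {
      try {
        resource.close();
      } catch (Exception e) {
        if (primary == null) {
          primary = e;
        } else {
          primary.addSuppressed(e);
        }
      }
    }
    // Every resource has had a close attempt before anything is thrown.
    if (primary != null) {
      throw primary;
    }
  }
}
```

The trade-off against nested try/finally is that no failure replaces an earlier one, at the cost of a small helper.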

Contributor Author

I don't think it is necessary as long as the exception is thrown out.

batch = arrowReader.getVectorSchemaRoot();
orderedVectors = orderVectors(rowType, batch.getFieldVectors());
rowId = 0;
if (batch.getRowCount() == 0) {
Contributor

🤖 Recursing into loadNextBatch() for empty batches will blow the stack if a Lance file contains many consecutive empty batches. Could you convert this to an iterative loop (do { … } while (hasNext && batch.getRowCount() == 0))?

- AI-generated; verify before applying. React 👍/👎 to flag quality.
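
A minimal sketch of the iterative form, using a plain `Iterator` of lists as a stand-in for the Arrow batch reader. `SkipEmptyBatchesSketch` is a hypothetical name, and a `while` loop is used here in place of the suggested do/while.

```java
import java.util.Iterator;
import java.util.List;

final class SkipEmptyBatchesSketch {
  /** Advances to the next non-empty batch, or returns null when the source is exhausted. */
  static <T> List<T> nextNonEmptyBatch(Iterator<List<T>> batches) {
    // A loop keeps stack depth constant no matter how many empty batches occur.
    while (batches.hasNext()) {
      List<T> batch = batches.next();
      if (!batch.isEmpty()) {
        return batch;
      }
    }
    return null;
  }
}
```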

ArrowType.Decimal decimal = (ArrowType.Decimal) arrowType;
return new DecimalType(decimal.getPrecision(), decimal.getScale());
} else if (arrowType instanceof ArrowType.Timestamp) {
return new org.apache.flink.table.types.logical.TimestampType(6);
Contributor

🤖 toLogicalType maps every ArrowType.Timestamp to TimestampType(6), even when the Arrow timezone is "UTC" (which the writer uses for TIMESTAMP_WITH_LOCAL_TIME_ZONE). Is that intentional, or should the timezone-set case map to LocalZonedTimestampType(6) so schema round-trips? Today HoodieRowDataLanceReader.getSchema() would silently lose the local-zone-ness.

- AI-generated; verify before applying. React 👍/👎 to flag quality.
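
The timezone-aware mapping could be sketched as below. The enum and method are hypothetical stand-ins for the Flink logical types; the one assumption carried over from Arrow is that a timestamp without a timezone reports it as null.

```java
final class TimestampMappingSketch {
  // Hypothetical stand-ins for TimestampType and LocalZonedTimestampType.
  enum FlinkTimestampKind { TIMESTAMP, TIMESTAMP_LTZ }

  /** Maps an Arrow timestamp's timezone (null or empty when absent) to a Flink kind. */
  static FlinkTimestampKind kindFor(String arrowTimezone) {
    if (arrowTimezone == null || arrowTimezone.isEmpty()) {
      // No timezone recorded: treat it as a plain TIMESTAMP.
      return FlinkTimestampKind.TIMESTAMP;
    }
    // A timezone such as "UTC" marks a value written as TIMESTAMP_WITH_LOCAL_TIME_ZONE.
    return FlinkTimestampKind.TIMESTAMP_LTZ;
  }
}
```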

return getFileWriterByFormat(extension, instantTime, storagePath, config, rowType, taskContextSupplier);
}

private <T, I, K, O> HoodieFileWriter getFileWriterByFormat(
Contributor

🤖 nit: <T, I, K, O> are declared on this method but none of them appear anywhere in the signature or body — could you drop them?

- AI-generated; verify before applying. React 👍/👎 to flag quality.

Contributor Author

keep it, might need to refactor it in the future.

/**
* Builds a Lance write config from storage options carried in the Hadoop configuration.
*/
public static HoodieConfig getLanceWriteConfig(org.apache.hadoop.conf.Configuration conf) {
Contributor

🤖 nit: the method body only reads LANCE_READ_* config keys, so getLanceWriteConfig is a misleading name — getLanceReadConfig would match what it actually does.

- AI-generated; verify before applying. React 👍/👎 to flag quality.

* Validate the base file format. Flink Lance support is scoped to append-only COW tables.
*/
- private void checkBaseFileFormat(Configuration conf) {
+ private void checkBaseFileFormat(Configuration conf, boolean write) {
Contributor

🤖 nit: a bare boolean write makes call sites like checkBaseFileFormat(conf, false) opaque — could you split into two methods (checkBaseFileFormatForRead / checkBaseFileFormatForWrite) or at least name it isWritePath?

- AI-generated; verify before applying. React 👍/👎 to flag quality.

return new HoodieNotSupportedException("Flink Lance base-file support currently supports primitive append-only columns; unsupported type: " + type);
}

private static final class LogicalTypeChecks {
Contributor

🤖 nit: this private inner class is just a one-liner forwarding to org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getPrecision — a static import would eliminate the wrapper entirely.

- AI-generated; verify before applying. React 👍/👎 to flag quality.

@codecov-commenter

Codecov Report

❌ Patch coverage is 12.08791% with 320 lines in your changes missing coverage. Please review.
✅ Project coverage is 68.00%. Comparing base (071b3f1) to head (1d0b0fd).

Files with missing lines                                  Patch %   Lines
...he/hudi/table/format/HoodieRowDataLanceReader.java     0.00%     121 Missing ⚠️
...udi/io/storage/row/HoodieFlinkLanceArrowUtils.java     14.04%    94 Missing and 10 partials ⚠️
.../hudi/io/storage/row/HoodieRowDataLanceWriter.java     0.00%     41 Missing ⚠️
...io/storage/row/HoodieRowDataFileWriterFactory.java     16.66%    14 Missing and 1 partial ⚠️
.../hudi/table/format/cow/CopyOnWriteInputFormat.java     37.50%    13 Missing and 2 partials ⚠️
...e/hudi/table/format/FlinkRowDataReaderContext.java     10.00%    8 Missing and 1 partial ⚠️
...c/main/java/org/apache/hudi/util/StreamerUtil.java     11.11%    8 Missing ⚠️
...orage/row/HoodieBloomFilterStringWriteSupport.java     0.00%     5 Missing ⚠️
...java/org/apache/hudi/table/HoodieTableFactory.java     92.30%    0 Missing and 1 partial ⚠️
...i/table/format/HoodieRowDataFileReaderFactory.java     0.00%     1 Missing ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##             master   #18741      +/-   ##
============================================
- Coverage     68.14%   68.00%   -0.14%     
- Complexity    29094    29115      +21     
============================================
  Files          2517     2521       +4     
  Lines        141113   141458     +345     
  Branches      17508    17558      +50     
============================================
+ Hits          96160    96199      +39     
- Misses        37046    37339     +293     
- Partials       7907     7920      +13     
Flag Coverage Δ
common-and-other-modules 44.33% <12.08%> (-0.08%) ⬇️
hadoop-mr-java-client 44.93% <ø> (-0.07%) ⬇️
spark-client-hadoop-common 48.32% <ø> (-0.01%) ⬇️
spark-java-tests 48.99% <ø> (+0.01%) ⬆️
spark-scala-tests 44.91% <ø> (-0.01%) ⬇️
utilities 37.60% <ø> (-0.03%) ⬇️

Files with missing lines                                  Coverage Δ
...hudi/io/storage/row/HoodieRowDataCreateHandle.java     86.77% <100.00%> (ø)
...java/org/apache/hudi/table/HoodieTableFactory.java     75.31% <92.30%> (+1.60%) ⬆️
...i/table/format/HoodieRowDataFileReaderFactory.java     75.00% <0.00%> (-25.00%) ⬇️
...orage/row/HoodieBloomFilterStringWriteSupport.java     0.00% <0.00%> (ø)
...c/main/java/org/apache/hudi/util/StreamerUtil.java     66.84% <11.11%> (-1.35%) ⬇️
...e/hudi/table/format/FlinkRowDataReaderContext.java     72.50% <10.00%> (-9.70%) ⬇️
...io/storage/row/HoodieRowDataFileWriterFactory.java     66.66% <16.66%> (-33.34%) ⬇️
.../hudi/table/format/cow/CopyOnWriteInputFormat.java     43.29% <37.50%> (-4.36%) ⬇️
.../hudi/io/storage/row/HoodieRowDataLanceWriter.java     0.00% <0.00%> (ø)
...udi/io/storage/row/HoodieFlinkLanceArrowUtils.java     14.04% <14.04%> (ø)
... and 1 more

... and 17 files with indirect coverage changes

@hudi-bot
Collaborator

CI report:

Bot commands @hudi-bot supports the following commands:
  • @hudi-bot run azure re-run the last Azure build

Contributor

@hudi-agent hudi-agent left a comment

🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.

Thanks for working through the feedback — the rename to getLanceReadConfig, the split into checkBaseFileFormatForRead/checkBaseFileFormatForWrite, the do/while conversion in loadNextBatch, the iterator-creation cleanup in FlinkRowDataReaderContext, the timezone-aware timestamp mapping (with a round-trip test), and the static-import cleanup all look good. One prior finding still looks unaddressed: the PRIMARY KEY DDL bypass on the sink path in HoodieTableFactory. Please take a look at the inline comment, and this should be ready for a Hudi committer or PMC member to take it from here.

String baseFileFormat = conf.getString(HoodieTableConfig.BASE_FILE_FORMAT.key(), null);
if (baseFileFormat != null && HoodieFileFormat.LANCE.name().equalsIgnoreCase(baseFileFormat)) {
throw new HoodieValidationException(HoodieFileFormat.LANCE_SPARK_ONLY_ERROR_MSG);
if (conf.containsKey(FlinkOptions.RECORD_KEY_FIELD.key())) {
Contributor

🤖 Re-raising — on the sink path, sanityCheck (line 107) still runs before setupConfOptions (line 108), and checkBaseFileFormatForWrite only looks at conf.containsKey(FlinkOptions.RECORD_KEY_FIELD.key()). A Lance + operation=insert table declared with PRIMARY KEY (..) NOT ENFORCED (no record.key.field option) will still pass this validation and only fail on read. Could you also check schema.getPrimaryKey().isPresent() here (e.g. by passing the schema through) and add a test using SchemaBuilder.primaryKey(...) rather than just the option?

- AI-generated; verify before applying. React 👍/👎 to flag quality.

Labels

size:XL PR with lines of changes > 1000

4 participants