feat: add lance format for Flink MOR table#18911
Conversation
| @@ -56,8 +56,8 @@ public enum HoodieFileFormat { | |||
| LANCE(".lance"); | |||
|
|
|||
| public static final String LANCE_SPARK_ONLY_ERROR_MSG = | |||
There was a problem hiding this comment.
Can we also rename the field name?
There was a problem hiding this comment.
Done. Renamed the constant to LANCE_UNSUPPORTED_ERROR_MSG and updated all call sites.
| } | ||
|
|
||
| protected ClosableIterator<RowData> getBaseFileIterator(String path) throws IOException { | ||
| if (path.endsWith(HoodieFileFormat.LANCE.getFileExtension())) { |
There was a problem hiding this comment.
Could we add a CDC coverage case for Lance MOR here? This PR adds Lance base-file handling for BASE_FILE_INSERT in both the Source V2 CDC reader and the legacy CDC path, but the new tests only cover snapshot reads. A CDC-enabled test that exercises a Lance MOR base-file CDC inference case would make this path much safer.
There was a problem hiding this comment.
Added CDC coverage by parameterizing TestInputFormat.testReadChangelogIncrementallyForMorWithCompaction over PARQUET and LANCE. The Lance case exercises MOR compaction with CDC enabled so the base-file CDC inference path reads Lance base files.
| /** Reads a parquet CDC base file returning required-schema records. */ | ||
| /** Reads a CDC base file returning required-schema records. */ | ||
| private ClosableIterator<RowData> getBaseFileIterator(String path) throws IOException { | ||
| if (path.endsWith(HoodieFileFormat.LANCE.getFileExtension())) { |
There was a problem hiding this comment.
The Lance base-file reader setup is now duplicated between MergeOnReadInputFormat and HoodieCdcSplitReaderFunction. Could we extract a small shared helper for building the selected DataType / requested HoodieSchema and opening HoodieRowDataLanceReader? That would reduce drift if schema, predicate, or close/error handling needs to change later.
There was a problem hiding this comment.
Done. Extracted the shared Lance RowData reader setup into FormatUtils.getLanceRecordIterator and reused it from MergeOnReadInputFormat, HoodieCdcSplitReaderFunction, and the existing COW Lance path so schema construction and close/error handling stay in one place.
Codecov Report❌ Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## master #18911 +/- ##
============================================
+ Coverage 68.81% 68.90% +0.09%
+ Complexity 29160 29110 -50
============================================
Files 2520 2517 -3
Lines 140056 139802 -254
Branches 17209 17204 -5
============================================
- Hits 96373 96329 -44
+ Misses 35909 35695 -214
- Partials 7774 7778 +4
Flags with carried forward coverage won't be shown. Click here to find out more.
🚀 New features to boost your workflow:
|
Describe the issue this Pull Request addresses
This closes #18907 .
Flink Lance base-file support was previously scoped away from merge-on-read tables, so MOR write/read flows could not use Lance base files even when the Flink path had Lance-specific readers available. This blocked users from combining Lance base files with MOR log-file merging, CDC base-file reads, and async compaction in Flink SQL pipelines.
This PR expands the Flink Lance path to support merge-on-read writes and reads while keeping the existing schema-evolution restriction for Lance files.
Summary and Changelog
This PR enables Lance base files for Flink merge-on-read tables, wires Lance readers into MOR and CDC base-file reads, and updates tests to cover both Parquet and Lance MOR base/log-file reads.
Working tree: Support Flink Lance MOR write and read path
hoodie.table.base.file.format = LANCEfor Flink merge-on-read tables by removing the previous MOR rejection inHoodieTableFactory.MergeOnReadInputFormatusingHoodieRowDataLanceReaderand requested-schema projection.HoodieCdcSplitReaderFunctionso CDC split reads can load Lance base files.FlinkRowDataReaderContextschema-evolution rejection to only fail when a non-empty merge schema is required, while still rejecting actual Lance schema evolution.Working tree: Tests and validation
ITTestHoodieDataSource.testLanceFormatMergeOnReadUpsertWriteAndReadfor Flink SQL MOR upsert/write/read with Lance base files and async compaction enabled through SQL table options.TestInputFormat.testReadBaseAndLogFilesto run for bothPARQUETandLANCE.mvn -pl hudi-flink-datasource/hudi-flink -am -DskipITs -DskipIT -Dcheckstyle.skip -Drat.skip=true -DfailIfNoTests=false -Dsurefire.failIfNoSpecifiedTests=false -Dtest=TestInputFormat#testReadBaseAndLogFiles testmvn -pl hudi-flink-datasource/hudi-flink -am -DskipITs=false -DskipIT=false -Dcheckstyle.skip -Drat.skip=true -DfailIfNoTests=false -Dsurefire.failIfNoSpecifiedTests=false -Dtest=ITTestHoodieDataSource#testLanceFormatMergeOnReadUpsertWriteAndRead testImpact
This expands Flink user-facing behavior by allowing Lance base files with merge-on-read tables and by enabling MOR/CDC read paths to read Lance base files. Schema evolution for Flink Lance base files remains unsupported. There is no new public API, but the accepted configuration surface changes because Flink MOR tables can now use
hoodie.table.base.file.format = LANCE.Risk Level
medium
This touches Flink MOR read/write behavior, CDC split reads, table factory validation, and a storage-format-specific reader path. Risk is mitigated by targeted unit and integration coverage for Lance MOR SQL writes/reads, MOR base/log-file reads, and table factory validation. One targeted IT run completed successfully with Surefire retry after an initial transient row assertion mismatch.
Documentation Update
Required. The Flink/base-file-format support matrix or configuration documentation should be updated to note that Lance base files are supported for Flink merge-on-read tables, with schema evolution still unsupported.
Contributor's checklist