Delta: Updating Delta to Iceberg conversion - Inserts only#15407
vladislav-sidorovich wants to merge 11 commits into apache:main from
Conversation
anoopj left a comment
Thank you for the PR. Moving to the Delta kernel is a great improvement. Here is my initial feedback.
| "The table identifier cannot be null, please provide a valid table identifier for the new iceberg table"); | ||
| Preconditions.checkArgument( | ||
| deltaTableLocation != null, | ||
| "The delta lake table location cannot be null, please provide a valid location of the delta lake table to be snapshot"); |
Nit: Replace with Delta Lake and Iceberg in the error messages.
| totalDataFiles = | ||
| convertEachDeltaVersion(initialDeltaVersion, latestDeltaVersion, transaction); | ||
| } catch (IOException e) { | ||
| throw new RuntimeException(e); |
Replace with throw new UncheckedIOException(e); (I think there is a CI failure related to this)
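For illustration, a minimal self-contained sketch of the suggested change; the method names and the exception message here are made up for the example and only stand in for `convertEachDeltaVersion`:

```java
import java.io.IOException;
import java.io.UncheckedIOException;

public class Example {
    // Hypothetical stand-in for convertEachDeltaVersion(...)
    static long convert() {
        try {
            return doIo();
        } catch (IOException e) {
            // UncheckedIOException keeps the IOException as the cause,
            // unlike a bare RuntimeException wrap, and satisfies the
            // static-analysis check that the CI failure points at
            throw new UncheckedIOException(e);
        }
    }

    static long doIo() throws IOException {
        throw new IOException("simulated failure");
    }

    public static void main(String[] args) {
        try {
            convert();
        } catch (UncheckedIOException e) {
            System.out.println(e.getCause().getMessage());
        }
    }
}
```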
| @@ -0,0 +1,50 @@ | |||
| # Delta Lake Golden Tables | |||
There is a CI failure due to missing license header. May want to add this to .rat-excludes
| private long convertEachDeltaVersion( | ||
| long initialDeltaVersion, long latestDeltaVersion, Transaction transaction) | ||
| throws IOException { | ||
| LOG.info( |
Did you mean to remove this line?
| } | ||
| /** | ||
| * Convert each delta log {@code ColumnarBatch} to Iceberg action and commit to the given {@code
| appendFiles.commit(); | ||
| } | ||
| tagCurrentSnapshot(deltaVersion, commitTimestamp, transaction); |
If dataFilesToAdd is empty, i.e. line 279 evaluates to false, this line might cause an NPE.
Yes, you are right. There will be no snapshots for empty tables. I will handle this scenario and add a test for it.
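One possible shape for the guard, as a sketch against Iceberg's `Transaction` API; the variable names mirror the discussion above and are assumptions, not the PR's actual code:

```java
// Only tag a snapshot if this Delta version actually produced one.
// For an empty table (no AddFile actions committed), currentSnapshot()
// on the transaction's table can be null.
Snapshot current = transaction.table().currentSnapshot();
if (current != null) {
  tagCurrentSnapshot(deltaVersion, commitTimestamp, transaction);
}
```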
| import io.delta.kernel.exceptions.TableNotFoundException; | ||
| import io.delta.kernel.internal.DeltaHistoryManager; | ||
| import io.delta.kernel.internal.DeltaLogActionUtils; | ||
| import io.delta.kernel.internal.SnapshotImpl; |
We are using internal APIs of the kernel. This is fragile - can we refactor this to use the public APIs instead? Snapshot, Table etc. Or are we doing this because we are trying to preserve the table history during the conversion? I would try to avoid this as much as possible.
No, there are no public APIs available for the purposes we need.
Yes, I want to go through the table history step by step, so we will have exactly the same granularity in the history.
At the same time, it is fairly safe to use the internal API because it depends on the Delta protocol, which is stable.
The internal APIs can change or disappear without any notice. I would think hard about avoiding dependencies on internal APIs, even if that means changing semantics (e.g. not preserving all the history by default).
| while (rows.hasNext()) { | ||
| Row row = rows.next(); | ||
| if (DeltaLakeActionsTranslationUtil.isAdd(row)) { | ||
| AddFile addFile = DeltaLakeActionsTranslationUtil.toAdd(row); |
Can we avoid the use of the internal AddFile class and read fields directly from the Row using ordinals defined by the scan file schema?
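A sketch of what reading by ordinal might look like with the Delta kernel `Row` API; the field names (`add`, `path`, `size`, `modificationTime`) are assumptions based on the Delta add-file action layout, and the exact getter choices would need checking against the kernel's scan file schema:

```java
// Resolve ordinals from the schema carried by the action row
// (field names assumed from Delta's add-file action layout).
StructType schema = row.getSchema();
Row add = row.getStruct(schema.indexOf("add"));

StructType addSchema = add.getSchema();
String path = add.getString(addSchema.indexOf("path"));
long size = add.getLong(addSchema.indexOf("size"));
long modificationTime = add.getLong(addSchema.indexOf("modificationTime"));
```

Resolving the ordinals once per batch rather than per row would avoid repeated name lookups.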
| public SnapshotDeltaLakeTable deltaLakeConfiguration(Configuration conf) { | ||
| deltaEngine = DefaultEngine.create(conf); | ||
| deltaLakeFileIO = new HadoopFileIO(conf); | ||
| deltaTable = (TableImpl) Table.forPath(deltaEngine, deltaTableLocation); |
It's necessary because I use the internal API below. The getChanges API is available only in TableImpl, not in the Table interface.
| try (CloseableIterator<Row> rows = columnarBatch.getRows()) { | ||
| while (rows.hasNext()) { | ||
| Row row = rows.next(); | ||
| if (DeltaLakeActionsTranslationUtil.isAdd(row)) { |
Shouldn't we do a fail fast if we encounter a remove? Otherwise in tables with DML, the conversion will cause silent duplicates.
Not really. I do not expect this code to be merged or used before I add handling of all the other Delta actions.
So I will support the remove action quite soon, and a fail-fast check will not be needed here.
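If an interim guard were wanted anyway, it could be a few lines next to the existing isAdd check; this sketch assumes an `isRemove` helper analogous to the PR's `DeltaLakeActionsTranslationUtil.isAdd`, which may not exist yet:

```java
// Guard against silent duplicates in tables with DML history:
// fail fast until remove actions are actually translated.
if (DeltaLakeActionsTranslationUtil.isRemove(row)) {
  throw new UnsupportedOperationException(
      "Remove actions are not supported yet by the Delta to Iceberg conversion");
}
```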
| latestDeltaVersion, | ||
| newTableIdentifier); | ||
| Schema icebergSchema = convertToIcebergSchema(initialDeltaSnapshot.getSchema()); |
This can cause silent data corruption if the table has column mapping enabled. For context, Delta Lake supports three column mapping modes:
- none: Parquet files use the same column names as the logical schema.
- name: Parquet files use physical names (UUIDs or opaque strings) that differ from the logical names. The Delta log stores a mapping between them; this enables column renames without rewriting data files.
- id: Similar to name mode, but maps by field ID instead of name.
Here we are mapping the schema based on the logical names. This won't work when column mapping is enabled (name or id). Delta Lake's UniForm feature requires column mapping to be enabled (name or id mode) and carries the physical-to-logical name mapping through to Iceberg via Iceberg's own column mapping (field IDs).
For now, we should at least validate that column mapping is not enabled and fail fast:
```java
Map<String, String> configuration = deltaSnapshot.getMetadata().getConfiguration();
String columnMappingMode = configuration.getOrDefault("delta.columnMapping.mode", "none");
if (!"none".equals(columnMappingMode)) {
  throw new UnsupportedOperationException("...");
}
```
Thank you for the input, it's very valuable. I will add it.
This PR contains an initial version of the code updating the existing functionality (https://iceberg.apache.org/docs/1.4.3/delta-lake-migration/) to the recent Delta Lake version (read: 3, write: 7). The motivation of the PR is to receive the earliest feedback from the community.
Note: The PR doesn't remove the old logic but adds a new interface implementation, so it will be easier to compare/review. Also, based on the usage scenario of the module, this approach will not introduce any issues.
The PR scope: inserts only (Add action).
Future steps:
Tests:
Unit tests: cover all supported data types, including complex arrays and structures.
Integration tests: cover the inserts-only scenario with Spark 3.5. The tests must be updated for a newer Delta Lake version once the previous solution is deleted from the code.
In the following PRs, I will add all the tables from: Delta golden tables