From 3b542f0514e8a6c23460d3ccc1e4fcafe8e7b624 Mon Sep 17 00:00:00 2001 From: Rishi Reddy Bokka Date: Tue, 4 Nov 2025 20:07:40 +0000 Subject: [PATCH 1/2] Handle empty Iceberg table source in conversion --- .../iceberg/IcebergConversionSource.java | 24 ++++++- .../iceberg/TestIcebergConversionSource.java | 70 +++++++++++++++++++ 2 files changed, 92 insertions(+), 2 deletions(-) diff --git a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionSource.java b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionSource.java index 7a777ddb1..cdb67c8fa 100644 --- a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionSource.java +++ b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionSource.java @@ -108,7 +108,8 @@ private FileIO initTableOps() { @Override public InternalTable getTable(Snapshot snapshot) { Table iceTable = getSourceTable(); - Schema iceSchema = iceTable.schemas().get(snapshot.schemaId()); + Schema iceSchema = + (snapshot != null) ? iceTable.schemas().get(snapshot.schemaId()) : iceTable.schema(); TableOperations iceOps = ((BaseTable) iceTable).operations(); IcebergSchemaExtractor schemaExtractor = IcebergSchemaExtractor.getInstance(); InternalSchema irSchema = schemaExtractor.fromIceberg(iceSchema); @@ -123,12 +124,19 @@ public InternalTable getTable(Snapshot snapshot) { irPartitionFields.size() > 0 ? DataLayoutStrategy.HIVE_STYLE_PARTITION : DataLayoutStrategy.FLAT; + + Instant latestCommitTime = + (snapshot != null) + ? Instant.ofEpochMilli(snapshot.timestampMillis()) + : Instant.ofEpochMilli( + ((BaseTable) iceTable).operations().current().lastUpdatedMillis()); + return InternalTable.builder() .tableFormat(TableFormat.ICEBERG) .basePath(iceTable.location()) .name(iceTable.name()) .partitioningFields(irPartitionFields) - .latestCommitTime(Instant.ofEpochMilli(snapshot.timestampMillis())) + .latestCommitTime(latestCommitTime) .readSchema(irSchema) .layoutStrategy(dataLayoutStrategy) .latestMetadataPath(iceOps.current().metadataFileLocation()) @@ -147,6 +155,18 @@ public InternalSnapshot getCurrentSnapshot() { Table iceTable = getSourceTable(); Snapshot currentSnapshot = iceTable.currentSnapshot(); + + if (currentSnapshot == null) { + // Handle empty table case - return snapshot with schema but no data files + InternalTable irTable = getTable(null); + return InternalSnapshot.builder() + .version("0") + .table(irTable) + .partitionedDataFiles(Collections.emptyList()) + .sourceIdentifier("0") + .build(); + } + InternalTable irTable = getTable(currentSnapshot); TableScan scan = diff --git a/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergConversionSource.java b/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergConversionSource.java index ffe6a2177..0e1a01cc0 100644 --- a/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergConversionSource.java +++ b/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergConversionSource.java @@ -22,6 +22,7 @@ import static org.mockito.Mockito.*; import java.io.IOException; +import java.net.URI; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; @@ -116,6 +117,37 @@ void getTableTest(@TempDir Path workingDir) throws IOException { internalTable.getPartitioningFields().get(0).getTransformType()); } + @Test + void testGetTableWithoutSnapshot(@TempDir Path workingDir) throws IOException { + Table emptyTable = createTestCatalogTable(workingDir.toString()); + assertNull(emptyTable.currentSnapshot()); + + SourceTable sourceTableConfig = getPerTableConfig(emptyTable); + + IcebergConversionSource conversionSource = + sourceProvider.getConversionSourceInstance(sourceTableConfig); + + InternalTable internalTable = conversionSource.getTable(null); + assertNotNull(internalTable); + assertEquals(TableFormat.ICEBERG, internalTable.getTableFormat()); + assertEquals(emptyTable.location(), internalTable.getBasePath()); + assertEquals( + ((BaseTable) emptyTable).operations().current().lastUpdatedMillis(), + internalTable.getLatestCommitTime().toEpochMilli()); + + assertEquals( + emptyTable.schema().columns().size(), internalTable.getReadSchema().getFields().size()); + validateSchema(internalTable.getReadSchema(), emptyTable.schema()); + + assertEquals(1, internalTable.getPartitioningFields().size()); + InternalField partitionField = internalTable.getPartitioningFields().get(0).getSourceField(); + assertEquals("cs_sold_date_sk", partitionField.getName()); + assertEquals(7, partitionField.getFieldId()); + assertEquals( + PartitionTransformType.VALUE, + internalTable.getPartitioningFields().get(0).getTransformType()); + } + @Test public void testGetCurrentSnapshot(@TempDir Path workingDir) throws IOException { Table catalogSales = createTestTableWithData(workingDir.toString()); @@ -164,6 +196,44 @@ public void testGetCurrentSnapshot(@TempDir Path workingDir) throws IOException } } + @Test + void testGetCurrentSnapshotForEmptyTable(@TempDir Path workingDir) throws IOException { + Table emptyTable = createTestCatalogTable(workingDir.toString()); + assertNull(emptyTable.currentSnapshot()); + + SourceTable sourceTableConfig = getPerTableConfig(emptyTable); + + IcebergDataFileExtractor spyDataFileExtractor = spy(IcebergDataFileExtractor.builder().build()); + IcebergPartitionValueConverter spyPartitionConverter = + spy(IcebergPartitionValueConverter.getInstance()); + + IcebergConversionSource conversionSource = + IcebergConversionSource.builder() + .hadoopConf(hadoopConf) + .sourceTableConfig(sourceTableConfig) + .dataFileExtractor(spyDataFileExtractor) + .partitionConverter(spyPartitionConverter) + .build(); + + InternalSnapshot internalSnapshot = conversionSource.getCurrentSnapshot(); + assertNotNull(internalSnapshot); + assertEquals("0", internalSnapshot.getVersion()); + assertEquals("0", internalSnapshot.getSourceIdentifier()); + assertTrue(internalSnapshot.getPartitionedDataFiles().isEmpty()); + + InternalTable internalTable = internalSnapshot.getTable(); + assertNotNull(internalTable); + assertEquals(emptyTable.location(), internalTable.getBasePath()); + assertEquals( + ((BaseTable) emptyTable).operations().current().lastUpdatedMillis(), + internalTable.getLatestCommitTime().toEpochMilli()); + + assertEquals(emptyTable.schema().columns().size(), internalTable.getReadSchema().getFields().size()); + + verify(spyPartitionConverter, never()).toXTable(any(), any(), any()); + verify(spyDataFileExtractor, never()).fromIceberg(any(), any(), any()); + } + @Test public void testGetTableChangeForCommit(@TempDir Path workingDir) throws IOException { Table catalogSales = createTestTableWithData(workingDir.toString()); From 1edb366cf5a51e4056f65179cfb8beb75afc247d Mon Sep 17 00:00:00 2001 From: Rishi Reddy Bokka Date: Tue, 4 Nov 2025 20:29:01 +0000 Subject: [PATCH 2/2] applied spotless --- .../iceberg/IcebergConversionSource.java | 20 +++++++++---------- .../iceberg/TestIcebergConversionSource.java | 4 ++-- 2 files changed, 12 insertions(+), 12 deletions(-) diff --git a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionSource.java b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionSource.java index cdb67c8fa..325b50e9b 100644 --- a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionSource.java +++ b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionSource.java @@ -109,7 +109,7 @@ private FileIO initTableOps() { public InternalTable getTable(Snapshot snapshot) { Table iceTable = getSourceTable(); Schema iceSchema = - (snapshot != null) ? iceTable.schemas().get(snapshot.schemaId()) : iceTable.schema(); + (snapshot != null) ? iceTable.schemas().get(snapshot.schemaId()) : iceTable.schema(); TableOperations iceOps = ((BaseTable) iceTable).operations(); IcebergSchemaExtractor schemaExtractor = IcebergSchemaExtractor.getInstance(); InternalSchema irSchema = schemaExtractor.fromIceberg(iceSchema); @@ -126,10 +126,10 @@ public InternalTable getTable(Snapshot snapshot) { : DataLayoutStrategy.FLAT; Instant latestCommitTime = - (snapshot != null) - ? Instant.ofEpochMilli(snapshot.timestampMillis()) - : Instant.ofEpochMilli( - ((BaseTable) iceTable).operations().current().lastUpdatedMillis()); + (snapshot != null) + ? Instant.ofEpochMilli(snapshot.timestampMillis()) + : Instant.ofEpochMilli( + ((BaseTable) iceTable).operations().current().lastUpdatedMillis()); return InternalTable.builder() .tableFormat(TableFormat.ICEBERG) @@ -160,11 +160,11 @@ public InternalSnapshot getCurrentSnapshot() { // Handle empty table case - return snapshot with schema but no data files InternalTable irTable = getTable(null); return InternalSnapshot.builder() - .version("0") - .table(irTable) - .partitionedDataFiles(Collections.emptyList()) - .sourceIdentifier("0") - .build(); + .version("0") + .table(irTable) + .partitionedDataFiles(Collections.emptyList()) + .sourceIdentifier("0") + .build(); } InternalTable irTable = getTable(currentSnapshot); diff --git a/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergConversionSource.java b/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergConversionSource.java index 0e1a01cc0..d1bf70bb6 100644 --- a/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergConversionSource.java +++ b/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergConversionSource.java @@ -22,7 +22,6 @@ import static org.mockito.Mockito.*; import java.io.IOException; -import java.net.URI; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; @@ -228,7 +227,8 @@ void testGetCurrentSnapshotForEmptyTable(@TempDir Path workingDir) throws IOExce ((BaseTable) emptyTable).operations().current().lastUpdatedMillis(), internalTable.getLatestCommitTime().toEpochMilli()); - assertEquals(emptyTable.schema().columns().size(), internalTable.getReadSchema().getFields().size()); + assertEquals( + emptyTable.schema().columns().size(), internalTable.getReadSchema().getFields().size()); verify(spyPartitionConverter, never()).toXTable(any(), any(), any()); verify(spyDataFileExtractor, never()).fromIceberg(any(), any(), any());