diff --git a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCopyOnWriteDelete.java b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCopyOnWriteDelete.java
index f7ded0c4d7d2..d39dff060c9a 100644
--- a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCopyOnWriteDelete.java
+++ b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCopyOnWriteDelete.java
@@ -162,6 +162,25 @@ public synchronized void testDeleteWithConcurrentTableRefresh() throws Exception
     assertThat(executorService.awaitTermination(2, TimeUnit.MINUTES)).as("Timeout").isTrue();
   }
 
+  @TestTemplate
+  public void testCopyOnWriteDeleteSetsSortOrderIdOnRewrittenDataFiles() {
+    createAndInitTable(
+        "id INT, dep STRING",
+        "PARTITIONED BY (dep)",
+        "{ \"id\": 1, \"dep\": \"hr\" }\n" + "{ \"id\": 2, \"dep\": \"hr\" }");
+
+    sql("ALTER TABLE %s WRITE ORDERED BY id", tableName);
+
+    sql("DELETE FROM %s WHERE id = 1", commitTarget());
+
+    Table table = validationCatalog.loadTable(tableIdent);
+    Snapshot snapshot = SnapshotUtil.latestSnapshot(table, branch);
+    assertThat(snapshot.addedDataFiles(table.io()))
+        .extracting(DataFile::sortOrderId)
+        .as("Rewritten data files should carry the table sort order id")
+        .containsOnly(table.sortOrder().orderId());
+  }
+
   @TestTemplate
   public void testRuntimeFilteringWithPreservedDataGrouping() throws NoSuchTableException {
     createAndInitPartitionedTable();
diff --git a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCopyOnWriteMerge.java b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCopyOnWriteMerge.java
index 8d509c2952a8..03d5b4ca5bdf 100644
--- a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCopyOnWriteMerge.java
+++ b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCopyOnWriteMerge.java
@@ -147,6 +147,34 @@ public synchronized void testMergeWithConcurrentTableRefresh() throws Exception
     assertThat(executorService.awaitTermination(2, TimeUnit.MINUTES)).as("Timeout").isTrue();
   }
 
+  @TestTemplate
+  public void testCopyOnWriteMergeSetsSortOrderIdOnRewrittenDataFiles() {
+    createAndInitTable("id INT, dep STRING");
+    sql("ALTER TABLE %s ADD PARTITION FIELD dep", tableName);
+    sql("ALTER TABLE %s WRITE ORDERED BY id", tableName);
+
+    append(tableName, "{ \"id\": 1, \"dep\": \"hr\" }\n" + "{ \"id\": 2, \"dep\": \"hr\" }");
+    createBranchIfNeeded();
+
+    createOrReplaceView("source", Collections.singletonList(1), Encoders.INT());
+
+    sql(
+        "MERGE INTO %s t USING source s "
+            + "ON t.id == s.value "
+            + "WHEN MATCHED THEN "
+            + " UPDATE SET dep = 'changed' "
+            + "WHEN NOT MATCHED THEN "
+            + " INSERT (id, dep) VALUES (s.value, 'new')",
+        commitTarget());
+
+    Table table = validationCatalog.loadTable(tableIdent);
+    Snapshot snapshot = SnapshotUtil.latestSnapshot(table, branch);
+    assertThat(snapshot.addedDataFiles(table.io()))
+        .extracting(DataFile::sortOrderId)
+        .as("Rewritten data files should carry the table sort order id")
+        .containsOnly(table.sortOrder().orderId());
+  }
+
   @TestTemplate
   public void testRuntimeFilteringWithReportedPartitioning() {
     createAndInitTable("id INT, dep STRING");
diff --git a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCopyOnWriteUpdate.java b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCopyOnWriteUpdate.java
index 21d1377b2b98..b547218acbd4 100644
--- a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCopyOnWriteUpdate.java
+++ b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCopyOnWriteUpdate.java
@@ -149,6 +149,25 @@ public synchronized void testUpdateWithConcurrentTableRefresh() throws Exception
     assertThat(executorService.awaitTermination(2, TimeUnit.MINUTES)).as("Timeout").isTrue();
   }
 
+  @TestTemplate
+  public void testCopyOnWriteUpdateSetsSortOrderIdOnRewrittenDataFiles() {
+    createAndInitTable(
+        "id INT, dep STRING",
+        "PARTITIONED BY (dep)",
+        "{ \"id\": 1, \"dep\": \"hr\" }\n" + "{ \"id\": 2, \"dep\": \"hr\" }");
+
+    sql("ALTER TABLE %s WRITE ORDERED BY id", tableName);
+
+    sql("UPDATE %s SET dep = 'changed' WHERE id = 1", commitTarget());
+
+    Table table = validationCatalog.loadTable(tableIdent);
+    Snapshot snapshot = SnapshotUtil.latestSnapshot(table, branch);
+    assertThat(snapshot.addedDataFiles(table.io()))
+        .extracting(DataFile::sortOrderId)
+        .as("Rewritten data files should carry the table sort order id")
+        .containsOnly(table.sortOrder().orderId());
+  }
+
   @TestTemplate
   public void testRuntimeFilteringWithReportedPartitioning() {
     createAndInitTable("id INT, dep STRING");
diff --git a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadMerge.java b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadMerge.java
index 361faade7e37..dcd9b9575786 100644
--- a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadMerge.java
+++ b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadMerge.java
@@ -26,6 +26,7 @@
 import java.util.Set;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
+import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DeleteFile;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.ParameterizedTestExtension;
@@ -136,6 +137,34 @@ public void testMergeWithDVAndHistoricalPositionDeletes() {
     assertThat(dvs).allMatch(dv -> FileFormat.fromFileName(dv.location()) == FileFormat.PUFFIN);
   }
 
+  @TestTemplate
+  public void testMergeOnReadMergeSetsSortOrderIdOnNewDataFiles() {
+    createAndInitTable(
+        "id INT, dep STRING",
+        "PARTITIONED BY (dep)",
+        "{ \"id\": 1, \"dep\": \"hr\" }\n" + "{ \"id\": 2, \"dep\": \"hr\" }");
+
+    sql("ALTER TABLE %s WRITE ORDERED BY id", tableName);
+
+    createOrReplaceView("source", ImmutableList.of(1, 3), Encoders.INT());
+
+    sql(
+        "MERGE INTO %s AS t USING source AS s "
+            + "ON t.id == s.value "
+            + "WHEN MATCHED THEN "
+            + " UPDATE SET id = id + 10 "
+            + "WHEN NOT MATCHED THEN "
+            + " INSERT (id, dep) VALUES (s.value, 'hr')",
+        commitTarget());
+
+    Table table = validationCatalog.loadTable(tableIdent);
+    Snapshot snapshot = SnapshotUtil.latestSnapshot(table, branch);
+    assertThat(snapshot.addedDataFiles(table.io()))
+        .extracting(DataFile::sortOrderId)
+        .as("All new data files should carry the table sort order id")
+        .containsOnly(table.sortOrder().orderId());
+  }
+
   private void checkMergeDeleteGranularity(DeleteGranularity deleteGranularity) {
     createTableWithDeleteGranularity(
         "id INT, dep STRING", "PARTITIONED BY (dep)", deleteGranularity);
diff --git a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadUpdate.java b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadUpdate.java
index a876e6d66b93..599f39c8a247 100644
--- a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadUpdate.java
+++ b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadUpdate.java
@@ -25,6 +25,7 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
+import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DeleteFile;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.ParameterizedTestExtension;
@@ -197,6 +198,25 @@ public void testUpdateWithDVAndHistoricalPositionDeletes() {
     assertThat(dvs).allMatch(dv -> FileFormat.fromFileName(dv.location()) == FileFormat.PUFFIN);
   }
 
+  @TestTemplate
+  public void testMergeOnReadUpdateSetsSortOrderIdOnNewDataFiles() {
+    createAndInitTable(
+        "id INT, dep STRING",
+        "PARTITIONED BY (dep)",
+        "{ \"id\": 1, \"dep\": \"hr\" }\n" + "{ \"id\": 2, \"dep\": \"hr\" }");
+
+    sql("ALTER TABLE %s WRITE ORDERED BY id", tableName);
+
+    sql("UPDATE %s SET id = id + 10 WHERE id = 1", commitTarget());
+
+    Table table = validationCatalog.loadTable(tableIdent);
+    Snapshot snapshot = SnapshotUtil.latestSnapshot(table, branch);
+    assertThat(snapshot.addedDataFiles(table.io()))
+        .extracting(DataFile::sortOrderId)
+        .as("All new data files should carry the table sort order id")
+        .containsOnly(table.sortOrder().orderId());
+  }
+
   private void checkUpdateFileGranularity(DeleteGranularity deleteGranularity) {
     initTable("PARTITIONED BY (dep)", deleteGranularity);
 
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java
index a13cff6e99a5..6f2a4781c519 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java
@@ -41,6 +41,7 @@
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.IsolationLevel;
 import org.apache.iceberg.SnapshotSummary;
+import org.apache.iceberg.SortOrder;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.TableUtil;
@@ -157,6 +158,25 @@ public int outputSpecId() {
     return outputSpecId;
   }
 
+  public int outputSortOrderId(SparkWriteRequirements writeRequirements) {
+    Integer explicitId =
+        confParser.intConf().option(SparkWriteOptions.OUTPUT_SORT_ORDER_ID).parseOptional();
+
+    if (explicitId != null) {
+      Preconditions.checkArgument(
+          table.sortOrders().containsKey(explicitId),
+          "Cannot use output sort order id %s because the table does not contain a sort order with that id",
+          explicitId);
+      return explicitId;
+    }
+
+    if (writeRequirements.hasOrdering()) {
+      return table.sortOrder().orderId();
+    }
+
+    return SortOrder.unsorted().orderId();
+  }
+
   public FileFormat dataFileFormat() {
     String valueAsString =
         confParser
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkWriteOptions.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkWriteOptions.java
index 391cb6bae3bf..1daecb523bc7 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkWriteOptions.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkWriteOptions.java
@@ -59,6 +59,7 @@ private SparkWriteOptions() {}
       "handle-timestamp-without-timezone";
 
   public static final String OUTPUT_SPEC_ID = "output-spec-id";
+  public static final String OUTPUT_SORT_ORDER_ID = "output-sort-order-id";
 
   public static final String OVERWRITE_MODE = "overwrite-mode";
 
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/SparkShufflingFileRewriteRunner.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/SparkShufflingFileRewriteRunner.java
index b1c5a5c0901a..346abaee5e63 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/SparkShufflingFileRewriteRunner.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/SparkShufflingFileRewriteRunner.java
@@ -47,10 +47,14 @@
 import org.apache.spark.sql.connector.expressions.SortOrder;
 import org.apache.spark.sql.connector.write.RequiresDistributionAndOrdering;
 import org.apache.spark.sql.execution.datasources.v2.DistributionAndOrderingUtils$;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import scala.Option;
 
 abstract class SparkShufflingFileRewriteRunner extends SparkDataFileRewriteRunner {
 
+  private static final Logger LOG = LoggerFactory.getLogger(SparkShufflingFileRewriteRunner.class);
+
   /**
    * The number of shuffle partitions to use for each output file. By default, this file rewriter
    * assumes each shuffle partition would become a separate output file. Attempting to generate
@@ -119,6 +123,17 @@ public void doRewrite(String groupId, RewriteFileGroup fileGroup) {
                 spec(fileGroup.outputSpecId()),
                 fileGroup.expectedOutputFiles()));
 
+    org.apache.iceberg.SortOrder sortOrderInJobSpec = sortOrder();
+
+    org.apache.iceberg.SortOrder maybeMatchingTableSortOrder =
+        SortOrderUtil.findTableSortOrder(table(), sortOrder());
+
+    if (sortOrderInJobSpec.isSorted() && maybeMatchingTableSortOrder.isUnsorted()) {
+      LOG.warn(
+          "Sort order specified for job {} doesn't match any table sort orders, rewritten files will not be marked as sorted in the manifest files",
+          Spark3Util.describe(sortOrderInJobSpec));
+    }
+
     sortedDF
         .write()
         .format("iceberg")
@@ -126,6 +141,7 @@ public void doRewrite(String groupId, RewriteFileGroup fileGroup) {
         .option(SparkWriteOptions.TARGET_FILE_SIZE_BYTES, fileGroup.maxOutputFileSize())
         .option(SparkWriteOptions.USE_TABLE_DISTRIBUTION_AND_ORDERING, "false")
         .option(SparkWriteOptions.OUTPUT_SPEC_ID, fileGroup.outputSpecId())
+        .option(SparkWriteOptions.OUTPUT_SORT_ORDER_ID, maybeMatchingTableSortOrder.orderId())
         .mode("append")
         .save(groupId);
   }
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
index f1fd7b7ff972..885b67c6aec9 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
@@ -110,6 +110,7 @@ class SparkPositionDeltaWrite implements DeltaWrite, RequiresDistributionAndOrde
   private final String branch;
   private final Map<String, String> extraSnapshotMetadata;
   private final SparkWriteRequirements writeRequirements;
+  private final int sortOrderId;
   private final Context context;
   private final Map<String, String> writeProperties;
 
@@ -135,6 +136,7 @@ class SparkPositionDeltaWrite implements DeltaWrite, RequiresDistributionAndOrde
     this.branch = writeConf.branch();
     this.extraSnapshotMetadata = writeConf.extraSnapshotMetadata();
     this.writeRequirements = writeConf.positionDeltaRequirements(command);
+    this.sortOrderId = writeConf.outputSortOrderId(writeRequirements);
     this.context = new Context(dataSchema, writeConf, info, writeRequirements);
     this.writeProperties = writeConf.writeProperties();
   }
@@ -169,7 +171,8 @@ public DeltaWriterFactory createBatchWriterFactory(PhysicalWriteInfo info) {
         broadcastRewritableDeletes(),
         command,
         context,
-        writeProperties);
+        writeProperties,
+        sortOrderId);
   }
 
   private Broadcast<Map<String, DeleteFileSet>> broadcastRewritableDeletes() {
@@ -379,18 +382,21 @@ private static class PositionDeltaWriteFactory implements DeltaWriterFactory {
     private final Command command;
     private final Context context;
     private final Map<String, String> writeProperties;
+    private final int sortOrderId;
 
     PositionDeltaWriteFactory(
         Broadcast<Table> tableBroadcast,
         Broadcast<Map<String, DeleteFileSet>> rewritableDeletesBroadcast,
         Command command,
         Context context,
-        Map<String, String> writeProperties) {
+        Map<String, String> writeProperties,
+        int sortOrderId) {
      this.tableBroadcast = tableBroadcast;
      this.rewritableDeletesBroadcast = rewritableDeletesBroadcast;
      this.command = command;
      this.context = context;
      this.writeProperties = writeProperties;
+      this.sortOrderId = sortOrderId;
     }
 
     @Override
@@ -417,6 +423,7 @@ public DeltaWriter<InternalRow> createWriter(int partitionId, long taskId) {
             .deleteFileFormat(context.deleteFileFormat())
             .positionDeleteSparkType(context.deleteSparkType())
             .writeProperties(writeProperties)
+            .dataSortOrder(table.sortOrders().get(sortOrderId))
             .build();
 
       if (command == DELETE) {
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
index e890680cf780..f1bbd949d5da 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
@@ -182,6 +182,7 @@ private WriterFactory createWriterFactory() {
     // broadcast the table metadata as the writer factory will be sent to executors
     Broadcast<Table> tableBroadcast =
         sparkContext.broadcast(SerializableTableWithSize.copyOf(table));
+    int sortOrderId = writeConf.outputSortOrderId(writeRequirements);
     return new WriterFactory(
         tableBroadcast,
         queryId,
@@ -191,7 +192,8 @@
         writeSchema,
         dsSchema,
         partitionedFanoutEnabled,
-        writeProperties);
+        writeProperties,
+        sortOrderId);
   }
 
   private void commitOperation(SnapshotUpdate<?> operation, String description) {
@@ -656,6 +658,7 @@ private static class WriterFactory implements DataWriterFactory, StreamingDataWr
     private final boolean partitionedFanoutEnabled;
     private final String queryId;
     private final Map<String, String> writeProperties;
+    private final int sortOrderId;
 
     protected WriterFactory(
         Broadcast<Table> tableBroadcast,
@@ -666,7 +669,8 @@ protected WriterFactory(
         Schema writeSchema,
         StructType dsSchema,
         boolean partitionedFanoutEnabled,
-        Map<String, String> writeProperties) {
+        Map<String, String> writeProperties,
+        int sortOrderId) {
       this.tableBroadcast = tableBroadcast;
       this.format = format;
       this.outputSpecId = outputSpecId;
@@ -676,6 +680,7 @@
       this.partitionedFanoutEnabled = partitionedFanoutEnabled;
       this.queryId = queryId;
       this.writeProperties = writeProperties;
+      this.sortOrderId = sortOrderId;
     }
 
     @Override
@@ -700,6 +705,7 @@ public DataWriter<InternalRow> createWriter(int partitionId, long taskId, long e
             .dataSchema(writeSchema)
             .dataSparkType(dsSchema)
             .writeProperties(writeProperties)
+            .dataSortOrder(table.sortOrders().get(sortOrderId))
             .build();
 
       if (spec.isUnpartitioned()) {
diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/TestSparkWriteConf.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/TestSparkWriteConf.java
index ae7ed3af651e..4b261aaf8386 100644
--- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/TestSparkWriteConf.java
+++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/TestSparkWriteConf.java
@@ -45,6 +45,7 @@
 import static org.apache.spark.sql.connector.write.RowLevelOperation.Command.MERGE;
 import static org.apache.spark.sql.connector.write.RowLevelOperation.Command.UPDATE;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 import java.time.Duration;
@@ -489,6 +490,51 @@ public void testDeleteFileWriteConf() {
     }
   }
 
+  @TestTemplate
+  public void testSortOrderWriteConf() {
+    Table table = validationCatalog.loadTable(tableIdent);
+
+    table.replaceSortOrder().asc("id").commit();
+
+    SparkWriteConf writeConf =
+        new SparkWriteConf(
+            spark, table, ImmutableMap.of(SparkWriteOptions.OUTPUT_SORT_ORDER_ID, "1"));
+
+    assertThat(writeConf.outputSortOrderId(SparkWriteRequirements.EMPTY))
+        .isEqualTo(table.sortOrder().orderId());
+  }
+
+  @TestTemplate
+  public void testSortOrderWriteConfWithInvalidId() {
+    Table table = validationCatalog.loadTable(tableIdent);
+
+    table.replaceSortOrder().asc("id").commit();
+
+    SparkWriteConf writeConfForUnknownSortOrder =
+        new SparkWriteConf(
+            spark, table, ImmutableMap.of(SparkWriteOptions.OUTPUT_SORT_ORDER_ID, "999"));
+
+    assertThatIllegalArgumentException()
+        .isThrownBy(
+            () -> writeConfForUnknownSortOrder.outputSortOrderId(SparkWriteRequirements.EMPTY))
+        .withMessage(
+            "Cannot use output sort order id 999 because the table does not contain a sort order with that id");
+  }
+
+  @TestTemplate
+  public void testSortOrderWriteConfWithNoOption() {
+    Table table = validationCatalog.loadTable(tableIdent);
+
+    table.replaceSortOrder().asc("id").commit();
+
+    SparkWriteConf writeConfNoOption = new SparkWriteConf(spark, table, ImmutableMap.of());
+
+    assertThat(writeConfNoOption.outputSortOrderId(writeConfNoOption.writeRequirements()))
+        .isEqualTo(table.sortOrder().orderId());
+
+    assertThat(writeConfNoOption.outputSortOrderId(SparkWriteRequirements.EMPTY)).isEqualTo(0);
+  }
+
   private void testWriteProperties(List<Map<String, String>> propertiesSuite) {
     withSQLConf(
         propertiesSuite.get(0),
diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
index 6abce5b24da0..d74d8a29f994 100644
--- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
+++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
@@ -1516,7 +1516,7 @@ public void testSortMultipleGroups() {
   }
 
   @TestTemplate
-  public void testSimpleSort() {
+  public void testSimpleSort() throws IOException {
     Table table = createTable(20);
     shouldHaveFiles(table, 20);
     table.replaceSortOrder().asc("c2").commit();
@@ -1544,10 +1544,11 @@ public void testSimpleSort() {
     shouldHaveACleanCache(table);
     shouldHaveMultipleFiles(table);
     shouldHaveLastCommitSorted(table, "c2");
+    dataFilesSortOrderShouldMatchTableSortOrder(table);
   }
 
   @TestTemplate
-  public void testSortAfterPartitionChange() {
+  public void testSortAfterPartitionChange() throws IOException {
     Table table = createTable(20);
     shouldHaveFiles(table, 20);
     table.updateSpec().addField(Expressions.bucket("c1", 4)).commit();
@@ -1578,10 +1579,11 @@ public void testSortAfterPartitionChange() {
     shouldHaveACleanCache(table);
     shouldHaveMultipleFiles(table);
     shouldHaveLastCommitSorted(table, "c2");
+    dataFilesSortOrderShouldMatchTableSortOrder(table);
   }
 
   @TestTemplate
-  public void testSortCustomSortOrder() {
+  public void testSortCustomSortOrder() throws IOException {
     Table table = createTable(20);
     shouldHaveLastCommitUnsorted(table, "c2");
     shouldHaveFiles(table, 20);
@@ -1607,10 +1609,11 @@ public void testSortCustomSortOrder() {
     shouldHaveACleanCache(table);
     shouldHaveMultipleFiles(table);
     shouldHaveLastCommitSorted(table, "c2");
+    dataFilesShouldHaveSortOrderIdMatching(table, SortOrder.unsorted());
   }
 
   @TestTemplate
-  public void testSortCustomSortOrderRequiresRepartition() {
+  public void testSortCustomSortOrderRequiresRepartition() throws IOException {
     int partitions = 4;
     Table table = createTable();
     writeRecords(20, SCALE, partitions);
@@ -1646,10 +1649,40 @@ public void testSortCustomSortOrderRequiresRepartition() {
     shouldHaveMultipleFiles(table);
     shouldHaveLastCommitUnsorted(table, "c2");
     shouldHaveLastCommitSorted(table, "c3");
+    dataFilesShouldHaveSortOrderIdMatching(table, SortOrder.unsorted());
   }
 
   @TestTemplate
-  public void testAutoSortShuffleOutput() {
+  public void testSortPastTableSortOrderGetsAppliedToFiles() throws IOException {
+    Table table = createTable(1);
+
+    table.replaceSortOrder().asc("c3").commit();
+    SortOrder c3SortOrder = table.sortOrder();
+
+    table.replaceSortOrder().asc("c2").commit();
+
+    List<Object[]> originalData = currentData();
+
+    RewriteDataFiles.Result result =
+        basicRewrite(table)
+            .sort(SortOrder.builderFor(table.schema()).asc("c3").build())
+            .option(SizeBasedFileRewritePlanner.REWRITE_ALL, "true")
+            .execute();
+
+    assertThat(result.rewriteResults()).as("Should have 1 fileGroups").hasSize(1);
+
+    table.refresh();
+
+    List<Object[]> postRewriteData = currentData();
+    assertEquals("We shouldn't have changed the data", originalData, postRewriteData);
+
+    shouldHaveSnapshots(table, 2);
+    shouldHaveACleanCache(table);
+    dataFilesShouldHaveSortOrderIdMatching(table, c3SortOrder);
+  }
+
+  @TestTemplate
+  public void testAutoSortShuffleOutput() throws IOException {
     Table table = createTable(20);
     shouldHaveLastCommitUnsorted(table, "c2");
     shouldHaveFiles(table, 20);
@@ -1684,6 +1717,7 @@ public void testAutoSortShuffleOutput() {
     shouldHaveACleanCache(table);
     shouldHaveMultipleFiles(table);
     shouldHaveLastCommitSorted(table, "c2");
+    dataFilesShouldHaveSortOrderIdMatching(table, SortOrder.unsorted());
   }
 
   @TestTemplate
@@ -2619,4 +2653,17 @@ public boolean matches(RewriteFileGroup argument) {
       return groupIDs.contains(argument.info().globalIndex());
     }
   }
+
+  private void dataFilesSortOrderShouldMatchTableSortOrder(Table table) throws IOException {
+    dataFilesShouldHaveSortOrderIdMatching(table, table.sortOrder());
+  }
+
+  private void dataFilesShouldHaveSortOrderIdMatching(Table table, SortOrder sortOrder)
+      throws IOException {
+    try (CloseableIterable<FileScanTask> files = table.newScan().planFiles()) {
+      assertThat(files)
+          .extracting(fileScanTask -> fileScanTask.file().sortOrderId())
+          .containsOnly(sortOrder.orderId());
+    }
+  }
 }
diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java
index 439c4443b990..bf667956ec5c 100644
--- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java
+++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java
@@ -27,6 +27,7 @@
 import static org.mockito.Mockito.when;
 
 import java.io.File;
+import java.io.IOException;
 import java.net.InetAddress;
 import java.nio.file.Path;
 import java.util.List;
@@ -35,6 +36,7 @@
 import org.apache.iceberg.AppendFiles;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.ManifestFile;
 import org.apache.iceberg.ManifestFiles;
 import org.apache.iceberg.Parameter;
@@ -43,10 +45,12 @@
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.SnapshotRef;
+import org.apache.iceberg.SortOrder;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.exceptions.CommitStateUnknownException;
 import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
@@ -155,6 +159,7 @@ public void testBasicWrite() {
       assertThat(file.splitOffsets()).as("Split offsets not present").isNotNull();
     }
     assertThat(file.recordCount()).as("Should have reported record count as 1").isEqualTo(1);
+    assertThat(file.sortOrderId()).isEqualTo(SortOrder.unsorted().orderId());
     // TODO: append more metric info
     if (format.equals(FileFormat.PARQUET)) {
       assertThat(file.columnSizes()).as("Column sizes metric not present").isNotNull();
@@ -475,6 +480,116 @@ public void testViewsReturnRecentResults() {
     assertThat(actual2).hasSameSizeAs(expected2).isEqualTo(expected2);
   }
 
+  @TestTemplate
+  public void testWriteDataFilesInTableSortOrder() throws IOException {
+    File parent = temp.resolve(format.toString()).toFile();
+    File location = new File(parent, "test");
+    HadoopTables tables = new HadoopTables(CONF);
+    PartitionSpec spec = PartitionSpec.unpartitioned();
+    SortOrder sortOrder = SortOrder.builderFor(SCHEMA).asc("id").build();
+    Table table = tables.create(SCHEMA, spec, sortOrder, ImmutableMap.of(), location.toString());
+
+    List<SimpleRecord> expected = Lists.newArrayListWithCapacity(10);
+    for (int i = 0; i < 10; i++) {
+      expected.add(new SimpleRecord(i, "a"));
+    }
+
+    Dataset<Row> df = spark.createDataFrame(expected, SimpleRecord.class);
+
+    df.select("id", "data")
+        .write()
+        .format("iceberg")
+        .option(SparkWriteOptions.WRITE_FORMAT, format.toString())
+        .mode(SaveMode.Append)
+        .save(location.toString());
+
+    Dataset<Row> result = spark.read().format("iceberg").load(location.toString());
+
+    List<SimpleRecord> actual =
+        result.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList();
+    assertThat(actual).hasSameSizeAs(expected).isEqualTo(expected);
+
+    try (CloseableIterable<FileScanTask> fileScanTasks = table.newScan().planFiles()) {
+      assertThat(fileScanTasks)
+          .extracting(task -> task.file().sortOrderId())
+          .as("All DataFiles are written with the table sort order id")
+          .containsOnly(table.sortOrder().orderId());
+    }
+  }
+
+  @TestTemplate
+  public void testWriteDataFilesUnsortedTable() throws IOException {
+    File parent = temp.resolve(format.toString()).toFile();
+    File location = new File(parent, "test");
+
+    HadoopTables tables = new HadoopTables(CONF);
+    PartitionSpec spec = PartitionSpec.unpartitioned();
+    Table table = tables.create(SCHEMA, spec, location.toString());
+
+    List<SimpleRecord> expected = Lists.newArrayList(new SimpleRecord(1, "a"));
+    Dataset<Row> df = spark.createDataFrame(expected, SimpleRecord.class);
+
+    df.select("id", "data")
+        .write()
+        .format("iceberg")
+        .option(SparkWriteOptions.WRITE_FORMAT, format.toString())
+        .mode(SaveMode.Append)
+        .save(location.toString());
+
+    try (CloseableIterable<FileScanTask> tasks = table.newScan().planFiles()) {
+      assertThat(tasks)
+          .extracting(task -> task.file().sortOrderId())
+          .as("All DataFiles should have unsorted sort order id")
+          .containsOnly(SortOrder.unsorted().orderId());
+    }
+  }
+
+  @TestTemplate
+  public void testWriteDataFilesAfterSortOrderChange() throws IOException {
+    File parent = temp.resolve(format.toString()).toFile();
+    File location = new File(parent, "test");
+
+    HadoopTables tables = new HadoopTables(CONF);
+    PartitionSpec spec = PartitionSpec.unpartitioned();
+    Table table = tables.create(SCHEMA, spec, location.toString());
+
+    List<SimpleRecord> records = Lists.newArrayList(new SimpleRecord(1, "a"));
+    Dataset<Row> df = spark.createDataFrame(records, SimpleRecord.class);
+
+    df.select("id", "data")
+        .write()
+        .format("iceberg")
+        .option(SparkWriteOptions.WRITE_FORMAT, format.toString())
+        .mode(SaveMode.Append)
+        .save(location.toString());
+
+    table.refresh();
+    int unsortedId = SortOrder.unsorted().orderId();
+
+    try (CloseableIterable<FileScanTask> tasks = table.newScan().planFiles()) {
+      assertThat(tasks).extracting(task -> task.file().sortOrderId()).containsOnly(unsortedId);
+    }
+
+    table.replaceSortOrder().asc("id").commit();
+    int sortedId = table.sortOrder().orderId();
+
+    df.select("id", "data")
+        .write()
+        .format("iceberg")
+        .option(SparkWriteOptions.WRITE_FORMAT, format.toString())
+        .mode(SaveMode.Append)
+        .save(location.toString());
+
+    table.refresh();
+
+    try (CloseableIterable<FileScanTask> tasks = table.newScan().planFiles()) {
+      assertThat(tasks)
+          .extracting(task -> task.file().sortOrderId())
+          .as("Should contain both unsorted and sorted files")
+          .containsOnly(unsortedId, sortedId);
+    }
+  }
+
   public void partitionedCreateWithTargetFileSizeViaOption(IcebergOptionsType option) {
     File parent = temp.resolve(format.toString()).toFile();
     File location = new File(parent, "test");
diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreaming.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreaming.java
index a974b58a9714..fcc8acd56a71 100644
--- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreaming.java
+++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreaming.java
@@ -29,10 +29,14 @@
 import java.nio.file.Paths;
 import java.util.List;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.SortOrder;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.spark.TestBase;
 import org.apache.iceberg.types.Types;
@@ -262,6 +266,50 @@ public void testStreamingWriteCompleteModeWithProjection() throws Exception {
     }
   }
 
+  @Test
+  public void testStreamingWriteDataFilesInTableSortOrder() throws Exception {
+    File parent = temp.resolve("parquet").toFile();
+    File location = new File(parent, "test-table");
+    File checkpoint = new File(parent, "checkpoint");
+
+    HadoopTables tables = new HadoopTables(CONF);
+    PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("data").build();
+    SortOrder sortOrder = SortOrder.builderFor(SCHEMA).asc("id").build();
+    Table table = tables.create(SCHEMA, spec, sortOrder, ImmutableMap.of(), location.toString());
+
+    MemoryStream<Integer> inputStream = newMemoryStream(1, spark.sqlContext(), Encoders.INT());
+    DataStreamWriter<Row> streamWriter =
+        inputStream
+            .toDF()
+            .selectExpr("value AS id", "CAST (value AS STRING) AS data")
+            .writeStream()
+            .outputMode("append")
+            .format("iceberg")
+            .option("checkpointLocation", checkpoint.toString())
+            .option("path", location.toString());
+
+    try {
+      StreamingQuery query = streamWriter.start();
+      List<Integer> batch1 = Lists.newArrayList(1, 2);
+      send(batch1, inputStream);
+      query.processAllAvailable();
+      query.stop();
+
+      table.refresh();
+
+      try (CloseableIterable<FileScanTask> tasks = table.newScan().planFiles()) {
+        assertThat(tasks)
+            .extracting(task -> task.file().sortOrderId())
+            .as("All DataFiles are written with the table sort order id")
+            .containsOnly(table.sortOrder().orderId());
+      }
+    } finally {
+      for (StreamingQuery query : spark.streams().active()) {
+        query.stop();
+      }
+    }
+  }
+
   @Test
   public void testStreamingWriteUpdateMode() throws Exception {
     File parent = temp.resolve("parquet").toFile();
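
Usage note: per the patch above, `SparkWriteConf.outputSortOrderId` resolves the recorded id in three steps — an explicit `output-sort-order-id` option wins and must reference one of the table's registered sort orders; otherwise the table's current sort order id is used when the write requirements carry an ordering; otherwise files are recorded as unsorted (order id 0). The sketch below shows how a caller could pin the id on an append, mirroring what `SparkShufflingFileRewriteRunner.doRewrite` does; the table path, sort column, and DataFrame `df` are illustrative placeholders, not part of this patch.

    // Hypothetical caller-side use of the new write option. The id must name an
    // existing table sort order, or SparkWriteConf.outputSortOrderId throws
    // IllegalArgumentException. The option only stamps manifest metadata; the
    // rows must actually be laid out in that order for the marking to be truthful.
    df.sortWithinPartitions("id")
        .write()
        .format("iceberg")
        .option(SparkWriteOptions.OUTPUT_SORT_ORDER_ID, 1)
        .mode("append")
        .save("path/to/table");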