From cba2b0dff79c9f666a35e7436e546346f8b7a02c Mon Sep 17 00:00:00 2001 From: arifazmidd Date: Thu, 6 Nov 2025 13:44:25 -0500 Subject: [PATCH] Spark: Backport stream-results for remove orphan files to 3.4 and 4.0 --- .../actions/DeleteOrphanFilesSparkAction.java | 194 ++++++++++++------ .../RemoveOrphanFilesProcedure.java | 13 +- .../actions/TestRemoveOrphanFilesAction.java | 101 ++++++++- .../actions/DeleteOrphanFilesSparkAction.java | 194 ++++++++++++------ .../RemoveOrphanFilesProcedure.java | 13 +- .../actions/TestRemoveOrphanFilesAction.java | 101 ++++++++- 6 files changed, 488 insertions(+), 128 deletions(-) diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java index 46bd67c9b2ec..78662159b0bb 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java @@ -91,18 +91,33 @@ * dataset provided which are not found in table metadata will be deleted, using the same {@link * Table#location()} and {@link #olderThan(long)} filtering as above. * + *

<p>Streaming mode can be enabled via the {@value #STREAM_RESULTS} option to avoid loading all + * orphan file paths into driver memory. When enabled, the result will contain only a sample of file + * paths (up to {@value #MAX_ORPHAN_FILE_SAMPLE_SIZE_DEFAULT}). The total count of deleted files is + * logged but not included in the result. + * + *

Note: It is dangerous to call this action with a short retention interval as it might * corrupt the state of the table if another operation is writing at the same time. */ public class DeleteOrphanFilesSparkAction extends BaseSparkAction implements DeleteOrphanFiles { + public static final String STREAM_RESULTS = "stream-results"; + public static final boolean STREAM_RESULTS_DEFAULT = false; + + // Test-only option to configure the max sample size for streaming mode + @VisibleForTesting + static final String MAX_ORPHAN_FILE_SAMPLE_SIZE = "max-orphan-file-sample-size"; + + private static final int MAX_ORPHAN_FILE_SAMPLE_SIZE_DEFAULT = 20000; + private static final Logger LOG = LoggerFactory.getLogger(DeleteOrphanFilesSparkAction.class); private static final Map EQUAL_SCHEMES_DEFAULT = ImmutableMap.of("s3n,s3a", "s3"); private static final int MAX_DRIVER_LISTING_DEPTH = 3; private static final int MAX_DRIVER_LISTING_DIRECT_SUB_DIRS = 10; private static final int MAX_EXECUTOR_LISTING_DEPTH = 2000; private static final int MAX_EXECUTOR_LISTING_DIRECT_SUB_DIRS = Integer.MAX_VALUE; + private static final int DELETE_GROUP_SIZE = 100000; private final SerializableConfiguration hadoopConf; private final int listingParallelism; @@ -234,7 +249,71 @@ private String jobDesc() { return String.format("Deleting orphan files (%s) from %s", optionsAsString, table.name()); } - private void deleteFiles(SupportsBulkOperations io, List paths) { + private boolean streamResults() { + return PropertyUtil.propertyAsBoolean(options(), STREAM_RESULTS, STREAM_RESULTS_DEFAULT); + } + + private DeleteOrphanFiles.Result doExecute() { + Dataset actualFileIdentDS = actualFileIdentDS(); + Dataset validFileIdentDS = validFileIdentDS(); + + Dataset orphanFileDS = + findOrphanFiles(actualFileIdentDS, validFileIdentDS, prefixMismatchMode); + try { + return deleteFiles(orphanFileDS); + } finally { + orphanFileDS.unpersist(); + } + } + + /** + * Deletes orphan files from the cached dataset. 
+ * + * @param orphanFileDS the cached dataset of orphan files + * @return result with orphan file paths + */ + private DeleteOrphanFiles.Result deleteFiles(Dataset orphanFileDS) { + int maxSampleSize = + PropertyUtil.propertyAsInt( + options(), MAX_ORPHAN_FILE_SAMPLE_SIZE, MAX_ORPHAN_FILE_SAMPLE_SIZE_DEFAULT); + List orphanFileList = Lists.newArrayListWithCapacity(maxSampleSize); + long filesCount = 0; + + Iterator orphanFiles = + streamResults() ? orphanFileDS.toLocalIterator() : orphanFileDS.collectAsList().iterator(); + + Iterator> fileGroups = Iterators.partition(orphanFiles, DELETE_GROUP_SIZE); + + while (fileGroups.hasNext()) { + List fileGroup = fileGroups.next(); + + collectPathsForOutput(fileGroup, orphanFileList, maxSampleSize); + + if (deleteFunc == null && table.io() instanceof SupportsBulkOperations) { + deleteBulk((SupportsBulkOperations) table.io(), fileGroup); + } else { + deleteNonBulk(fileGroup); + } + + filesCount += fileGroup.size(); + } + + LOG.info("Deleted {} orphan files", filesCount); + + return ImmutableDeleteOrphanFiles.Result.builder().orphanFileLocations(orphanFileList).build(); + } + + private void collectPathsForOutput( + List paths, List orphanFileList, int maxSampleSize) { + if (streamResults()) { + int lengthToAdd = Math.min(maxSampleSize - orphanFileList.size(), paths.size()); + orphanFileList.addAll(paths.subList(0, lengthToAdd)); + } else { + orphanFileList.addAll(paths); + } + } + + private void deleteBulk(SupportsBulkOperations io, List paths) { try { io.deleteFiles(paths); LOG.info("Deleted {} files using bulk deletes", paths.size()); @@ -245,36 +324,65 @@ private void deleteFiles(SupportsBulkOperations io, List paths) { } } - private DeleteOrphanFiles.Result doExecute() { - Dataset actualFileIdentDS = actualFileIdentDS(); - Dataset validFileIdentDS = validFileIdentDS(); + private void deleteNonBulk(List paths) { + Tasks.Builder deleteTasks = + Tasks.foreach(paths) + .noRetry() + .executeWith(deleteExecutorService) + 
.suppressFailureWhenFinished() + .onFailure((file, exc) -> LOG.warn("Failed to delete file: {}", file, exc)); + + if (deleteFunc == null) { + LOG.info( + "Table IO {} does not support bulk operations. Using non-bulk deletes.", + table.io().getClass().getName()); + deleteTasks.run(table.io()::deleteFile); + } else { + LOG.info("Custom delete function provided. Using non-bulk deletes"); + deleteTasks.run(deleteFunc::accept); + } + } - List orphanFiles = - findOrphanFiles(spark(), actualFileIdentDS, validFileIdentDS, prefixMismatchMode); + @VisibleForTesting + static Dataset findOrphanFiles( + Dataset actualFileIdentDS, + Dataset validFileIdentDS, + PrefixMismatchMode prefixMismatchMode) { - if (deleteFunc == null && table.io() instanceof SupportsBulkOperations) { - deleteFiles((SupportsBulkOperations) table.io(), orphanFiles); - } else { + SetAccumulator> conflicts = new SetAccumulator<>(); + actualFileIdentDS.sparkSession().sparkContext().register(conflicts); - Tasks.Builder deleteTasks = - Tasks.foreach(orphanFiles) - .noRetry() - .executeWith(deleteExecutorService) - .suppressFailureWhenFinished() - .onFailure((file, exc) -> LOG.warn("Failed to delete file: {}", file, exc)); - - if (deleteFunc == null) { - LOG.info( - "Table IO {} does not support bulk operations. Using non-bulk deletes.", - table.io().getClass().getName()); - deleteTasks.run(table.io()::deleteFile); - } else { - LOG.info("Custom delete function provided. 
Using non-bulk deletes"); - deleteTasks.run(deleteFunc::accept); + Column joinCond = actualFileIdentDS.col("path").equalTo(validFileIdentDS.col("path")); + + Dataset orphanFileDS = + actualFileIdentDS + .joinWith(validFileIdentDS, joinCond, "leftouter") + .mapPartitions(new FindOrphanFiles(prefixMismatchMode, conflicts), Encoders.STRING()); + + // Cache and force computation to populate conflicts accumulator + orphanFileDS = orphanFileDS.cache(); + + try { + orphanFileDS.count(); + + if (prefixMismatchMode == PrefixMismatchMode.ERROR && !conflicts.value().isEmpty()) { + throw new ValidationException( + "Unable to determine whether certain files are orphan. Metadata references files that" + + " match listed/provided files except for authority/scheme. Please, inspect the" + + " conflicting authorities/schemes and provide which of them are equal by further" + + " configuring the action via equalSchemes() and equalAuthorities() methods. Set the" + + " prefix mismatch mode to 'NONE' to ignore remaining locations with conflicting" + + " authorities/schemes or to 'DELETE' iff you are ABSOLUTELY confident that" + + " remaining conflicting authorities/schemes are different. It will be impossible to" + + " recover deleted files. 
Conflicting authorities/schemes: %s.", + conflicts.value()); } - } - return ImmutableDeleteOrphanFiles.Result.builder().orphanFileLocations(orphanFiles).build(); + return orphanFileDS; + } catch (Exception e) { + orphanFileDS.unpersist(); + throw e; + } } private Dataset validFileIdentDS() { @@ -355,40 +463,6 @@ private Dataset listedFileDS() { } } - @VisibleForTesting - static List findOrphanFiles( - SparkSession spark, - Dataset actualFileIdentDS, - Dataset validFileIdentDS, - PrefixMismatchMode prefixMismatchMode) { - - SetAccumulator> conflicts = new SetAccumulator<>(); - spark.sparkContext().register(conflicts); - - Column joinCond = actualFileIdentDS.col("path").equalTo(validFileIdentDS.col("path")); - - List orphanFiles = - actualFileIdentDS - .joinWith(validFileIdentDS, joinCond, "leftouter") - .mapPartitions(new FindOrphanFiles(prefixMismatchMode, conflicts), Encoders.STRING()) - .collectAsList(); - - if (prefixMismatchMode == PrefixMismatchMode.ERROR && !conflicts.value().isEmpty()) { - throw new ValidationException( - "Unable to determine whether certain files are orphan. " - + "Metadata references files that match listed/provided files except for authority/scheme. " - + "Please, inspect the conflicting authorities/schemes and provide which of them are equal " - + "by further configuring the action via equalSchemes() and equalAuthorities() methods. " - + "Set the prefix mismatch mode to 'NONE' to ignore remaining locations with conflicting " - + "authorities/schemes or to 'DELETE' iff you are ABSOLUTELY confident that remaining conflicting " - + "authorities/schemes are different. It will be impossible to recover deleted files. 
" - + "Conflicting authorities/schemes: %s.", - conflicts.value()); - } - - return orphanFiles; - } - private static Map flattenMap(Map map) { Map flattenedMap = Maps.newHashMap(); if (map != null) { diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/procedures/RemoveOrphanFilesProcedure.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/procedures/RemoveOrphanFilesProcedure.java index affc9b76d8c3..447f3295c003 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/procedures/RemoveOrphanFilesProcedure.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/procedures/RemoveOrphanFilesProcedure.java @@ -71,6 +71,9 @@ public class RemoveOrphanFilesProcedure extends BaseProcedure { // List files with prefix operations. Default is false. private static final ProcedureParameter PREFIX_LISTING_PARAM = optionalInParameter("prefix_listing", DataTypes.BooleanType); + // Stream results to avoid loading all orphan files in driver memory. Default is false. 
+ private static final ProcedureParameter STREAM_RESULTS_PARAM = + optionalInParameter("stream_results", DataTypes.BooleanType); private static final ProcedureParameter[] PARAMETERS = new ProcedureParameter[] { @@ -83,7 +86,8 @@ public class RemoveOrphanFilesProcedure extends BaseProcedure { EQUAL_SCHEMES_PARAM, EQUAL_AUTHORITIES_PARAM, PREFIX_MISMATCH_MODE_PARAM, - PREFIX_LISTING_PARAM + PREFIX_LISTING_PARAM, + STREAM_RESULTS_PARAM }; private static final StructType OUTPUT_TYPE = @@ -138,6 +142,7 @@ public InternalRow[] call(InternalRow args) { PrefixMismatchMode prefixMismatchMode = asPrefixMismatchMode(input, PREFIX_MISMATCH_MODE_PARAM); boolean prefixListing = input.asBoolean(PREFIX_LISTING_PARAM, false); + boolean streamResults = input.asBoolean(STREAM_RESULTS_PARAM, false); return withIcebergTable( tableIdent, @@ -163,7 +168,7 @@ public InternalRow[] call(InternalRow args) { if (maxConcurrentDeletes != null) { if (table.io() instanceof SupportsBulkOperations) { LOG.warn( - "max_concurrent_deletes only works with FileIOs that do not support bulk deletes. This" + "max_concurrent_deletes only works with FileIOs that do not support bulk deletes. This " + "table is currently using {} which supports bulk deletes so the parameter will be ignored. 
" + "See that IO's documentation to learn how to adjust parallelism for that particular " + "IO's bulk delete.", @@ -187,6 +192,10 @@ public InternalRow[] call(InternalRow args) { action.usePrefixListing(prefixListing); + if (streamResults) { + action.option("stream-results", "true"); + } + DeleteOrphanFiles.Result result = action.execute(); return toOutputRows(result); diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java index bda9c2eaa078..40505b856737 100644 --- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java +++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java @@ -18,6 +18,8 @@ */ package org.apache.iceberg.spark.actions; +import static org.apache.iceberg.spark.actions.DeleteOrphanFilesSparkAction.MAX_ORPHAN_FILE_SAMPLE_SIZE; +import static org.apache.iceberg.spark.actions.DeleteOrphanFilesSparkAction.STREAM_RESULTS; import static org.apache.iceberg.types.Types.NestedField.optional; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -202,8 +204,23 @@ public void testDryRun() throws IOException { .deleteOrphanFiles(table) .usePrefixListing(usePrefixListing) .olderThan(System.currentTimeMillis()) + .option(STREAM_RESULTS, "true") + .deleteWith(s -> {}) .execute(); assertThat(result3.orphanFileLocations()) + .as("Streaming dry run should find 1 file") + .isEqualTo(invalidFiles); + assertThat(fs.exists(new Path(invalidFiles.get(0)))) + .as("Invalid file should be present after streaming dry run") + .isTrue(); + + DeleteOrphanFiles.Result result4 = + actions + .deleteOrphanFiles(table) + .usePrefixListing(usePrefixListing) + .olderThan(System.currentTimeMillis()) + .execute(); + assertThat(result4.orphanFileLocations()) .as("Action should 
delete 1 file") .isEqualTo(invalidFiles); assertThat(fs.exists(new Path(invalidFiles.get(0)))) @@ -1203,9 +1220,89 @@ private void executeTest( Dataset validFileDS = spark.createDataset(validFiles, Encoders.STRING()); Dataset actualFileDS = spark.createDataset(actualFiles, Encoders.STRING()); - List orphanFiles = + Dataset orphanFileDS = DeleteOrphanFilesSparkAction.findOrphanFiles( - spark, toFileUri.apply(actualFileDS), toFileUri.apply(validFileDS), mode); + toFileUri.apply(actualFileDS), toFileUri.apply(validFileDS), mode); + + List orphanFiles = orphanFileDS.collectAsList(); + orphanFileDS.unpersist(); + assertThat(orphanFiles).isEqualTo(expectedOrphanFiles); } + + @TestTemplate + public void testStreamResultsDeletion() throws IOException { + Table table = TABLES.create(SCHEMA, PartitionSpec.unpartitioned(), properties, tableLocation); + + List records = + Lists.newArrayList(new ThreeColumnRecord(1, "AAAAAAAAAA", "AAAA")); + + Dataset df = spark.createDataFrame(records, ThreeColumnRecord.class).coalesce(1); + + df.select("c1", "c2", "c3").write().format("iceberg").mode("append").save(tableLocation); + + List validFiles = + spark + .read() + .format("iceberg") + .load(tableLocation + "#files") + .select("file_path") + .as(Encoders.STRING()) + .collectAsList(); + assertThat(validFiles).as("Should be 1 valid file").hasSize(1); + + for (int i = 0; i < 10; i++) { + df.write().mode("append").parquet(tableLocation + "/data"); + } + + Path dataPath = new Path(tableLocation + "/data"); + FileSystem fs = dataPath.getFileSystem(spark.sessionState().newHadoopConf()); + List allFiles = + Arrays.stream(fs.listStatus(dataPath, HiddenPathFilter.get())) + .filter(FileStatus::isFile) + .map(file -> file.getPath().toString()) + .collect(Collectors.toList()); + assertThat(allFiles).as("Should be 11 files").hasSize(11); + + List invalidFiles = Lists.newArrayList(allFiles); + invalidFiles.removeAll(validFiles); + assertThat(invalidFiles).as("Should be 10 invalid files").hasSize(10); 
+ + waitUntilAfter(System.currentTimeMillis()); + + DeleteOrphanFiles.Result nonStreamingResult = + SparkActions.get() + .deleteOrphanFiles(table) + .usePrefixListing(usePrefixListing) + .olderThan(System.currentTimeMillis()) + .deleteWith(s -> {}) + .execute(); + + assertThat(nonStreamingResult.orphanFileLocations()) + .as("Non-streaming dry-run should return all 10 orphan files") + .hasSize(10) + .containsExactlyInAnyOrderElementsOf(invalidFiles); + + DeleteOrphanFiles.Result streamingResult = + SparkActions.get() + .deleteOrphanFiles(table) + .usePrefixListing(usePrefixListing) + .olderThan(System.currentTimeMillis()) + .option(STREAM_RESULTS, "true") + .option(MAX_ORPHAN_FILE_SAMPLE_SIZE, "5") + .execute(); + + assertThat(streamingResult.orphanFileLocations()) + .as("Streaming with sample size 5 should return only 5 orphan files") + .hasSize(5); + + for (String invalidFile : invalidFiles) { + assertThat(fs.exists(new Path(invalidFile))).as("Orphan file should be deleted").isFalse(); + } + + Dataset resultDF = spark.read().format("iceberg").load(tableLocation); + List actualRecords = + resultDF.as(Encoders.bean(ThreeColumnRecord.class)).collectAsList(); + assertThat(actualRecords).isEqualTo(records); + } } diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java index 46bd67c9b2ec..78662159b0bb 100644 --- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java +++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java @@ -91,18 +91,33 @@ * dataset provided which are not found in table metadata will be deleted, using the same {@link * Table#location()} and {@link #olderThan(long)} filtering as above. * + *

<p>Streaming mode can be enabled via the {@value #STREAM_RESULTS} option to avoid loading all + * orphan file paths into driver memory. When enabled, the result will contain only a sample of file + * paths (up to {@value #MAX_ORPHAN_FILE_SAMPLE_SIZE_DEFAULT}). The total count of deleted files is + * logged but not included in the result. + * + *

Note: It is dangerous to call this action with a short retention interval as it might * corrupt the state of the table if another operation is writing at the same time. */ public class DeleteOrphanFilesSparkAction extends BaseSparkAction implements DeleteOrphanFiles { + public static final String STREAM_RESULTS = "stream-results"; + public static final boolean STREAM_RESULTS_DEFAULT = false; + + // Test-only option to configure the max sample size for streaming mode + @VisibleForTesting + static final String MAX_ORPHAN_FILE_SAMPLE_SIZE = "max-orphan-file-sample-size"; + + private static final int MAX_ORPHAN_FILE_SAMPLE_SIZE_DEFAULT = 20000; + private static final Logger LOG = LoggerFactory.getLogger(DeleteOrphanFilesSparkAction.class); private static final Map EQUAL_SCHEMES_DEFAULT = ImmutableMap.of("s3n,s3a", "s3"); private static final int MAX_DRIVER_LISTING_DEPTH = 3; private static final int MAX_DRIVER_LISTING_DIRECT_SUB_DIRS = 10; private static final int MAX_EXECUTOR_LISTING_DEPTH = 2000; private static final int MAX_EXECUTOR_LISTING_DIRECT_SUB_DIRS = Integer.MAX_VALUE; + private static final int DELETE_GROUP_SIZE = 100000; private final SerializableConfiguration hadoopConf; private final int listingParallelism; @@ -234,7 +249,71 @@ private String jobDesc() { return String.format("Deleting orphan files (%s) from %s", optionsAsString, table.name()); } - private void deleteFiles(SupportsBulkOperations io, List paths) { + private boolean streamResults() { + return PropertyUtil.propertyAsBoolean(options(), STREAM_RESULTS, STREAM_RESULTS_DEFAULT); + } + + private DeleteOrphanFiles.Result doExecute() { + Dataset actualFileIdentDS = actualFileIdentDS(); + Dataset validFileIdentDS = validFileIdentDS(); + + Dataset orphanFileDS = + findOrphanFiles(actualFileIdentDS, validFileIdentDS, prefixMismatchMode); + try { + return deleteFiles(orphanFileDS); + } finally { + orphanFileDS.unpersist(); + } + } + + /** + * Deletes orphan files from the cached dataset. 
+ * + * @param orphanFileDS the cached dataset of orphan files + * @return result with orphan file paths + */ + private DeleteOrphanFiles.Result deleteFiles(Dataset orphanFileDS) { + int maxSampleSize = + PropertyUtil.propertyAsInt( + options(), MAX_ORPHAN_FILE_SAMPLE_SIZE, MAX_ORPHAN_FILE_SAMPLE_SIZE_DEFAULT); + List orphanFileList = Lists.newArrayListWithCapacity(maxSampleSize); + long filesCount = 0; + + Iterator orphanFiles = + streamResults() ? orphanFileDS.toLocalIterator() : orphanFileDS.collectAsList().iterator(); + + Iterator> fileGroups = Iterators.partition(orphanFiles, DELETE_GROUP_SIZE); + + while (fileGroups.hasNext()) { + List fileGroup = fileGroups.next(); + + collectPathsForOutput(fileGroup, orphanFileList, maxSampleSize); + + if (deleteFunc == null && table.io() instanceof SupportsBulkOperations) { + deleteBulk((SupportsBulkOperations) table.io(), fileGroup); + } else { + deleteNonBulk(fileGroup); + } + + filesCount += fileGroup.size(); + } + + LOG.info("Deleted {} orphan files", filesCount); + + return ImmutableDeleteOrphanFiles.Result.builder().orphanFileLocations(orphanFileList).build(); + } + + private void collectPathsForOutput( + List paths, List orphanFileList, int maxSampleSize) { + if (streamResults()) { + int lengthToAdd = Math.min(maxSampleSize - orphanFileList.size(), paths.size()); + orphanFileList.addAll(paths.subList(0, lengthToAdd)); + } else { + orphanFileList.addAll(paths); + } + } + + private void deleteBulk(SupportsBulkOperations io, List paths) { try { io.deleteFiles(paths); LOG.info("Deleted {} files using bulk deletes", paths.size()); @@ -245,36 +324,65 @@ private void deleteFiles(SupportsBulkOperations io, List paths) { } } - private DeleteOrphanFiles.Result doExecute() { - Dataset actualFileIdentDS = actualFileIdentDS(); - Dataset validFileIdentDS = validFileIdentDS(); + private void deleteNonBulk(List paths) { + Tasks.Builder deleteTasks = + Tasks.foreach(paths) + .noRetry() + .executeWith(deleteExecutorService) + 
.suppressFailureWhenFinished() + .onFailure((file, exc) -> LOG.warn("Failed to delete file: {}", file, exc)); + + if (deleteFunc == null) { + LOG.info( + "Table IO {} does not support bulk operations. Using non-bulk deletes.", + table.io().getClass().getName()); + deleteTasks.run(table.io()::deleteFile); + } else { + LOG.info("Custom delete function provided. Using non-bulk deletes"); + deleteTasks.run(deleteFunc::accept); + } + } - List orphanFiles = - findOrphanFiles(spark(), actualFileIdentDS, validFileIdentDS, prefixMismatchMode); + @VisibleForTesting + static Dataset findOrphanFiles( + Dataset actualFileIdentDS, + Dataset validFileIdentDS, + PrefixMismatchMode prefixMismatchMode) { - if (deleteFunc == null && table.io() instanceof SupportsBulkOperations) { - deleteFiles((SupportsBulkOperations) table.io(), orphanFiles); - } else { + SetAccumulator> conflicts = new SetAccumulator<>(); + actualFileIdentDS.sparkSession().sparkContext().register(conflicts); - Tasks.Builder deleteTasks = - Tasks.foreach(orphanFiles) - .noRetry() - .executeWith(deleteExecutorService) - .suppressFailureWhenFinished() - .onFailure((file, exc) -> LOG.warn("Failed to delete file: {}", file, exc)); - - if (deleteFunc == null) { - LOG.info( - "Table IO {} does not support bulk operations. Using non-bulk deletes.", - table.io().getClass().getName()); - deleteTasks.run(table.io()::deleteFile); - } else { - LOG.info("Custom delete function provided. 
Using non-bulk deletes"); - deleteTasks.run(deleteFunc::accept); + Column joinCond = actualFileIdentDS.col("path").equalTo(validFileIdentDS.col("path")); + + Dataset orphanFileDS = + actualFileIdentDS + .joinWith(validFileIdentDS, joinCond, "leftouter") + .mapPartitions(new FindOrphanFiles(prefixMismatchMode, conflicts), Encoders.STRING()); + + // Cache and force computation to populate conflicts accumulator + orphanFileDS = orphanFileDS.cache(); + + try { + orphanFileDS.count(); + + if (prefixMismatchMode == PrefixMismatchMode.ERROR && !conflicts.value().isEmpty()) { + throw new ValidationException( + "Unable to determine whether certain files are orphan. Metadata references files that" + + " match listed/provided files except for authority/scheme. Please, inspect the" + + " conflicting authorities/schemes and provide which of them are equal by further" + + " configuring the action via equalSchemes() and equalAuthorities() methods. Set the" + + " prefix mismatch mode to 'NONE' to ignore remaining locations with conflicting" + + " authorities/schemes or to 'DELETE' iff you are ABSOLUTELY confident that" + + " remaining conflicting authorities/schemes are different. It will be impossible to" + + " recover deleted files. 
Conflicting authorities/schemes: %s.", + conflicts.value()); } - } - return ImmutableDeleteOrphanFiles.Result.builder().orphanFileLocations(orphanFiles).build(); + return orphanFileDS; + } catch (Exception e) { + orphanFileDS.unpersist(); + throw e; + } } private Dataset validFileIdentDS() { @@ -355,40 +463,6 @@ private Dataset listedFileDS() { } } - @VisibleForTesting - static List findOrphanFiles( - SparkSession spark, - Dataset actualFileIdentDS, - Dataset validFileIdentDS, - PrefixMismatchMode prefixMismatchMode) { - - SetAccumulator> conflicts = new SetAccumulator<>(); - spark.sparkContext().register(conflicts); - - Column joinCond = actualFileIdentDS.col("path").equalTo(validFileIdentDS.col("path")); - - List orphanFiles = - actualFileIdentDS - .joinWith(validFileIdentDS, joinCond, "leftouter") - .mapPartitions(new FindOrphanFiles(prefixMismatchMode, conflicts), Encoders.STRING()) - .collectAsList(); - - if (prefixMismatchMode == PrefixMismatchMode.ERROR && !conflicts.value().isEmpty()) { - throw new ValidationException( - "Unable to determine whether certain files are orphan. " - + "Metadata references files that match listed/provided files except for authority/scheme. " - + "Please, inspect the conflicting authorities/schemes and provide which of them are equal " - + "by further configuring the action via equalSchemes() and equalAuthorities() methods. " - + "Set the prefix mismatch mode to 'NONE' to ignore remaining locations with conflicting " - + "authorities/schemes or to 'DELETE' iff you are ABSOLUTELY confident that remaining conflicting " - + "authorities/schemes are different. It will be impossible to recover deleted files. 
" - + "Conflicting authorities/schemes: %s.", - conflicts.value()); - } - - return orphanFiles; - } - private static Map flattenMap(Map map) { Map flattenedMap = Maps.newHashMap(); if (map != null) { diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/RemoveOrphanFilesProcedure.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/RemoveOrphanFilesProcedure.java index f30f99978c45..89e218c10360 100644 --- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/RemoveOrphanFilesProcedure.java +++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/RemoveOrphanFilesProcedure.java @@ -77,6 +77,9 @@ public class RemoveOrphanFilesProcedure extends BaseProcedure { // List files with prefix operations. Default is false. private static final ProcedureParameter PREFIX_LISTING_PARAM = optionalInParameter("prefix_listing", DataTypes.BooleanType); + // Stream results to avoid loading all orphan files in driver memory. Default is false. 
+ private static final ProcedureParameter STREAM_RESULTS_PARAM = + optionalInParameter("stream_results", DataTypes.BooleanType); private static final ProcedureParameter[] PARAMETERS = new ProcedureParameter[] { @@ -89,7 +92,8 @@ public class RemoveOrphanFilesProcedure extends BaseProcedure { EQUAL_SCHEMES_PARAM, EQUAL_AUTHORITIES_PARAM, PREFIX_MISMATCH_MODE_PARAM, - PREFIX_LISTING_PARAM + PREFIX_LISTING_PARAM, + STREAM_RESULTS_PARAM }; private static final StructType OUTPUT_TYPE = @@ -144,6 +148,7 @@ public Iterator call(InternalRow args) { PrefixMismatchMode prefixMismatchMode = asPrefixMismatchMode(input, PREFIX_MISMATCH_MODE_PARAM); boolean prefixListing = input.asBoolean(PREFIX_LISTING_PARAM, false); + boolean streamResults = input.asBoolean(STREAM_RESULTS_PARAM, false); return withIcebergTable( tableIdent, @@ -169,7 +174,7 @@ public Iterator call(InternalRow args) { if (maxConcurrentDeletes != null) { if (table.io() instanceof SupportsBulkOperations) { LOG.warn( - "max_concurrent_deletes only works with FileIOs that do not support bulk deletes. This" + "max_concurrent_deletes only works with FileIOs that do not support bulk deletes. This " + "table is currently using {} which supports bulk deletes so the parameter will be ignored. 
" + "See that IO's documentation to learn how to adjust parallelism for that particular " + "IO's bulk delete.", @@ -193,6 +198,10 @@ public Iterator call(InternalRow args) { action.usePrefixListing(prefixListing); + if (streamResults) { + action.option("stream-results", "true"); + } + DeleteOrphanFiles.Result result = action.execute(); return asScanIterator(OUTPUT_TYPE, toOutputRows(result)); diff --git a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java index bda9c2eaa078..40505b856737 100644 --- a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java +++ b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java @@ -18,6 +18,8 @@ */ package org.apache.iceberg.spark.actions; +import static org.apache.iceberg.spark.actions.DeleteOrphanFilesSparkAction.MAX_ORPHAN_FILE_SAMPLE_SIZE; +import static org.apache.iceberg.spark.actions.DeleteOrphanFilesSparkAction.STREAM_RESULTS; import static org.apache.iceberg.types.Types.NestedField.optional; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -202,8 +204,23 @@ public void testDryRun() throws IOException { .deleteOrphanFiles(table) .usePrefixListing(usePrefixListing) .olderThan(System.currentTimeMillis()) + .option(STREAM_RESULTS, "true") + .deleteWith(s -> {}) .execute(); assertThat(result3.orphanFileLocations()) + .as("Streaming dry run should find 1 file") + .isEqualTo(invalidFiles); + assertThat(fs.exists(new Path(invalidFiles.get(0)))) + .as("Invalid file should be present after streaming dry run") + .isTrue(); + + DeleteOrphanFiles.Result result4 = + actions + .deleteOrphanFiles(table) + .usePrefixListing(usePrefixListing) + .olderThan(System.currentTimeMillis()) + .execute(); + 
assertThat(result4.orphanFileLocations()) .as("Action should delete 1 file") .isEqualTo(invalidFiles); assertThat(fs.exists(new Path(invalidFiles.get(0)))) @@ -1203,9 +1220,89 @@ private void executeTest( Dataset validFileDS = spark.createDataset(validFiles, Encoders.STRING()); Dataset actualFileDS = spark.createDataset(actualFiles, Encoders.STRING()); - List orphanFiles = + Dataset orphanFileDS = DeleteOrphanFilesSparkAction.findOrphanFiles( - spark, toFileUri.apply(actualFileDS), toFileUri.apply(validFileDS), mode); + toFileUri.apply(actualFileDS), toFileUri.apply(validFileDS), mode); + + List orphanFiles = orphanFileDS.collectAsList(); + orphanFileDS.unpersist(); + assertThat(orphanFiles).isEqualTo(expectedOrphanFiles); } + + @TestTemplate + public void testStreamResultsDeletion() throws IOException { + Table table = TABLES.create(SCHEMA, PartitionSpec.unpartitioned(), properties, tableLocation); + + List records = + Lists.newArrayList(new ThreeColumnRecord(1, "AAAAAAAAAA", "AAAA")); + + Dataset df = spark.createDataFrame(records, ThreeColumnRecord.class).coalesce(1); + + df.select("c1", "c2", "c3").write().format("iceberg").mode("append").save(tableLocation); + + List validFiles = + spark + .read() + .format("iceberg") + .load(tableLocation + "#files") + .select("file_path") + .as(Encoders.STRING()) + .collectAsList(); + assertThat(validFiles).as("Should be 1 valid file").hasSize(1); + + for (int i = 0; i < 10; i++) { + df.write().mode("append").parquet(tableLocation + "/data"); + } + + Path dataPath = new Path(tableLocation + "/data"); + FileSystem fs = dataPath.getFileSystem(spark.sessionState().newHadoopConf()); + List allFiles = + Arrays.stream(fs.listStatus(dataPath, HiddenPathFilter.get())) + .filter(FileStatus::isFile) + .map(file -> file.getPath().toString()) + .collect(Collectors.toList()); + assertThat(allFiles).as("Should be 11 files").hasSize(11); + + List invalidFiles = Lists.newArrayList(allFiles); + invalidFiles.removeAll(validFiles); + 
assertThat(invalidFiles).as("Should be 10 invalid files").hasSize(10); + + waitUntilAfter(System.currentTimeMillis()); + + DeleteOrphanFiles.Result nonStreamingResult = + SparkActions.get() + .deleteOrphanFiles(table) + .usePrefixListing(usePrefixListing) + .olderThan(System.currentTimeMillis()) + .deleteWith(s -> {}) + .execute(); + + assertThat(nonStreamingResult.orphanFileLocations()) + .as("Non-streaming dry-run should return all 10 orphan files") + .hasSize(10) + .containsExactlyInAnyOrderElementsOf(invalidFiles); + + DeleteOrphanFiles.Result streamingResult = + SparkActions.get() + .deleteOrphanFiles(table) + .usePrefixListing(usePrefixListing) + .olderThan(System.currentTimeMillis()) + .option(STREAM_RESULTS, "true") + .option(MAX_ORPHAN_FILE_SAMPLE_SIZE, "5") + .execute(); + + assertThat(streamingResult.orphanFileLocations()) + .as("Streaming with sample size 5 should return only 5 orphan files") + .hasSize(5); + + for (String invalidFile : invalidFiles) { + assertThat(fs.exists(new Path(invalidFile))).as("Orphan file should be deleted").isFalse(); + } + + Dataset resultDF = spark.read().format("iceberg").load(tableLocation); + List actualRecords = + resultDF.as(Encoders.bean(ThreeColumnRecord.class)).collectAsList(); + assertThat(actualRecords).isEqualTo(records); + } }