diff --git a/api/src/main/java/org/apache/iceberg/actions/DeleteOrphanFiles.java b/api/src/main/java/org/apache/iceberg/actions/DeleteOrphanFiles.java index ab12a3b7c1e3..54eb2a2482fd 100644 --- a/api/src/main/java/org/apache/iceberg/actions/DeleteOrphanFiles.java +++ b/api/src/main/java/org/apache/iceberg/actions/DeleteOrphanFiles.java @@ -35,10 +35,15 @@ public interface DeleteOrphanFiles extends ActionThe location is interpreted as a directory: only files contained within that directory + * subtree are considered. Files that merely share the location's path prefix without being nested + * under it (for example a sibling directory such as {@code .../table-backup} relative to a {@code + * .../table} location, or the location path itself) are out of scope. + * *

If not set, the root table location will be scanned potentially removing both orphan data * and metadata files. * - * @param location the location where to look for orphan files + * @param location the root directory under which to look for orphan files * @return this for method chaining */ DeleteOrphanFiles location(String location); diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java index 92bfc880ad7f..a1e5974c930f 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java @@ -52,6 +52,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.spark.JobGroupInfo; import org.apache.iceberg.util.FileSystemWalker; +import org.apache.iceberg.util.LocationUtil; import org.apache.iceberg.util.Pair; import org.apache.iceberg.util.PropertyUtil; import org.apache.iceberg.util.Tasks; @@ -225,7 +226,10 @@ public DeleteOrphanFilesSparkAction usePrefixListing(boolean newUsePrefixListing private Dataset filteredCompareToFileList() { Dataset files = compareToFileList; if (location != null) { - files = files.filter(files.col(FILE_PATH).startsWith(location)); + // Treat location as a directory by appending a trailing separator so that sibling + // prefixes (e.g. ".../table-backup/" when location is ".../table") are not in scope. + String prefix = LocationUtil.stripTrailingSlash(location) + "/"; + files = files.filter(files.col(FILE_PATH).startsWith(prefix)); } return files .filter(files.col(LAST_MODIFIED).lt(new Timestamp(olderThanTimestamp))) diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java index 0d2a5c0a4daf..7c5d6bac93a7 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java @@ -1025,6 +1025,72 @@ public void testCompareToFileList() throws IOException { assertThat(result4.orphanFilesCount()).as("Action should find nothing").isEqualTo(0L); } + @TestTemplate + public void testCompareToFileListExcludesPathsOutsideLocationScope() throws IOException { + assumeThat(usePrefixListing) + .as("Should not test both prefix listing and Hadoop file listing (redundant)") + .isEqualTo(false); + Table table = TABLES.create(SCHEMA, PartitionSpec.unpartitioned(), properties, tableLocation); + + List records = + Lists.newArrayList(new ThreeColumnRecord(1, "AAAAAAAAAA", "AAAA")); + Dataset df = spark.createDataFrame(records, ThreeColumnRecord.class).coalesce(1); + df.select("c1", "c2", "c3").write().format("iceberg").mode("append").save(tableLocation); + + // Production storage URIs like "s3://bucket/table" typically have no trailing separator, + // which is the shape that previously let out-of-scope paths fall into scope. + String scopedLocation = tableLocation.substring(0, tableLocation.length() - 1); + // Case 1 - a sibling directory sharing the location's prefix (".../table-orphan-sibling/..."). + String siblingFilePath = scopedLocation + "-orphan-sibling/data.parquet"; + // Case 2 - a path equal to the location itself (".../table"): the new trailing-separator + // prefix excludes it, whereas the previous raw startsWith(location) filter kept it in scope. + String exactLocationFilePath = scopedLocation; + + Path dataPath = new Path(tableLocation + "/data"); + FileSystem fs = dataPath.getFileSystem(spark.sessionState().newHadoopConf()); + List validFiles = + Arrays.stream(fs.listStatus(dataPath, HiddenPathFilter.get())) + .filter(FileStatus::isFile) + .map( + file -> + new FilePathLastModifiedRecord( + file.getPath().toString(), new Timestamp(file.getModificationTime()))) + .collect(Collectors.toList()); + assertThat(validFiles).as("Should be 1 valid file").hasSize(1); + + List fileList = Lists.newArrayList(validFiles); + fileList.add(new FilePathLastModifiedRecord(siblingFilePath, new Timestamp(0L))); + fileList.add(new FilePathLastModifiedRecord(exactLocationFilePath, new Timestamp(0L))); + + waitUntilAfter(System.currentTimeMillis()); + + Dataset compareToFileList = + spark + .createDataFrame(fileList, FilePathLastModifiedRecord.class) + .withColumnRenamed("filePath", "file_path") + .withColumnRenamed("lastModified", "last_modified"); + + SparkActions actions = SparkActions.get(); + DeleteOrphanFiles.Result result = + actions + .deleteOrphanFiles(table) + .location(scopedLocation) + .compareToFileList(compareToFileList) + .olderThan(System.currentTimeMillis()) + .deleteWith(s -> {}) + .execute(); + + assertThat(result.orphanFileLocations()) + .as("Sibling prefix must not fall inside the location scope") + .doesNotContain(siblingFilePath); + assertThat(result.orphanFileLocations()) + .as("A path equal to the location is excluded by the trailing-separator prefix") + .doesNotContain(exactLocationFilePath); + assertThat(result.orphanFileLocations()) + .as("Valid files are known to the table; nothing should be reported as orphan") + .isEmpty(); + } + protected long waitUntilAfter(long timestampMillis) { long current = System.currentTimeMillis(); while (current <= timestampMillis) { diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java index 92bfc880ad7f..a1e5974c930f 100644 --- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java +++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java @@ -52,6 +52,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.spark.JobGroupInfo; import org.apache.iceberg.util.FileSystemWalker; +import org.apache.iceberg.util.LocationUtil; import org.apache.iceberg.util.Pair; import org.apache.iceberg.util.PropertyUtil; import org.apache.iceberg.util.Tasks; @@ -225,7 +226,10 @@ public DeleteOrphanFilesSparkAction usePrefixListing(boolean newUsePrefixListing private Dataset filteredCompareToFileList() { Dataset files = compareToFileList; if (location != null) { - files = files.filter(files.col(FILE_PATH).startsWith(location)); + // Treat location as a directory by appending a trailing separator so that sibling + // prefixes (e.g. ".../table-backup/" when location is ".../table") are not in scope. + String prefix = LocationUtil.stripTrailingSlash(location) + "/"; + files = files.filter(files.col(FILE_PATH).startsWith(prefix)); } return files .filter(files.col(LAST_MODIFIED).lt(new Timestamp(olderThanTimestamp))) diff --git a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java index 0d2a5c0a4daf..7c5d6bac93a7 100644 --- a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java +++ b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java @@ -1025,6 +1025,72 @@ public void testCompareToFileList() throws IOException { assertThat(result4.orphanFilesCount()).as("Action should find nothing").isEqualTo(0L); } + @TestTemplate + public void testCompareToFileListExcludesPathsOutsideLocationScope() throws IOException { + assumeThat(usePrefixListing) + .as("Should not test both prefix listing and Hadoop file listing (redundant)") + .isEqualTo(false); + Table table = TABLES.create(SCHEMA, PartitionSpec.unpartitioned(), properties, tableLocation); + + List records = + Lists.newArrayList(new ThreeColumnRecord(1, "AAAAAAAAAA", "AAAA")); + Dataset df = spark.createDataFrame(records, ThreeColumnRecord.class).coalesce(1); + df.select("c1", "c2", "c3").write().format("iceberg").mode("append").save(tableLocation); + + // Production storage URIs like "s3://bucket/table" typically have no trailing separator, + // which is the shape that previously let out-of-scope paths fall into scope. + String scopedLocation = tableLocation.substring(0, tableLocation.length() - 1); + // Case 1 - a sibling directory sharing the location's prefix (".../table-orphan-sibling/..."). + String siblingFilePath = scopedLocation + "-orphan-sibling/data.parquet"; + // Case 2 - a path equal to the location itself (".../table"): the new trailing-separator + // prefix excludes it, whereas the previous raw startsWith(location) filter kept it in scope. + String exactLocationFilePath = scopedLocation; + + Path dataPath = new Path(tableLocation + "/data"); + FileSystem fs = dataPath.getFileSystem(spark.sessionState().newHadoopConf()); + List validFiles = + Arrays.stream(fs.listStatus(dataPath, HiddenPathFilter.get())) + .filter(FileStatus::isFile) + .map( + file -> + new FilePathLastModifiedRecord( + file.getPath().toString(), new Timestamp(file.getModificationTime()))) + .collect(Collectors.toList()); + assertThat(validFiles).as("Should be 1 valid file").hasSize(1); + + List fileList = Lists.newArrayList(validFiles); + fileList.add(new FilePathLastModifiedRecord(siblingFilePath, new Timestamp(0L))); + fileList.add(new FilePathLastModifiedRecord(exactLocationFilePath, new Timestamp(0L))); + + waitUntilAfter(System.currentTimeMillis()); + + Dataset compareToFileList = + spark + .createDataFrame(fileList, FilePathLastModifiedRecord.class) + .withColumnRenamed("filePath", "file_path") + .withColumnRenamed("lastModified", "last_modified"); + + SparkActions actions = SparkActions.get(); + DeleteOrphanFiles.Result result = + actions + .deleteOrphanFiles(table) + .location(scopedLocation) + .compareToFileList(compareToFileList) + .olderThan(System.currentTimeMillis()) + .deleteWith(s -> {}) + .execute(); + + assertThat(result.orphanFileLocations()) + .as("Sibling prefix must not fall inside the location scope") + .doesNotContain(siblingFilePath); + assertThat(result.orphanFileLocations()) + .as("A path equal to the location is excluded by the trailing-separator prefix") + .doesNotContain(exactLocationFilePath); + assertThat(result.orphanFileLocations()) + .as("Valid files are known to the table; nothing should be reported as orphan") + .isEmpty(); + } + protected long waitUntilAfter(long timestampMillis) { long current = System.currentTimeMillis(); while (current <= timestampMillis) { diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java index 92bfc880ad7f..a1e5974c930f 100644 --- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java +++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java @@ -52,6 +52,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.spark.JobGroupInfo; import org.apache.iceberg.util.FileSystemWalker; +import org.apache.iceberg.util.LocationUtil; import org.apache.iceberg.util.Pair; import org.apache.iceberg.util.PropertyUtil; import org.apache.iceberg.util.Tasks; @@ -225,7 +226,10 @@ public DeleteOrphanFilesSparkAction usePrefixListing(boolean newUsePrefixListing private Dataset filteredCompareToFileList() { Dataset files = compareToFileList; if (location != null) { - files = files.filter(files.col(FILE_PATH).startsWith(location)); + // Treat location as a directory by appending a trailing separator so that sibling + // prefixes (e.g. ".../table-backup/" when location is ".../table") are not in scope. + String prefix = LocationUtil.stripTrailingSlash(location) + "/"; + files = files.filter(files.col(FILE_PATH).startsWith(prefix)); } return files .filter(files.col(LAST_MODIFIED).lt(new Timestamp(olderThanTimestamp))) diff --git a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java index 5a33c710b2f6..1d9dbd126c41 100644 --- a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java +++ b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java @@ -1026,6 +1026,72 @@ public void testCompareToFileList() throws IOException { assertThat(result4.orphanFilesCount()).as("Action should find nothing").isEqualTo(0L); } + @TestTemplate + public void testCompareToFileListExcludesPathsOutsideLocationScope() throws IOException { + assumeThat(usePrefixListing) + .as("Should not test both prefix listing and Hadoop file listing (redundant)") + .isEqualTo(false); + Table table = TABLES.create(SCHEMA, PartitionSpec.unpartitioned(), properties, tableLocation); + + List records = + Lists.newArrayList(new ThreeColumnRecord(1, "AAAAAAAAAA", "AAAA")); + Dataset df = spark.createDataFrame(records, ThreeColumnRecord.class).coalesce(1); + df.select("c1", "c2", "c3").write().format("iceberg").mode("append").save(tableLocation); + + // Production storage URIs like "s3://bucket/table" typically have no trailing separator, + // which is the shape that previously let out-of-scope paths fall into scope. + String scopedLocation = tableLocation.substring(0, tableLocation.length() - 1); + // Case 1 - a sibling directory sharing the location's prefix (".../table-orphan-sibling/..."). + String siblingFilePath = scopedLocation + "-orphan-sibling/data.parquet"; + // Case 2 - a path equal to the location itself (".../table"): the new trailing-separator + // prefix excludes it, whereas the previous raw startsWith(location) filter kept it in scope. + String exactLocationFilePath = scopedLocation; + + Path dataPath = new Path(tableLocation + "/data"); + FileSystem fs = dataPath.getFileSystem(spark.sessionState().newHadoopConf()); + List validFiles = + Arrays.stream(fs.listStatus(dataPath, HiddenPathFilter.get())) + .filter(FileStatus::isFile) + .map( + file -> + new FilePathLastModifiedRecord( + file.getPath().toString(), new Timestamp(file.getModificationTime()))) + .collect(Collectors.toList()); + assertThat(validFiles).as("Should be 1 valid file").hasSize(1); + + List fileList = Lists.newArrayList(validFiles); + fileList.add(new FilePathLastModifiedRecord(siblingFilePath, new Timestamp(0L))); + fileList.add(new FilePathLastModifiedRecord(exactLocationFilePath, new Timestamp(0L))); + + waitUntilAfter(System.currentTimeMillis()); + + Dataset compareToFileList = + spark + .createDataFrame(fileList, FilePathLastModifiedRecord.class) + .withColumnRenamed("filePath", "file_path") + .withColumnRenamed("lastModified", "last_modified"); + + SparkActions actions = SparkActions.get(); + DeleteOrphanFiles.Result result = + actions + .deleteOrphanFiles(table) + .location(scopedLocation) + .compareToFileList(compareToFileList) + .olderThan(System.currentTimeMillis()) + .deleteWith(s -> {}) + .execute(); + + assertThat(result.orphanFileLocations()) + .as("Sibling prefix must not fall inside the location scope") + .doesNotContain(siblingFilePath); + assertThat(result.orphanFileLocations()) + .as("A path equal to the location is excluded by the trailing-separator prefix") + .doesNotContain(exactLocationFilePath); + assertThat(result.orphanFileLocations()) + .as("Valid files are known to the table; nothing should be reported as orphan") + .isEmpty(); + } + protected long waitUntilAfter(long timestampMillis) { long current = System.currentTimeMillis(); while (current <= timestampMillis) {