From e3ce874b7139ccf95b66674d57f8dd57d07ec2bb Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Thu, 21 May 2026 11:59:00 +0800 Subject: [PATCH 01/11] [core] Fix potential data loss in orphan clean when listing returns transient empty result When listFileDirs encounters a transient empty response (network jitter, throttling) for a partition directory, the partition path is mistakenly passed to pathProcessor as if it were a bucket-level path. If the second listing succeeds, bucket sub-directories are collected as orphan file candidates. Since bucket directory names never appear in snapshot manifests, they pass the orphan diff and cleanFile recursively deletes the entire bucket directory including valid data files. Fix: - Add isDataStructureDirectory() filter in candidate collection (Local/Flink/Spark) to skip bucket-* and partition=value directories. Other directories (e.g. UNKNOWN-* temp dirs) remain eligible for orphan cleanup to preserve existing behavior. - Add defensive guard in cleanFile to refuse deletion of structural data directories even if they somehow reach the deletion path. - Fix Flink empty-dir cleanup missing dryRun guard (Local and Spark already had it). Co-Authored-By: Claude Opus 4.6 --- .../operation/LocalOrphanFilesClean.java | 1 + .../paimon/operation/OrphanFilesClean.java | 21 +++++++++++++++++++ .../flink/orphan/FlinkOrphanFilesClean.java | 5 +++-- .../procedure/SparkOrphanFilesClean.scala | 4 +++- 4 files changed, 28 insertions(+), 3 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/LocalOrphanFilesClean.java b/paimon-core/src/main/java/org/apache/paimon/operation/LocalOrphanFilesClean.java index f8a2e080c329..51690aa4f246 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/LocalOrphanFilesClean.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/LocalOrphanFilesClean.java @@ -258,6 +258,7 @@ private Function>> pathProcessor(Set emptyDirs } return files.stream() + .filter(status -> !isDataStructureDirectory(status)) .filter(this::oldEnough) .map(status -> Pair.of(status.getPath(), status.getLen())) .collect(Collectors.toList()); diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java b/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java index 5b1b045a7d00..154746c0d013 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java @@ -220,6 +220,14 @@ protected void cleanFile(Path path) { if (!dryRun) { try { if (fileIO.isDir(path)) { + String name = path.getName(); + if (name.startsWith(BUCKET_PATH_PREFIX) || name.contains("=")) { + LOG.warn( + "Skipping deletion of data structure directory that was mistakenly " + + "identified as orphan: {}", + path); + return; + } fileIO.deleteDirectoryQuietly(path); } else { fileIO.deleteQuietly(path); @@ -455,6 +463,19 @@ protected boolean oldEnough(FileStatus status) { return status.getModificationTime() < olderThanMillis; } + /** + * Returns true if the given status represents a structural data directory (bucket or partition) + * that should never be treated as an orphan file candidate. Other directories (e.g. unknown + * temp dirs) are still eligible for orphan cleanup. + */ + protected static boolean isDataStructureDirectory(FileStatus status) { + if (!status.isDir()) { + return false; + } + String name = status.getPath().getName(); + return name.startsWith(BUCKET_PATH_PREFIX) || name.contains("="); + } + public static long olderThanMillis(@Nullable String olderThan) { if (isNullOrWhitespaceOnly(olderThan)) { return System.currentTimeMillis() - TimeUnit.DAYS.toMillis(1); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/orphan/FlinkOrphanFilesClean.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/orphan/FlinkOrphanFilesClean.java index d3376cb80cf5..05c642f374c0 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/orphan/FlinkOrphanFilesClean.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/orphan/FlinkOrphanFilesClean.java @@ -282,7 +282,8 @@ public void processElement( Path dirPath = new Path(dir); List files = tryBestListingDirs(dirPath); for (FileStatus file : files) { - if (oldEnough(file)) { + if (!isDataStructureDirectory(file) + && oldEnough(file)) { out.collect( Tuple2.of( file.getPath().toString(), @@ -324,7 +325,7 @@ public void processElement(StreamRecord element) { @Override public void endInput() throws IOException { // delete empty dir - while (!emptyDirs.isEmpty()) { + while (!dryRun && !emptyDirs.isEmpty()) { Set newEmptyDir = new HashSet<>(); for (Path emptyDir : emptyDirs) { try { diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/procedure/SparkOrphanFilesClean.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/procedure/SparkOrphanFilesClean.scala index 11f1364c1864..fde3af619272 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/procedure/SparkOrphanFilesClean.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/procedure/SparkOrphanFilesClean.scala @@ -125,7 +125,9 @@ case class SparkOrphanFilesClean( .parallelize(fileDirs, maxFileDirsParallelism) .flatMap { dir => - tryBestListingDirs(new Path(dir)).asScala.filter(oldEnough).map { + tryBestListingDirs(new Path(dir)).asScala + .filter(file => !isDataStructureDirectory(file)) + .filter(oldEnough).map { file => val path = file.getPath (path.getName, path.toString, file.getLen, path.getParent.toString) From af15188951b8750778a19aadc74beae3e5b8ead2 Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Thu, 21 May 2026 12:03:55 +0800 Subject: [PATCH 02/11] Simplify fix: use isDir() instead of isDataStructureDirectory() MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Orphan candidate should only be files — directories have no Paimon metadata reference semantics. Using plain isDir() is cleaner and more robust than pattern-matching on bucket-*/partition=value names. Also remove the cleanFile defensive guard and isDataStructureDirectory helper since directories can no longer reach the candidate deletion path. Co-Authored-By: Claude Opus 4.6 --- .../operation/LocalOrphanFilesClean.java | 2 +- .../paimon/operation/OrphanFilesClean.java | 21 ------------------- .../flink/orphan/FlinkOrphanFilesClean.java | 3 +-- .../procedure/SparkOrphanFilesClean.scala | 2 +- 4 files changed, 3 insertions(+), 25 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/LocalOrphanFilesClean.java b/paimon-core/src/main/java/org/apache/paimon/operation/LocalOrphanFilesClean.java index 51690aa4f246..fb9f9f7589e1 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/LocalOrphanFilesClean.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/LocalOrphanFilesClean.java @@ -258,7 +258,7 @@ private Function>> pathProcessor(Set emptyDirs } return files.stream() - .filter(status -> !isDataStructureDirectory(status)) + .filter(status -> !status.isDir()) .filter(this::oldEnough) .map(status -> Pair.of(status.getPath(), status.getLen())) .collect(Collectors.toList()); diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java b/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java index 154746c0d013..5b1b045a7d00 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java @@ -220,14 +220,6 @@ protected void cleanFile(Path path) { if (!dryRun) { try { if (fileIO.isDir(path)) { - String name = path.getName(); - if (name.startsWith(BUCKET_PATH_PREFIX) || name.contains("=")) { - LOG.warn( - "Skipping deletion of data structure directory that was mistakenly " - + "identified as orphan: {}", - path); - return; - } fileIO.deleteDirectoryQuietly(path); } else { fileIO.deleteQuietly(path); @@ -463,19 +455,6 @@ protected boolean oldEnough(FileStatus status) { return status.getModificationTime() < olderThanMillis; } - /** - * Returns true if the given status represents a structural data directory (bucket or partition) - * that should never be treated as an orphan file candidate. Other directories (e.g. unknown - * temp dirs) are still eligible for orphan cleanup. - */ - protected static boolean isDataStructureDirectory(FileStatus status) { - if (!status.isDir()) { - return false; - } - String name = status.getPath().getName(); - return name.startsWith(BUCKET_PATH_PREFIX) || name.contains("="); - } - public static long olderThanMillis(@Nullable String olderThan) { if (isNullOrWhitespaceOnly(olderThan)) { return System.currentTimeMillis() - TimeUnit.DAYS.toMillis(1); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/orphan/FlinkOrphanFilesClean.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/orphan/FlinkOrphanFilesClean.java index 05c642f374c0..3e832c7e90dc 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/orphan/FlinkOrphanFilesClean.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/orphan/FlinkOrphanFilesClean.java @@ -282,8 +282,7 @@ public void processElement( Path dirPath = new Path(dir); List files = tryBestListingDirs(dirPath); for (FileStatus file : files) { - if (!isDataStructureDirectory(file) - && oldEnough(file)) { + if (!file.isDir() && oldEnough(file)) { out.collect( Tuple2.of( file.getPath().toString(), diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/procedure/SparkOrphanFilesClean.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/procedure/SparkOrphanFilesClean.scala index fde3af619272..8cf53ff0d7b6 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/procedure/SparkOrphanFilesClean.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/procedure/SparkOrphanFilesClean.scala @@ -126,7 +126,7 @@ case class SparkOrphanFilesClean( .flatMap { dir => tryBestListingDirs(new Path(dir)).asScala - .filter(file => !isDataStructureDirectory(file)) + .filter(file => !file.isDir()) .filter(oldEnough).map { file => val path = file.getPath From 546fb37d7c86517a8093ff1cdd35fa7cf2a8eddf Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Thu, 21 May 2026 12:09:18 +0800 Subject: [PATCH 03/11] Fix Spark spotless format violation --- .../spark/procedure/SparkOrphanFilesClean.scala | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/procedure/SparkOrphanFilesClean.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/procedure/SparkOrphanFilesClean.scala index 8cf53ff0d7b6..98d049d61970 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/procedure/SparkOrphanFilesClean.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/procedure/SparkOrphanFilesClean.scala @@ -127,11 +127,12 @@ case class SparkOrphanFilesClean( dir => tryBestListingDirs(new Path(dir)).asScala .filter(file => !file.isDir()) - .filter(oldEnough).map { - file => - val path = file.getPath - (path.getName, path.toString, file.getLen, path.getParent.toString) - } + .filter(oldEnough) + .map { + file => + val path = file.getPath + (path.getName, path.toString, file.getLen, path.getParent.toString) + } } .toDF("name", "path", "len", "dataDir") .repartition(parallelism) From 2a10c4e623212b9cbff8e903ef16a412b9a65b2f Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Thu, 21 May 2026 12:13:37 +0800 Subject: [PATCH 04/11] Remove deleteDirectoryQuietly from cleanFile, refuse directories cleanFile semantics: delete orphan FILE candidates only. A directory reaching this method means a bug in candidate collection. Log ERROR and refuse instead of recursively deleting. Empty directory cleanup is handled separately via tryDeleteEmptyDirectory (non-recursive delete). --- .../apache/paimon/operation/OrphanFilesClean.java | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java b/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java index 5b1b045a7d00..66aaf7724edc 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java @@ -220,11 +220,15 @@ protected void cleanFile(Path path) { if (!dryRun) { try { if (fileIO.isDir(path)) { - fileIO.deleteDirectoryQuietly(path); - } else { - fileIO.deleteQuietly(path); + LOG.error( + "Refusing to delete directory {} in orphan file cleanup. " + + "This indicates a bug in candidate collection.", + path); + return; } - } catch (IOException ignored) { + fileIO.deleteQuietly(path); + } catch (IOException e) { + LOG.warn("Failed to check whether {} is directory, skip deleting it.", path, e); } } } From a0a0bd1275651152ed952c6d55321a6b0a0f6de4 Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Thu, 21 May 2026 12:15:56 +0800 Subject: [PATCH 05/11] Restore if-else structure in cleanFile for readability --- .../java/org/apache/paimon/operation/OrphanFilesClean.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java b/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java index 66aaf7724edc..89092cfd08e1 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java @@ -224,9 +224,9 @@ protected void cleanFile(Path path) { "Refusing to delete directory {} in orphan file cleanup. " + "This indicates a bug in candidate collection.", path); - return; + } else { + fileIO.deleteQuietly(path); } - fileIO.deleteQuietly(path); } catch (IOException e) { LOG.warn("Failed to check whether {} is directory, skip deleting it.", path, e); } From 27567683d04b46dd995701868df6f88353a66d26 Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Thu, 21 May 2026 12:23:47 +0800 Subject: [PATCH 06/11] Add test: directories must not be treated as orphan candidates - addNonUsedFiles now only creates files (not directories) - New test verifies subdirectories are filtered by !isDir() --- .../operation/LocalOrphanFilesCleanTest.java | 34 ++++++++++++++++--- 1 file changed, 29 insertions(+), 5 deletions(-) diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/LocalOrphanFilesCleanTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/LocalOrphanFilesCleanTest.java index bb20c9904e48..f39984c14904 100644 --- a/paimon-core/src/test/java/org/apache/paimon/operation/LocalOrphanFilesCleanTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/LocalOrphanFilesCleanTest.java @@ -628,6 +628,34 @@ void testEmptyPartitionDirectories() throws Exception { .isTrue(); } + @Test + void testDirectoriesNotTreatedAsOrphanCandidates() throws Exception { + commit(Collections.singletonList(new TestPojo(1, 0, "a", "v1"))); + + Path partitionPath = new Path(tablePath, "part1=0/part2=a"); + Path bucketPath = new Path(partitionPath, "bucket-0"); + assertThat(fileIO.exists(bucketPath)).isTrue(); + assertThat(fileIO.listStatus(bucketPath)).isNotEmpty(); + + Path subdirInBucket = new Path(bucketPath, "orphan-subdir"); + fileIO.mkdirs(subdirInBucket); + fileIO.tryToWriteAtomic(new Path(subdirInBucket, "stale-file.tmp"), "data"); + + long oldTime = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(2); + Files.setLastModifiedTime( + tempDir.resolve("part1=0/part2=a/bucket-0/orphan-subdir"), + FileTime.fromMillis(oldTime)); + + LocalOrphanFilesClean orphanFilesClean = + new LocalOrphanFilesClean(table, System.currentTimeMillis()); + CleanOrphanFilesResult result = orphanFilesClean.clean(); + + assertThat(result.getDeletedFilesPath()) + .noneMatch(p -> p.toString().contains("orphan-subdir")); + assertThat(fileIO.exists(bucketPath)).isTrue(); + assertThat(fileIO.listStatus(bucketPath).length).isGreaterThanOrEqualTo(1); + } + private void writeData( SnapshotManager snapshotManager, List> committedData, @@ -824,11 +852,7 @@ private void addNonUsedFiles(Path dir, int fileNum, List fileNamePrefix) String fileName = fileNamePrefix.get(RANDOM.nextInt(fileNamePrefix.size())) + UUID.randomUUID(); Path file = new Path(dir, fileName); - if (RANDOM.nextBoolean()) { - fileIO.tryToWriteAtomic(file, ""); - } else { - fileIO.mkdirs(file); - } + fileIO.tryToWriteAtomic(file, ""); manuallyAddedFiles.add(file); } } From 1b6f51f2b8e2bfb0b371299af737cd81723abd2c Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Thu, 21 May 2026 13:51:07 +0800 Subject: [PATCH 07/11] Filter directories in snapshot/changelog special clean path listPathWithFilter now skips directories so that non-standard dirs under snapshot/ or changelog/ are never counted as orphan candidates. Adds test coverage for this scenario. --- .../paimon/operation/OrphanFilesClean.java | 1 + .../operation/LocalOrphanFilesCleanTest.java | 23 +++++++++++++++++++ 2 files changed, 24 insertions(+) diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java b/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java index 89092cfd08e1..2f0da407ed26 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java @@ -181,6 +181,7 @@ private List> listPathWithFilter( Path directory, Predicate fileStatusFilter, Predicate fileFilter) { List statuses = tryBestListingDirs(directory); return statuses.stream() + .filter(status -> !status.isDir()) .filter(fileStatusFilter) .filter(status -> fileFilter.test(status.getPath())) .map(status -> Pair.of(status.getPath(), status.getLen())) diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/LocalOrphanFilesCleanTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/LocalOrphanFilesCleanTest.java index f39984c14904..06e6f54d7bff 100644 --- a/paimon-core/src/test/java/org/apache/paimon/operation/LocalOrphanFilesCleanTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/LocalOrphanFilesCleanTest.java @@ -656,6 +656,29 @@ void testDirectoriesNotTreatedAsOrphanCandidates() throws Exception { assertThat(fileIO.listStatus(bucketPath).length).isGreaterThanOrEqualTo(1); } + @Test + void testDirectoryInSnapshotDirNotTreatedAsCandidate() throws Exception { + commit(Collections.singletonList(new TestPojo(1, 0, "a", "v1"))); + + Path snapshotDir = new Path(tablePath, "snapshot"); + assertThat(fileIO.exists(snapshotDir)).isTrue(); + + Path unknownDir = new Path(snapshotDir, "UNKNOWN-stale-dir"); + fileIO.mkdirs(unknownDir); + + long oldTime = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(2); + Files.setLastModifiedTime( + tempDir.resolve("snapshot/UNKNOWN-stale-dir"), FileTime.fromMillis(oldTime)); + + LocalOrphanFilesClean orphanFilesClean = + new LocalOrphanFilesClean(table, System.currentTimeMillis()); + CleanOrphanFilesResult result = orphanFilesClean.clean(); + + assertThat(result.getDeletedFilesPath()) + .noneMatch(p -> p.toString().contains("UNKNOWN-stale-dir")); + assertThat(fileIO.exists(unknownDir)).isTrue(); + } + private void writeData( SnapshotManager snapshotManager, List> committedData, From a7370c3cc80732f3e253874a93fde42cc06a3193 Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Thu, 21 May 2026 14:15:23 +0800 Subject: [PATCH 08/11] Fix flaky bucket-0 assumption in test, harden filterDirs - testDirectoriesNotTreatedAsOrphanCandidates now discovers the actual bucket path via listSubDirs instead of hardcoding bucket-0 - filterDirs adds status.isDir() check so only actual directories pass through the partition/bucket traversal filter --- .../org/apache/paimon/operation/OrphanFilesClean.java | 3 +-- .../paimon/operation/LocalOrphanFilesCleanTest.java | 8 +++++--- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java b/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java index 2f0da407ed26..2cb1eedc22b0 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java @@ -398,10 +398,9 @@ private List filterDirs(List statuses, Predicate filter) for (FileStatus status : statuses) { Path path = status.getPath(); - if (filter.test(path)) { + if (status.isDir() && filter.test(path)) { filtered.add(path); } - // ignore unknown dirs } return filtered; diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/LocalOrphanFilesCleanTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/LocalOrphanFilesCleanTest.java index 06e6f54d7bff..13cf900903da 100644 --- a/paimon-core/src/test/java/org/apache/paimon/operation/LocalOrphanFilesCleanTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/LocalOrphanFilesCleanTest.java @@ -633,17 +633,19 @@ void testDirectoriesNotTreatedAsOrphanCandidates() throws Exception { commit(Collections.singletonList(new TestPojo(1, 0, "a", "v1"))); Path partitionPath = new Path(tablePath, "part1=0/part2=a"); - Path bucketPath = new Path(partitionPath, "bucket-0"); - assertThat(fileIO.exists(bucketPath)).isTrue(); + Path bucketPath = + listSubDirs(partitionPath, p -> p.getName().startsWith(BUCKET_PATH_PREFIX)) + .get(0); assertThat(fileIO.listStatus(bucketPath)).isNotEmpty(); Path subdirInBucket = new Path(bucketPath, "orphan-subdir"); fileIO.mkdirs(subdirInBucket); fileIO.tryToWriteAtomic(new Path(subdirInBucket, "stale-file.tmp"), "data"); + String bucketName = bucketPath.getName(); long oldTime = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(2); Files.setLastModifiedTime( - tempDir.resolve("part1=0/part2=a/bucket-0/orphan-subdir"), + tempDir.resolve("part1=0/part2=a/" + bucketName + "/orphan-subdir"), FileTime.fromMillis(oldTime)); LocalOrphanFilesClean orphanFilesClean = From 3bd1e5d8cd8d1ac46edd785c913a583f35c06e2b Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Thu, 21 May 2026 14:20:53 +0800 Subject: [PATCH 09/11] Restore comment in filterDirs --- .../main/java/org/apache/paimon/operation/OrphanFilesClean.java | 1 + 1 file changed, 1 insertion(+) diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java b/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java index 2cb1eedc22b0..b146ffbac725 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java @@ -401,6 +401,7 @@ private List filterDirs(List statuses, Predicate filter) if (status.isDir() && filter.test(path)) { filtered.add(path); } + // ignore unknown dirs } return filtered; From f84dd8f550b2f94ab82a9aa9752db3dc883f4d59 Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Thu, 21 May 2026 14:37:13 +0800 Subject: [PATCH 10/11] Fix spotless formatting --- .../org/apache/paimon/operation/LocalOrphanFilesCleanTest.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/LocalOrphanFilesCleanTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/LocalOrphanFilesCleanTest.java index 13cf900903da..b690246101be 100644 --- a/paimon-core/src/test/java/org/apache/paimon/operation/LocalOrphanFilesCleanTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/LocalOrphanFilesCleanTest.java @@ -634,8 +634,7 @@ void testDirectoriesNotTreatedAsOrphanCandidates() throws Exception { Path partitionPath = new Path(tablePath, "part1=0/part2=a"); Path bucketPath = - listSubDirs(partitionPath, p -> p.getName().startsWith(BUCKET_PATH_PREFIX)) - .get(0); + listSubDirs(partitionPath, p -> p.getName().startsWith(BUCKET_PATH_PREFIX)).get(0); assertThat(fileIO.listStatus(bucketPath)).isNotEmpty(); Path subdirInBucket = new Path(bucketPath, "orphan-subdir"); From 742553a5cff7ab9e7ba81683d12838e4b9e3b344 Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Thu, 21 May 2026 15:09:04 +0800 Subject: [PATCH 11/11] Align Flink empty-dir cleanup with Local: skip table root Prevents the parent-walk loop from deleting table.location() itself, matching the existing guard in LocalOrphanFilesClean. --- .../org/apache/paimon/flink/orphan/FlinkOrphanFilesClean.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/orphan/FlinkOrphanFilesClean.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/orphan/FlinkOrphanFilesClean.java index 3e832c7e90dc..a30cc3e0dd03 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/orphan/FlinkOrphanFilesClean.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/orphan/FlinkOrphanFilesClean.java @@ -327,6 +327,9 @@ public void endInput() throws IOException { while (!dryRun && !emptyDirs.isEmpty()) { Set newEmptyDir = new HashSet<>(); for (Path emptyDir : emptyDirs) { + if (table.location().equals(emptyDir)) { + continue; + } try { if (fileIO.delete(emptyDir, false)) { LOG.info("Clean empty dir: {}", emptyDir);