diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/LocalOrphanFilesClean.java b/paimon-core/src/main/java/org/apache/paimon/operation/LocalOrphanFilesClean.java index f8a2e080c329..fb9f9f7589e1 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/LocalOrphanFilesClean.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/LocalOrphanFilesClean.java @@ -258,6 +258,7 @@ private Function>> pathProcessor(Set emptyDirs } return files.stream() + .filter(status -> !status.isDir()) .filter(this::oldEnough) .map(status -> Pair.of(status.getPath(), status.getLen())) .collect(Collectors.toList()); diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java b/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java index 5b1b045a7d00..b146ffbac725 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java @@ -181,6 +181,7 @@ private List> listPathWithFilter( Path directory, Predicate fileStatusFilter, Predicate fileFilter) { List statuses = tryBestListingDirs(directory); return statuses.stream() + .filter(status -> !status.isDir()) .filter(fileStatusFilter) .filter(status -> fileFilter.test(status.getPath())) .map(status -> Pair.of(status.getPath(), status.getLen())) @@ -220,11 +221,15 @@ protected void cleanFile(Path path) { if (!dryRun) { try { if (fileIO.isDir(path)) { - fileIO.deleteDirectoryQuietly(path); + LOG.error( + "Refusing to delete directory {} in orphan file cleanup. " + + "This indicates a bug in candidate collection.", + path); } else { fileIO.deleteQuietly(path); } - } catch (IOException ignored) { + } catch (IOException e) { + LOG.warn("Failed to check whether {} is directory, skip deleting it.", path, e); } } } @@ -393,7 +398,7 @@ private List filterDirs(List statuses, Predicate filter) for (FileStatus status : statuses) { Path path = status.getPath(); - if (filter.test(path)) { + if (status.isDir() && filter.test(path)) { filtered.add(path); } // ignore unknown dirs diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/LocalOrphanFilesCleanTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/LocalOrphanFilesCleanTest.java index bb20c9904e48..b690246101be 100644 --- a/paimon-core/src/test/java/org/apache/paimon/operation/LocalOrphanFilesCleanTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/LocalOrphanFilesCleanTest.java @@ -628,6 +628,58 @@ void testEmptyPartitionDirectories() throws Exception { .isTrue(); } + @Test + void testDirectoriesNotTreatedAsOrphanCandidates() throws Exception { + commit(Collections.singletonList(new TestPojo(1, 0, "a", "v1"))); + + Path partitionPath = new Path(tablePath, "part1=0/part2=a"); + Path bucketPath = + listSubDirs(partitionPath, p -> p.getName().startsWith(BUCKET_PATH_PREFIX)).get(0); + assertThat(fileIO.listStatus(bucketPath)).isNotEmpty(); + + Path subdirInBucket = new Path(bucketPath, "orphan-subdir"); + fileIO.mkdirs(subdirInBucket); + fileIO.tryToWriteAtomic(new Path(subdirInBucket, "stale-file.tmp"), "data"); + + String bucketName = bucketPath.getName(); + long oldTime = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(2); + Files.setLastModifiedTime( + tempDir.resolve("part1=0/part2=a/" + bucketName + "/orphan-subdir"), + FileTime.fromMillis(oldTime)); + + LocalOrphanFilesClean orphanFilesClean = + new LocalOrphanFilesClean(table, System.currentTimeMillis()); + CleanOrphanFilesResult result = orphanFilesClean.clean(); + + assertThat(result.getDeletedFilesPath()) + .noneMatch(p -> p.toString().contains("orphan-subdir")); + assertThat(fileIO.exists(bucketPath)).isTrue(); + assertThat(fileIO.listStatus(bucketPath).length).isGreaterThanOrEqualTo(1); + } + + @Test + void testDirectoryInSnapshotDirNotTreatedAsCandidate() throws Exception { + commit(Collections.singletonList(new TestPojo(1, 0, "a", "v1"))); + + Path snapshotDir = new Path(tablePath, "snapshot"); + assertThat(fileIO.exists(snapshotDir)).isTrue(); + + Path unknownDir = new Path(snapshotDir, "UNKNOWN-stale-dir"); + fileIO.mkdirs(unknownDir); + + long oldTime = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(2); + Files.setLastModifiedTime( + tempDir.resolve("snapshot/UNKNOWN-stale-dir"), FileTime.fromMillis(oldTime)); + + LocalOrphanFilesClean orphanFilesClean = + new LocalOrphanFilesClean(table, System.currentTimeMillis()); + CleanOrphanFilesResult result = orphanFilesClean.clean(); + + assertThat(result.getDeletedFilesPath()) + .noneMatch(p -> p.toString().contains("UNKNOWN-stale-dir")); + assertThat(fileIO.exists(unknownDir)).isTrue(); + } + private void writeData( SnapshotManager snapshotManager, List> committedData, @@ -824,11 +876,7 @@ private void addNonUsedFiles(Path dir, int fileNum, List fileNamePrefix) String fileName = fileNamePrefix.get(RANDOM.nextInt(fileNamePrefix.size())) + UUID.randomUUID(); Path file = new Path(dir, fileName); - if (RANDOM.nextBoolean()) { - fileIO.tryToWriteAtomic(file, ""); - } else { - fileIO.mkdirs(file); - } + fileIO.tryToWriteAtomic(file, ""); manuallyAddedFiles.add(file); } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/orphan/FlinkOrphanFilesClean.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/orphan/FlinkOrphanFilesClean.java index d3376cb80cf5..a30cc3e0dd03 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/orphan/FlinkOrphanFilesClean.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/orphan/FlinkOrphanFilesClean.java @@ -282,7 +282,7 @@ public void processElement( Path dirPath = new Path(dir); List files = tryBestListingDirs(dirPath); for (FileStatus file : files) { - if (oldEnough(file)) { + if (!file.isDir() && oldEnough(file)) { out.collect( Tuple2.of( file.getPath().toString(), @@ -324,9 +324,12 @@ public void processElement(StreamRecord element) { @Override public void endInput() throws IOException { // delete empty dir - while (!emptyDirs.isEmpty()) { + while (!dryRun && !emptyDirs.isEmpty()) { Set newEmptyDir = new HashSet<>(); for (Path emptyDir : emptyDirs) { + if (table.location().equals(emptyDir)) { + continue; + } try { if (fileIO.delete(emptyDir, false)) { LOG.info("Clean empty dir: {}", emptyDir); diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/procedure/SparkOrphanFilesClean.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/procedure/SparkOrphanFilesClean.scala index 11f1364c1864..98d049d61970 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/procedure/SparkOrphanFilesClean.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/procedure/SparkOrphanFilesClean.scala @@ -125,11 +125,14 @@ case class SparkOrphanFilesClean( .parallelize(fileDirs, maxFileDirsParallelism) .flatMap { dir => - tryBestListingDirs(new Path(dir)).asScala.filter(oldEnough).map { - file => - val path = file.getPath - (path.getName, path.toString, file.getLen, path.getParent.toString) - } + tryBestListingDirs(new Path(dir)).asScala + .filter(file => !file.isDir()) + .filter(oldEnough) + .map { + file => + val path = file.getPath + (path.getName, path.toString, file.getLen, path.getParent.toString) + } } .toDF("name", "path", "len", "dataDir") .repartition(parallelism)