Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,7 @@ private Function<Path, List<Pair<Path, Long>>> pathProcessor(Set<Path> emptyDirs
}

return files.stream()
.filter(status -> !status.isDir())
.filter(this::oldEnough)
.map(status -> Pair.of(status.getPath(), status.getLen()))
.collect(Collectors.toList());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ private List<Pair<Path, Long>> listPathWithFilter(
Path directory, Predicate<FileStatus> fileStatusFilter, Predicate<Path> fileFilter) {
List<FileStatus> statuses = tryBestListingDirs(directory);
return statuses.stream()
.filter(status -> !status.isDir())
.filter(fileStatusFilter)
.filter(status -> fileFilter.test(status.getPath()))
.map(status -> Pair.of(status.getPath(), status.getLen()))
Expand Down Expand Up @@ -220,11 +221,15 @@ protected void cleanFile(Path path) {
if (!dryRun) {
try {
if (fileIO.isDir(path)) {
fileIO.deleteDirectoryQuietly(path);
LOG.error(
"Refusing to delete directory {} in orphan file cleanup. "
+ "This indicates a bug in candidate collection.",
path);
} else {
fileIO.deleteQuietly(path);
}
} catch (IOException ignored) {
} catch (IOException e) {
LOG.warn("Failed to check whether {} is directory, skip deleting it.", path, e);
}
}
}
Expand Down Expand Up @@ -393,7 +398,7 @@ private List<Path> filterDirs(List<FileStatus> statuses, Predicate<Path> filter)

for (FileStatus status : statuses) {
Path path = status.getPath();
if (filter.test(path)) {
if (status.isDir() && filter.test(path)) {
filtered.add(path);
}
// ignore unknown dirs
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -628,6 +628,58 @@ void testEmptyPartitionDirectories() throws Exception {
.isTrue();
}

@Test
void testDirectoriesNotTreatedAsOrphanCandidates() throws Exception {
commit(Collections.singletonList(new TestPojo(1, 0, "a", "v1")));

Path partitionPath = new Path(tablePath, "part1=0/part2=a");
Path bucketPath =
listSubDirs(partitionPath, p -> p.getName().startsWith(BUCKET_PATH_PREFIX)).get(0);
assertThat(fileIO.listStatus(bucketPath)).isNotEmpty();

Path subdirInBucket = new Path(bucketPath, "orphan-subdir");
fileIO.mkdirs(subdirInBucket);
fileIO.tryToWriteAtomic(new Path(subdirInBucket, "stale-file.tmp"), "data");

String bucketName = bucketPath.getName();
long oldTime = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(2);
Files.setLastModifiedTime(
tempDir.resolve("part1=0/part2=a/" + bucketName + "/orphan-subdir"),
FileTime.fromMillis(oldTime));

LocalOrphanFilesClean orphanFilesClean =
new LocalOrphanFilesClean(table, System.currentTimeMillis());
CleanOrphanFilesResult result = orphanFilesClean.clean();

assertThat(result.getDeletedFilesPath())
.noneMatch(p -> p.toString().contains("orphan-subdir"));
assertThat(fileIO.exists(bucketPath)).isTrue();
assertThat(fileIO.listStatus(bucketPath).length).isGreaterThanOrEqualTo(1);
}

@Test
void testDirectoryInSnapshotDirNotTreatedAsCandidate() throws Exception {
commit(Collections.singletonList(new TestPojo(1, 0, "a", "v1")));

Path snapshotDir = new Path(tablePath, "snapshot");
assertThat(fileIO.exists(snapshotDir)).isTrue();

Path unknownDir = new Path(snapshotDir, "UNKNOWN-stale-dir");
fileIO.mkdirs(unknownDir);

long oldTime = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(2);
Files.setLastModifiedTime(
tempDir.resolve("snapshot/UNKNOWN-stale-dir"), FileTime.fromMillis(oldTime));

LocalOrphanFilesClean orphanFilesClean =
new LocalOrphanFilesClean(table, System.currentTimeMillis());
CleanOrphanFilesResult result = orphanFilesClean.clean();

assertThat(result.getDeletedFilesPath())
.noneMatch(p -> p.toString().contains("UNKNOWN-stale-dir"));
assertThat(fileIO.exists(unknownDir)).isTrue();
}

private void writeData(
SnapshotManager snapshotManager,
List<List<TestPojo>> committedData,
Expand Down Expand Up @@ -824,11 +876,7 @@ private void addNonUsedFiles(Path dir, int fileNum, List<String> fileNamePrefix)
String fileName =
fileNamePrefix.get(RANDOM.nextInt(fileNamePrefix.size())) + UUID.randomUUID();
Path file = new Path(dir, fileName);
if (RANDOM.nextBoolean()) {
fileIO.tryToWriteAtomic(file, "");
} else {
fileIO.mkdirs(file);
}
fileIO.tryToWriteAtomic(file, "");
manuallyAddedFiles.add(file);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ public void processElement(
Path dirPath = new Path(dir);
List<FileStatus> files = tryBestListingDirs(dirPath);
for (FileStatus file : files) {
if (oldEnough(file)) {
if (!file.isDir() && oldEnough(file)) {
out.collect(
Tuple2.of(
file.getPath().toString(),
Expand Down Expand Up @@ -324,9 +324,12 @@ public void processElement(StreamRecord<Path> element) {
@Override
public void endInput() throws IOException {
// delete empty dir
while (!emptyDirs.isEmpty()) {
while (!dryRun && !emptyDirs.isEmpty()) {
Set<Path> newEmptyDir = new HashSet<>();
for (Path emptyDir : emptyDirs) {
if (table.location().equals(emptyDir)) {
continue;
}
try {
if (fileIO.delete(emptyDir, false)) {
LOG.info("Clean empty dir: {}", emptyDir);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,11 +125,14 @@ case class SparkOrphanFilesClean(
.parallelize(fileDirs, maxFileDirsParallelism)
.flatMap {
dir =>
tryBestListingDirs(new Path(dir)).asScala.filter(oldEnough).map {
file =>
val path = file.getPath
(path.getName, path.toString, file.getLen, path.getParent.toString)
}
tryBestListingDirs(new Path(dir)).asScala
.filter(file => !file.isDir())
.filter(oldEnough)
.map {
file =>
val path = file.getPath
(path.getName, path.toString, file.getLen, path.getParent.toString)
}
}
.toDF("name", "path", "len", "dataDir")
.repartition(parallelism)
Expand Down
Loading