Skip to content

Commit

Permalink
add a configuration property for this function.
Browse files Browse the repository at this point in the history
  • Loading branch information
hfutatzhanghb committed Jun 15, 2023
1 parent 1952264 commit 48eb5d9
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,14 @@ public class CommonConfigurationKeysPublic {
public static final String FS_TRASH_INTERVAL_KEY = "fs.trash.interval";
/** Default value for FS_TRASH_INTERVAL_KEY */
public static final long FS_TRASH_INTERVAL_DEFAULT = 0;
/**
* @see
* <a href="{@docRoot}/../hadoop-project-dist/hadoop-common/core-default.xml">
* core-default.xml</a>
*/
public static final String FS_TRASH_CLEAN_TRASHROOT_ENABLE_KEY = "fs.trash.clean.trashroot.enable";
/** Default value for FS_TRASH_CLEAN_TRASHROOT_ENABLE_KEY */
public static final boolean FS_TRASH_CLEAN_TRASHROOT_ENABLE_DEFAULT = false;
/**
* @see
* <a href="{@docRoot}/../hadoop-project-dist/hadoop-common/core-default.xml">
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_CHECKPOINT_INTERVAL_DEFAULT;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_CHECKPOINT_INTERVAL_KEY;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_CLEAN_TRASHROOT_ENABLE_DEFAULT;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_CLEAN_TRASHROOT_ENABLE_KEY;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_DEFAULT;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_KEY;

Expand Down Expand Up @@ -70,6 +72,8 @@ public class TrashPolicyDefault extends TrashPolicy {

private long emptierInterval;

private boolean cleanNonCheckpointUnderTrashRoot;

public TrashPolicyDefault() { }

private TrashPolicyDefault(FileSystem fs, Configuration conf)
Expand All @@ -90,6 +94,8 @@ public void initialize(Configuration conf, FileSystem fs, Path home) {
this.emptierInterval = (long)(conf.getFloat(
FS_TRASH_CHECKPOINT_INTERVAL_KEY, FS_TRASH_CHECKPOINT_INTERVAL_DEFAULT)
* MSECS_PER_MINUTE);
this.cleanNonCheckpointUnderTrashRoot = conf.getBoolean(
FS_TRASH_CLEAN_TRASHROOT_ENABLE_KEY, FS_TRASH_CLEAN_TRASHROOT_ENABLE_DEFAULT);
}

@Override
Expand All @@ -101,6 +107,8 @@ public void initialize(Configuration conf, FileSystem fs) {
this.emptierInterval = (long)(conf.getFloat(
FS_TRASH_CHECKPOINT_INTERVAL_KEY, FS_TRASH_CHECKPOINT_INTERVAL_DEFAULT)
* MSECS_PER_MINUTE);
this.cleanNonCheckpointUnderTrashRoot = conf.getBoolean(
FS_TRASH_CLEAN_TRASHROOT_ENABLE_KEY, FS_TRASH_CLEAN_TRASHROOT_ENABLE_DEFAULT);
if (deletionInterval < 0) {
LOG.warn("Invalid value {} for deletion interval,"
+ " deletion interaval can not be negative."
Expand Down Expand Up @@ -242,17 +250,20 @@ public Path getCurrentTrashDir(Path path) throws IOException {

@Override
public Runnable getEmptier() throws IOException {
return new Emptier(getConf(), emptierInterval);
return new Emptier(getConf(), emptierInterval, cleanNonCheckpointUnderTrashRoot);
}

protected class Emptier implements Runnable {

private Configuration conf;
private long emptierInterval;
private boolean cleanNonCheckpointUnderTrashRoot;

Emptier(Configuration conf, long emptierInterval) throws IOException {
Emptier(Configuration conf, long emptierInterval,
boolean cleanNonCheckpointUnderTrashRoot) throws IOException {
this.conf = conf;
this.emptierInterval = emptierInterval;
this.cleanNonCheckpointUnderTrashRoot = cleanNonCheckpointUnderTrashRoot;
if (emptierInterval > deletionInterval || emptierInterval <= 0) {
LOG.info("The configured checkpoint interval is " +
(emptierInterval / MSECS_PER_MINUTE) + " minutes." +
Expand Down Expand Up @@ -374,9 +385,14 @@ private void deleteCheckpoint(Path trashRoot, boolean deleteImmediately)
try {
time = getTimeFromCheckpoint(name);
} catch (ParseException e) {
this.moveToTrash(path, true);
LOG.warn("Unexpected item in trash: " + dir + ". Force moving to trash.");
continue;
if (cleanNonCheckpointUnderTrashRoot) {
this.moveToTrash(path, true);
LOG.warn("Unexpected item in trash: " + dir + ". Force moving to trash.");
continue;
} else {
LOG.warn("Unexpected item in trash: " + dir + ". Ignoring.");
continue;
}
}

if (((now - deletionInterval) > time) || deleteImmediately) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -974,6 +974,14 @@
</description>
</property>

<property>
<name>fs.trash.clean.trashroot.enable</name>
<value>false</value>
<description>Whether to clean up directories or files that are found
under the trash root but are not inside a checkpoint directory.
When false, such items are left in place and only a warning is logged.
</description>
</property>

<property>
<name>fs.protected.directories</name>
<value></value>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -797,6 +797,7 @@ public void testTrashEmptierWithNonCheckpointDir() throws Exception {
conf.set(FS_TRASH_INTERVAL_KEY, "0.2"); // 12 seconds
conf.setClass("fs.file.impl", TestLFS.class, FileSystem.class);
conf.set(FS_TRASH_CHECKPOINT_INTERVAL_KEY, "0.1"); // 6 seconds
conf.setBoolean(FS_TRASH_CLEAN_TRASHROOT_ENABLE_KEY, true);
FileSystem fs = FileSystem.getLocal(conf);
conf.set("fs.default.name", fs.getUri().toString());

Expand Down

0 comments on commit 48eb5d9

Please sign in to comment.