HBASE-26342 Support custom paths of independent configuration and pool for hfile cleaner
sunhelly committed May 5, 2022
1 parent 64a6ba3 commit fd381dd
Showing 6 changed files with 265 additions and 15 deletions.
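For context, the feature added by this commit is driven entirely by configuration. A minimal sketch of enabling it programmatically follows; the key names are the HFileCleaner constants introduced below, while the table paths and the plugin class are illustrative assumptions rather than part of the change:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

public class CustomHFileCleanerConfigSketch {
  public static Configuration exampleConf() {
    Configuration conf = HBaseConfiguration.create();
    // Enable independent cleaners for custom paths under the archive directory.
    conf.setBoolean("hbase.master.hfile.cleaner.enable.custom.paths", true);
    // Archive subdirectories to clean independently (hypothetical tables).
    conf.setStrings("hbase.master.hfile.cleaner.custom.paths",
      "data/default/testTable1", "data/default/testTable2");
    // Plugins for the custom paths; when unset, HMaster falls back to the
    // global hbase.master.hfilecleaner.plugins classes.
    conf.setStrings("hbase.master.hfilecleaner.custom.paths.plugins",
      "org.apache.hadoop.hbase.master.cleaner.TimeToLiveHFileCleaner");
    // Size of the thread pool shared by the custom-path cleaners (default "6").
    conf.set("hbase.cleaner.custom.hfiles.pool.size", "6");
    return conf;
  }
}

In a real deployment these keys would normally live in hbase-site.xml; the Java form is only for illustration.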
hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -20,6 +20,7 @@
import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK;
import static org.apache.hadoop.hbase.HConstants.HBASE_MASTER_LOGCLEANER_PLUGINS;
import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_COORDINATED_BY_ZK;
import static org.apache.hadoop.hbase.master.cleaner.HFileCleaner.CUSTOM_POOL_SIZE;
import static org.apache.hadoop.hbase.util.DNS.MASTER_HOSTNAME_KEY;

import com.google.errorprone.annotations.RestrictedApi;
@@ -37,6 +38,7 @@
import java.util.Comparator;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
@@ -78,6 +80,7 @@
import org.apache.hadoop.hbase.PleaseRestartMasterException;
import org.apache.hadoop.hbase.RegionMetrics;
import org.apache.hadoop.hbase.ReplicationPeerNotFoundException;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.ServerMetrics;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.ServerTask;
@@ -384,6 +387,9 @@ public class HMaster extends HBaseServerBase<MasterRpcServices> implements Maste
private DirScanPool logCleanerPool;
private LogCleaner logCleaner;
private HFileCleaner hfileCleaner;
private HFileCleaner[] customHFileCleaners;
private Path[] customHFilePaths;
private DirScanPool customHFileCleanerPool;
private ReplicationBarrierCleaner replicationBarrierCleaner;
private MobFileCleanerChore mobFileCleanerChore;
private MobFileCompactionChore mobFileCompactionChore;
@@ -1163,6 +1169,14 @@ private void finishActiveMasterInitialization(MonitoredTask status)
configurationManager.registerObserver(this.hfileCleaner);
configurationManager.registerObserver(this.logCleaner);
configurationManager.registerObserver(this.regionsRecoveryConfigManager);
if (this.customHFileCleanerPool != null) {
configurationManager.registerObserver(this.customHFileCleanerPool);
}
if (this.customHFileCleaners != null) {
for (HFileCleaner cleaner : customHFileCleaners) {
configurationManager.registerObserver(cleaner);
}
}
// Set master as 'initialized'.
setInitialized(true);

@@ -1539,12 +1553,45 @@ executorService.new ExecutorConfig().setExecutorType(ExecutorType.MASTER_MERGE_O
getMasterWalManager().getOldLogDir(), logCleanerPool, params);
getChoreService().scheduleChore(logCleaner);

// start the hfile archive cleaner thread
Path archiveDir = HFileArchiveUtil.getArchivePath(conf);

// Create custom archive hfile cleaner thread pool
if (conf.getBoolean(HFileCleaner.HFILE_CLEANER_ENABLE_CUSTOM_PATHS, false)) {
String[] paths = conf.getStrings(HFileCleaner.HFILE_CLEANER_CUSTOM_PATHS);
if (paths != null && paths.length > 0) {
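// If no plugins are configured specifically for the custom paths, reuse the
// global hfile cleaner plugins so the custom-path cleaners never run with an
// empty cleaner chain.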
if (conf.getStrings(HFileCleaner.HFILE_CLEANER_CUSTOM_PATHS_PLUGINS) == null) {
Set<String> cleanerClasses = new HashSet<>();
String[] cleaners = conf.getStrings(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS);
if (cleaners != null) {
Collections.addAll(cleanerClasses, cleaners);
}
conf.setStrings(HFileCleaner.HFILE_CLEANER_CUSTOM_PATHS_PLUGINS,
cleanerClasses.toArray(new String[cleanerClasses.size()]));
LOG.info("Archive custom cleaner paths: {}, plugins: {}", Arrays.asList(paths),
cleanerClasses);
}
customHFileCleanerPool =
DirScanPool.getHFileCleanerScanPool(conf.get(CUSTOM_POOL_SIZE, "6"));
customHFilePaths = new Path[paths.length];
customHFileCleaners = new HFileCleaner[paths.length];
for (int i = 0; i < paths.length; i++) {
Path path = new Path(paths[i].trim());
customHFilePaths[i] = path;
HFileCleaner cleaner = new HFileCleaner("ArchiveCustomHFileCleaner-" + path.getName(),
cleanerInterval, this, conf, getMasterFileSystem().getFileSystem(),
new Path(archiveDir, path), HFileCleaner.HFILE_CLEANER_CUSTOM_PATHS_PLUGINS,
customHFileCleanerPool, params, null);
customHFileCleaners[i] = cleaner;
getChoreService().scheduleChore(cleaner);
}
}
}

// Create archive cleaner thread pool
hfileCleanerPool = DirScanPool.getHFileCleanerScanPool(conf);
- this.hfileCleaner = new HFileCleaner(cleanerInterval, this, conf,
- getMasterFileSystem().getFileSystem(), archiveDir, hfileCleanerPool, params);
+ this.hfileCleaner =
+ new HFileCleaner(cleanerInterval, this, conf, getMasterFileSystem().getFileSystem(),
+ archiveDir, hfileCleanerPool, params, customHFilePaths);
getChoreService().scheduleChore(hfileCleaner);

// Regions Reopen based on very high storeFileRefCount is considered enabled
@@ -1601,6 +1648,10 @@ protected void stopServiceThreads() {
logCleanerPool.shutdownNow();
logCleanerPool = null;
}
if (customHFileCleanerPool != null) {
customHFileCleanerPool.shutdownNow();
customHFileCleanerPool = null;
}
if (maintenanceRegionServer != null) {
maintenanceRegionServer.getRegionServer().stop(HBASE_MASTER_CLEANER_INTERVAL);
}
@@ -1736,6 +1787,12 @@ protected void stopChores() {
shutdownChore(snapshotQuotaChore);
shutdownChore(logCleaner);
shutdownChore(hfileCleaner);
if (customHFileCleaners != null) {
for (ScheduledChore chore : customHFileCleaners) {
chore.shutdown();
}
customHFileCleaners = null;
}
shutdownChore(replicationBarrierCleaner);
shutdownChore(snapshotCleanerChore);
shutdownChore(hbckChore);
hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase.master.cleaner;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
@@ -79,10 +80,11 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends Schedu
protected final Map<String, Object> params;
private final AtomicBoolean enabled = new AtomicBoolean(true);
protected List<T> cleanersChain;
protected String[] excludeDirs;

public CleanerChore(String name, final int sleepPeriod, final Stoppable s, Configuration conf,
FileSystem fs, Path oldFileDir, String confKey, DirScanPool pool) {
- this(name, sleepPeriod, s, conf, fs, oldFileDir, confKey, pool, null);
+ this(name, sleepPeriod, s, conf, fs, oldFileDir, confKey, pool, null, null);
}

/**
@@ -97,7 +99,8 @@ public CleanerChore(String name, final int sleepPeriod, final Stoppable s, Confi
* @param params members could be used in cleaner
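* @param excludePaths sub-directories under oldFileDir that this chore skips, or null for none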
*/
public CleanerChore(String name, final int sleepPeriod, final Stoppable s, Configuration conf,
- FileSystem fs, Path oldFileDir, String confKey, DirScanPool pool, Map<String, Object> params) {
+ FileSystem fs, Path oldFileDir, String confKey, DirScanPool pool, Map<String, Object> params,
+ Path[] excludePaths) {
super(name, s, sleepPeriod);

Preconditions.checkNotNull(pool, "Chore's pool can not be null");
@@ -106,6 +109,19 @@ public CleanerChore(String name, final int sleepPeriod, final Stoppable s, Confi
this.oldFileDir = oldFileDir;
this.conf = conf;
this.params = params;
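// Normalize each excluded path to end with '/' so the substring check in
// shouldExclude() cannot match across directory-name boundaries.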
if (excludePaths != null && excludePaths.length > 0) {
excludeDirs = new String[excludePaths.length];
for (int i = 0; i < excludePaths.length; i++) {
StringBuilder dirPart = new StringBuilder(excludePaths[i].toString());
if (!excludePaths[i].toString().endsWith("/")) {
dirPart.append("/");
}
excludeDirs[i] = dirPart.toString();
}
}
if (excludeDirs != null) {
LOG.info("Cleaner {} excludes sub dirs: {}", name, Arrays.asList(excludeDirs));
}
initCleanerChain(confKey);
}

@@ -419,9 +435,11 @@ private void traverseAndDelete(Path dir, boolean root, CompletableFuture<Boolean
sortByConsumedSpace(subDirs);
// Submit the request of sub-directory deletion.
subDirs.forEach(subDir -> {
- CompletableFuture<Boolean> subFuture = new CompletableFuture<>();
- pool.execute(() -> traverseAndDelete(subDir.getPath(), false, subFuture));
- futures.add(subFuture);
+ if (!shouldExclude(subDir)) {
+ CompletableFuture<Boolean> subFuture = new CompletableFuture<>();
+ pool.execute(() -> traverseAndDelete(subDir.getPath(), false, subFuture));
+ futures.add(subFuture);
+ }
});
}

@@ -451,11 +469,34 @@ private void traverseAndDelete(Path dir, boolean root, CompletableFuture<Boolean
}
});
} catch (Exception e) {
LOG.debug("Failed to traverse and delete the path: {}", dir, e);
if (e instanceof FileNotFoundException) {
LOG.debug("Dir dose not exist, {}", dir);
} else {
LOG.error("Failed to traverse and delete the path: {}", dir, e);
}
result.completeExceptionally(e);
}
}

/**
* Check if a path should be excluded from cleaning
*/
private boolean shouldExclude(FileStatus f) {
if (!f.isDirectory()) {
return false;
}
if (excludeDirs != null && excludeDirs.length > 0) {
for (String dirPart : excludeDirs) {
// since every entry in excludeDirs ends with '/', a path that contains
// dirPart as a substring lies under an excluded directory
if (f.getPath().toString().contains(dirPart)) {
return true;
}
}
}
return false;
}
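// Example (hypothetical paths): with excludeDirs = { ".../archive/data/default/t1/" },
// ".../archive/data/default/t1/cf/hfile" is excluded, but
// ".../archive/data/default/t10/cf/hfile" is not; the trailing '/' keeps "t1"
// from matching the prefix of "t10".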

/**
* Perform a delete on a specified type.
* @param deletion a delete
hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/DirScanPool.java
@@ -57,10 +57,13 @@ private Type(String cleanerPoolSizeConfigName, String cleanerPoolSizeConfigDefau
}

private DirScanPool(Configuration conf, Type dirScanPoolType) {
+ this(dirScanPoolType, conf.get(dirScanPoolType.cleanerPoolSizeConfigName,
+ dirScanPoolType.cleanerPoolSizeConfigDefault));
+ }
+
+ private DirScanPool(Type dirScanPoolType, String poolSize) {
this.dirScanPoolType = dirScanPoolType;
this.name = dirScanPoolType.name().toLowerCase();
- String poolSize = conf.get(dirScanPoolType.cleanerPoolSizeConfigName,
- dirScanPoolType.cleanerPoolSizeConfigDefault);
size = CleanerChore.calculatePoolSize(poolSize);
// poolSize may be 0 or 0.0 from a careless configuration,
// double check to make sure.
@@ -143,6 +146,10 @@ public static DirScanPool getHFileCleanerScanPool(Configuration conf) {
return new DirScanPool(conf, Type.HFILE_CLEANER);
}

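// Overload used for the custom-path hfile cleaners in HMaster, whose pool size
// comes from hbase.cleaner.custom.hfiles.pool.size rather than the Type's own
// configuration key.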
public static DirScanPool getHFileCleanerScanPool(String poolSize) {
return new DirScanPool(Type.HFILE_CLEANER, poolSize);
}

public static DirScanPool getLogCleanerScanPool(Configuration conf) {
return new DirScanPool(conf, Type.LOG_CLEANER);
}
hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/HFileCleaner.java
@@ -88,6 +88,24 @@ public HFileCleaner(final int period, final Stoppable stopper, Configuration con
"hbase.regionserver.hfilecleaner.thread.check.interval.msec";
static final long DEFAULT_HFILE_DELETE_THREAD_CHECK_INTERVAL_MSEC = 1000L;

/**
* Configuration key that enables custom hfile cleaner paths (under the archive directory),
* which can use an independent thread pool and plugin configuration.
*/
public static final String HFILE_CLEANER_ENABLE_CUSTOM_PATHS =
"hbase.master.hfile.cleaner.enable.custom.paths";

/**
* The custom paths for the hfile cleaner, given as comma-separated subdirectories of the
* archive directory, e.g. data/default/testTable1,data/default/testTable2
*/
public static final String HFILE_CLEANER_CUSTOM_PATHS = "hbase.master.hfile.cleaner.custom.paths";

/** Configure hfile cleaner classes for the custom paths */
public static final String HFILE_CLEANER_CUSTOM_PATHS_PLUGINS =
"hbase.master.hfilecleaner.custom.paths.plugins";
public static final String CUSTOM_POOL_SIZE = "hbase.cleaner.custom.hfiles.pool.size";

private static final Logger LOG = LoggerFactory.getLogger(HFileCleaner.class);

StealJobQueue<HFileDeleteTask> largeFileQueue;
@@ -117,8 +135,13 @@ public HFileCleaner(final int period, final Stoppable stopper, Configuration con
public HFileCleaner(final int period, final Stoppable stopper, Configuration conf, FileSystem fs,
Path directory, DirScanPool pool, Map<String, Object> params) {
this("HFileCleaner", period, stopper, conf, fs, directory, MASTER_HFILE_CLEANER_PLUGINS, pool,
- params);
+ params, null);
}

public HFileCleaner(final int period, final Stoppable stopper, Configuration conf, FileSystem fs,
Path directory, DirScanPool pool, Map<String, Object> params, Path[] excludePaths) {
this("HFileCleaner", period, stopper, conf, fs, directory, MASTER_HFILE_CLEANER_PLUGINS, pool,
params, excludePaths);
}

/**
@@ -134,8 +157,9 @@ public HFileCleaner(final int period, final Stoppable stopper, Configuration con
* @param params params could be used in subclass of BaseHFileCleanerDelegate
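* @param excludePaths paths under the archive directory that this cleaner should skip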
*/
public HFileCleaner(String name, int period, Stoppable stopper, Configuration conf, FileSystem fs,
- Path directory, String confKey, DirScanPool pool, Map<String, Object> params) {
- super(name, period, stopper, conf, fs, directory, confKey, pool, params);
+ Path directory, String confKey, DirScanPool pool, Map<String, Object> params,
+ Path[] excludePaths) {
+ super(name, period, stopper, conf, fs, directory, confKey, pool, params, excludePaths);
throttlePoint =
conf.getInt(HFILE_DELETE_THROTTLE_THRESHOLD, DEFAULT_HFILE_DELETE_THROTTLE_THRESHOLD);
largeQueueInitSize =
hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/LogCleaner.java
@@ -75,7 +75,7 @@ public class LogCleaner extends CleanerChore<BaseLogCleanerDelegate>
public LogCleaner(final int period, final Stoppable stopper, Configuration conf, FileSystem fs,
Path oldLogDir, DirScanPool pool, Map<String, Object> params) {
super("LogsCleaner", period, stopper, conf, fs, oldLogDir, HBASE_MASTER_LOGCLEANER_PLUGINS,
- pool, params);
+ pool, params, null);
this.pendingDelete = new LinkedBlockingQueue<>();
int size = conf.getInt(OLD_WALS_CLEANER_THREAD_SIZE, DEFAULT_OLD_WALS_CLEANER_THREAD_SIZE);
this.oldWALsCleaner = createOldWalsCleaner(size);