HBASE-26342 Support custom paths of independent configuration and pool for hfile cleaner #4403

Merged 1 commit on May 24, 2022
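In short, this patch lets the active master run additional HFileCleaner chores over configured subdirectories of the archive directory, backed by their own plugin list and a shared scan pool, while the default whole-archive cleaner skips those paths. A minimal sketch of wiring the three new keys defined in HFileCleaner below (the table paths and the TimeToLiveHFileCleaner delegate are illustrative assumptions, not part of this patch):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;

public class CustomHFileCleanerConfigSketch {
  public static Configuration buildConf() {
    Configuration conf = HBaseConfiguration.create();
    // Archive subdirectories that get their own cleaner chores (hypothetical tables):
    conf.setStrings(HFileCleaner.HFILE_CLEANER_CUSTOM_PATHS,
      "data/default/testTable1", "data/default/testTable2");
    // Delegates for the custom paths; HMaster falls back to the regular
    // hbase.master.hfilecleaner.plugins list when this key is unset:
    conf.setStrings(HFileCleaner.HFILE_CLEANER_CUSTOM_PATHS_PLUGINS,
      "org.apache.hadoop.hbase.master.cleaner.TimeToLiveHFileCleaner");
    // Size of the DirScanPool shared by all custom-path cleaners ("6" when unset):
    conf.set(HFileCleaner.CUSTOM_POOL_SIZE, "4");
    return conf;
  }
}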
hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -20,6 +20,7 @@
import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK;
import static org.apache.hadoop.hbase.HConstants.HBASE_MASTER_LOGCLEANER_PLUGINS;
import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_COORDINATED_BY_ZK;
import static org.apache.hadoop.hbase.master.cleaner.HFileCleaner.CUSTOM_POOL_SIZE;
import static org.apache.hadoop.hbase.util.DNS.MASTER_HOSTNAME_KEY;

import com.google.errorprone.annotations.RestrictedApi;
@@ -37,6 +38,7 @@
import java.util.Comparator;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
@@ -78,6 +80,7 @@
import org.apache.hadoop.hbase.PleaseRestartMasterException;
import org.apache.hadoop.hbase.RegionMetrics;
import org.apache.hadoop.hbase.ReplicationPeerNotFoundException;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.ServerMetrics;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.ServerTask;
@@ -378,12 +381,18 @@ public class HMaster extends HBaseServerBase<MasterRpcServices> implements MasterServices

private HbckChore hbckChore;
CatalogJanitor catalogJanitorChore;
// Threadpool for scanning the archive directory, used by the HFileCleaner
private DirScanPool hfileCleanerPool;
// Threadpool for scanning the Old logs directory, used by the LogCleaner
private DirScanPool logCleanerPool;
private LogCleaner logCleaner;
private HFileCleaner hfileCleaner;
// HFile cleaners for the custom hfile archive paths and the default archive path
// The archive path cleaner is the first element
private List<HFileCleaner> hfileCleaners = new ArrayList<>();
// The hfile cleaner paths, including custom paths and the default archive path
private List<Path> hfileCleanerPaths = new ArrayList<>();
// The shared hfile cleaner pool for the custom archive paths
private DirScanPool sharedHFileCleanerPool;
// The exclusive hfile cleaner pool for scanning the archive directory
private DirScanPool exclusiveHFileCleanerPool;
private ReplicationBarrierCleaner replicationBarrierCleaner;
private MobFileCleanerChore mobFileCleanerChore;
private MobFileCompactionChore mobFileCompactionChore;
@@ -1158,11 +1167,18 @@ private void finishActiveMasterInitialization(MonitoredTask status)
(EnvironmentEdgeManager.currentTime() - masterActiveTime) / 1000.0f));
this.masterFinishedInitializationTime = EnvironmentEdgeManager.currentTime();
configurationManager.registerObserver(this.balancer);
configurationManager.registerObserver(this.hfileCleanerPool);
configurationManager.registerObserver(this.logCleanerPool);
configurationManager.registerObserver(this.hfileCleaner);
configurationManager.registerObserver(this.logCleaner);
configurationManager.registerObserver(this.regionsRecoveryConfigManager);
configurationManager.registerObserver(this.exclusiveHFileCleanerPool);
if (this.sharedHFileCleanerPool != null) {
configurationManager.registerObserver(this.sharedHFileCleanerPool);
}
if (this.hfileCleaners != null) {
for (HFileCleaner cleaner : hfileCleaners) {
configurationManager.registerObserver(cleaner);
}
}
// Set master as 'initialized'.
setInitialized(true);

@@ -1439,8 +1455,8 @@ public boolean isCatalogJanitorEnabled() {
boolean isCleanerChoreEnabled() {
boolean hfileCleanerFlag = true, logCleanerFlag = true;

if (hfileCleaner != null) {
hfileCleanerFlag = hfileCleaner.getEnabled();
if (getHFileCleaner() != null) {
hfileCleanerFlag = getHFileCleaner().getEnabled();
}

if (logCleaner != null) {
@@ -1539,13 +1555,47 @@ executorService.new ExecutorConfig().setExecutorType(ExecutorType.MASTER_MERGE_O
getMasterWalManager().getOldLogDir(), logCleanerPool, params);
getChoreService().scheduleChore(logCleaner);

// start the hfile archive cleaner thread
Path archiveDir = HFileArchiveUtil.getArchivePath(conf);
// Create archive cleaner thread pool
hfileCleanerPool = DirScanPool.getHFileCleanerScanPool(conf);
this.hfileCleaner = new HFileCleaner(cleanerInterval, this, conf,
getMasterFileSystem().getFileSystem(), archiveDir, hfileCleanerPool, params);
getChoreService().scheduleChore(hfileCleaner);

// Create custom archive hfile cleaners
String[] paths = conf.getStrings(HFileCleaner.HFILE_CLEANER_CUSTOM_PATHS);
// TODO: handle the overlap issues for the custom paths

if (paths != null && paths.length > 0) {
if (conf.getStrings(HFileCleaner.HFILE_CLEANER_CUSTOM_PATHS_PLUGINS) == null) {
Set<String> cleanerClasses = new HashSet<>();
String[] cleaners = conf.getStrings(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS);
if (cleaners != null) {
Collections.addAll(cleanerClasses, cleaners);
}
conf.setStrings(HFileCleaner.HFILE_CLEANER_CUSTOM_PATHS_PLUGINS,
cleanerClasses.toArray(new String[cleanerClasses.size()]));
LOG.info("Archive custom cleaner paths: {}, plugins: {}", Arrays.asList(paths),
cleanerClasses);
}
// Share a single hfile cleaner pool among the custom-path cleaners
sharedHFileCleanerPool = DirScanPool.getHFileCleanerScanPool(conf.get(CUSTOM_POOL_SIZE, "6"));
for (int i = 0; i < paths.length; i++) {
Path path = new Path(paths[i].trim());
HFileCleaner cleaner =
new HFileCleaner("ArchiveCustomHFileCleaner-" + path.getName(), cleanerInterval, this,
conf, getMasterFileSystem().getFileSystem(), new Path(archiveDir, path),
HFileCleaner.HFILE_CLEANER_CUSTOM_PATHS_PLUGINS, sharedHFileCleanerPool, params, null);
hfileCleaners.add(cleaner);
hfileCleanerPaths.add(path);
}
}

// Create the whole archive dir cleaner thread pool
exclusiveHFileCleanerPool = DirScanPool.getHFileCleanerScanPool(conf);
hfileCleaners.add(0,
new HFileCleaner(cleanerInterval, this, conf, getMasterFileSystem().getFileSystem(),
archiveDir, exclusiveHFileCleanerPool, params, hfileCleanerPaths));
hfileCleanerPaths.add(0, archiveDir);
// Schedule all the hfile cleaners
for (HFileCleaner hFileCleaner : hfileCleaners) {
getChoreService().scheduleChore(hFileCleaner);
}

// Regions Reopen based on very high storeFileRefCount is considered enabled
// only if hbase.regions.recovery.store.file.ref.count has value > 0
@@ -1593,14 +1643,18 @@ protected void stopServiceThreads() {
}
stopChoreService();
stopExecutorService();
if (hfileCleanerPool != null) {
hfileCleanerPool.shutdownNow();
hfileCleanerPool = null;
if (exclusiveHFileCleanerPool != null) {
exclusiveHFileCleanerPool.shutdownNow();
exclusiveHFileCleanerPool = null;
}
if (logCleanerPool != null) {
logCleanerPool.shutdownNow();
logCleanerPool = null;
}
if (sharedHFileCleanerPool != null) {
sharedHFileCleanerPool.shutdownNow();
sharedHFileCleanerPool = null;
}
if (maintenanceRegionServer != null) {
maintenanceRegionServer.getRegionServer().stop(HBASE_MASTER_CLEANER_INTERVAL);
}
@@ -1735,7 +1789,12 @@ protected void stopChores() {
shutdownChore(clusterStatusPublisherChore);
shutdownChore(snapshotQuotaChore);
shutdownChore(logCleaner);
shutdownChore(hfileCleaner);
if (hfileCleaners != null) {
for (ScheduledChore chore : hfileCleaners) {
chore.shutdown();
}
hfileCleaners = null;
}
shutdownChore(replicationBarrierCleaner);
shutdownChore(snapshotCleanerChore);
shutdownChore(hbckChore);
@@ -3210,7 +3269,11 @@ public static void main(String[] args) {
}

public HFileCleaner getHFileCleaner() {
return this.hfileCleaner;
return this.hfileCleaners.get(0);
}

public List<HFileCleaner> getHFileCleaners() {
return this.hfileCleaners;
}

public LogCleaner getLogCleaner() {
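One subtlety in the HMaster changes above: the whole-archive cleaner is constructed while hfileCleanerPaths still holds only the custom paths, so exactly those directories are excluded from its scan; archiveDir is prepended afterwards so that index 0 of both lists refers to the default cleaner, which is what getHFileCleaner() returns. A small sketch against the new accessors (the HMaster instance is assumed):

import java.util.List;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;

final class CleanerAccessSketch {
  static void inspect(HMaster master) {
    // Index 0 is always the whole-archive cleaner on the exclusive pool;
    // any further entries serve the custom paths on the shared pool.
    HFileCleaner defaultCleaner = master.getHFileCleaner();
    List<HFileCleaner> all = master.getHFileCleaners();
    assert all.get(0) == defaultCleaner;
  }
}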
hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
@@ -66,6 +66,7 @@
import org.apache.hadoop.hbase.ipc.ServerRpcController;
import org.apache.hadoop.hbase.master.assignment.RegionStateNode;
import org.apache.hadoop.hbase.master.assignment.RegionStates;
import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
import org.apache.hadoop.hbase.master.janitor.MetaFixer;
import org.apache.hadoop.hbase.master.locking.LockProcedure;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
@@ -874,7 +875,9 @@ public SetCleanerChoreRunningResponse setCleanerChoreRunning(RpcController c,
boolean prevValue =
server.getLogCleaner().getEnabled() && server.getHFileCleaner().getEnabled();
server.getLogCleaner().setEnabled(req.getOn());
server.getHFileCleaner().setEnabled(req.getOn());
for (HFileCleaner hFileCleaner : server.getHFileCleaners()) {
hFileCleaner.setEnabled(req.getOn());
}
return SetCleanerChoreRunningResponse.newBuilder().setPrevValue(prevValue).build();
}

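Client-side, this RPC is reached through Admin#cleanerChoreSwitch; with the loop above, the switch now applies to every registered HFileCleaner, custom-path chores included. A sketch of the call (connection setup is assumed):

import java.io.IOException;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;

final class CleanerSwitchSketch {
  static void toggle(Connection conn, boolean on) throws IOException {
    try (Admin admin = conn.getAdmin()) {
      // Returns the previous combined state of the log and hfile cleaners.
      boolean prev = admin.cleanerChoreSwitch(on);
      System.out.println("cleaner chores previously enabled: " + prev);
    }
  }
}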
hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase.master.cleaner;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
@@ -79,10 +80,11 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends ScheduledChore
protected final Map<String, Object> params;
private final AtomicBoolean enabled = new AtomicBoolean(true);
protected List<T> cleanersChain;
protected List<String> excludeDirs;

public CleanerChore(String name, final int sleepPeriod, final Stoppable s, Configuration conf,
FileSystem fs, Path oldFileDir, String confKey, DirScanPool pool) {
this(name, sleepPeriod, s, conf, fs, oldFileDir, confKey, pool, null);
this(name, sleepPeriod, s, conf, fs, oldFileDir, confKey, pool, null, null);
}

/**
@@ -97,7 +99,8 @@ public CleanerChore(String name, final int sleepPeriod, final Stoppable s, Configuration conf,
* @param params members could be used in cleaner
*/
public CleanerChore(String name, final int sleepPeriod, final Stoppable s, Configuration conf,
FileSystem fs, Path oldFileDir, String confKey, DirScanPool pool, Map<String, Object> params) {
FileSystem fs, Path oldFileDir, String confKey, DirScanPool pool, Map<String, Object> params,
List<Path> excludePaths) {
super(name, s, sleepPeriod);

Preconditions.checkNotNull(pool, "Chore's pool can not be null");
@@ -106,6 +109,19 @@ public CleanerChore(String name, final int sleepPeriod, final Stoppable s, Configuration conf,
this.oldFileDir = oldFileDir;
this.conf = conf;
this.params = params;
if (excludePaths != null && !excludePaths.isEmpty()) {
excludeDirs = new ArrayList<>(excludePaths.size());
for (Path path : excludePaths) {
StringBuilder dirPart = new StringBuilder(path.toString());
if (!path.toString().endsWith("/")) {
dirPart.append("/");
}
excludeDirs.add(dirPart.toString());
}
}
if (excludeDirs != null) {
LOG.info("Cleaner {} excludes sub dirs: {}", name, excludeDirs);
}
initCleanerChain(confKey);
}

@@ -419,9 +435,11 @@ private void traverseAndDelete(Path dir, boolean root, CompletableFuture<Boolean> result)
sortByConsumedSpace(subDirs);
// Submit the request of sub-directory deletion.
subDirs.forEach(subDir -> {
CompletableFuture<Boolean> subFuture = new CompletableFuture<>();
pool.execute(() -> traverseAndDelete(subDir.getPath(), false, subFuture));
futures.add(subFuture);
if (!shouldExclude(subDir)) {
CompletableFuture<Boolean> subFuture = new CompletableFuture<>();
pool.execute(() -> traverseAndDelete(subDir.getPath(), false, subFuture));
futures.add(subFuture);
}
});
}

@@ -451,11 +469,34 @@ private void traverseAndDelete(Path dir, boolean root, CompletableFuture<Boolean> result)
}
});
} catch (Exception e) {
LOG.debug("Failed to traverse and delete the path: {}", dir, e);
if (e instanceof FileNotFoundException) {
LOG.debug("Dir dose not exist, {}", dir);
} else {
LOG.error("Failed to traverse and delete the path: {}", dir, e);
}
result.completeExceptionally(e);
}
}

/**
* Check if a path should be excluded from cleaning
*/
private boolean shouldExclude(FileStatus f) {
if (!f.isDirectory()) {
return false;
}
if (excludeDirs != null && !excludeDirs.isEmpty()) {
for (String dirPart : excludeDirs) {
// Since each entry in excludeDirs ends with '/',
// a path that contains the dirPart as a substring lies under an excluded directory
if (f.getPath().toString().contains(dirPart)) {
return true;
}
}
}
return false;
}

/**
* Perform a delete on a specified type.
* @param deletion a delete
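To make the matching in shouldExclude concrete: entries in excludeDirs are normalized to end with '/', and a directory is skipped when its full path contains one of them as a substring. A standalone sketch with hypothetical paths:

import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.fs.Path;

public class ShouldExcludeSketch {
  public static void main(String[] args) {
    List<String> excludeDirs = Arrays.asList("data/default/testTable1/");
    // A directory beneath the custom path: the default cleaner skips it.
    Path inside = new Path("hdfs://nn/hbase/archive/data/default/testTable1/region1");
    // A sibling table's directory: scanned as usual.
    Path outside = new Path("hdfs://nn/hbase/archive/data/default/testTable2");
    System.out.println(excludeDirs.stream().anyMatch(d -> inside.toString().contains(d)));  // true
    System.out.println(excludeDirs.stream().anyMatch(d -> outside.toString().contains(d))); // false
  }
}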
hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/DirScanPool.java
@@ -57,10 +57,13 @@ private Type(String cleanerPoolSizeConfigName, String cleanerPoolSizeConfigDefault) {
}

private DirScanPool(Configuration conf, Type dirScanPoolType) {
this(dirScanPoolType, conf.get(dirScanPoolType.cleanerPoolSizeConfigName,
dirScanPoolType.cleanerPoolSizeConfigDefault));
}

private DirScanPool(Type dirScanPoolType, String poolSize) {
this.dirScanPoolType = dirScanPoolType;
this.name = dirScanPoolType.name().toLowerCase();
String poolSize = conf.get(dirScanPoolType.cleanerPoolSizeConfigName,
dirScanPoolType.cleanerPoolSizeConfigDefault);
size = CleanerChore.calculatePoolSize(poolSize);
// poolSize may be 0 or 0.0 from a careless configuration,
// double check to make sure.
@@ -143,6 +146,10 @@ public static DirScanPool getHFileCleanerScanPool(Configuration conf) {
return new DirScanPool(conf, Type.HFILE_CLEANER);
}

public static DirScanPool getHFileCleanerScanPool(String poolSize) {
return new DirScanPool(Type.HFILE_CLEANER, poolSize);
}

public static DirScanPool getLogCleanerScanPool(Configuration conf) {
return new DirScanPool(conf, Type.LOG_CLEANER);
}
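The new String overload exists so that HMaster can size the shared custom-path pool from hbase.cleaner.custom.hfiles.pool.size instead of the HFILE_CLEANER type's own pool-size key. A brief sketch of both entry points (conf is assumed to be populated):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.master.cleaner.DirScanPool;
import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;

final class PoolFactorySketch {
  static void createPools(Configuration conf) {
    // Exclusive pool for the whole-archive cleaner, sized from the type's config key:
    DirScanPool exclusive = DirScanPool.getHFileCleanerScanPool(conf);
    // Shared pool for the custom-path cleaners, sized from an explicit size string:
    DirScanPool shared =
      DirScanPool.getHFileCleanerScanPool(conf.get(HFileCleaner.CUSTOM_POOL_SIZE, "6"));
  }
}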
hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/HFileCleaner.java
@@ -88,6 +88,17 @@ public HFileCleaner(final int period, final Stoppable stopper, Configuration conf,
"hbase.regionserver.hfilecleaner.thread.check.interval.msec";
static final long DEFAULT_HFILE_DELETE_THREAD_CHECK_INTERVAL_MSEC = 1000L;

/**
* The custom paths for the hfile cleaner: comma-separated subdirectories of the archive
* directory, e.g. data/default/testTable1,data/default/testTable2
*/
public static final String HFILE_CLEANER_CUSTOM_PATHS = "hbase.master.hfile.cleaner.custom.paths";

/** Configure hfile cleaner classes for the custom paths */
public static final String HFILE_CLEANER_CUSTOM_PATHS_PLUGINS =
"hbase.master.hfilecleaner.custom.paths.plugins";
public static final String CUSTOM_POOL_SIZE = "hbase.cleaner.custom.hfiles.pool.size";

private static final Logger LOG = LoggerFactory.getLogger(HFileCleaner.class);

StealJobQueue<HFileDeleteTask> largeFileQueue;
@@ -117,8 +128,13 @@ public HFileCleaner(final int period, final Stoppable stopper, Configuration conf,
public HFileCleaner(final int period, final Stoppable stopper, Configuration conf, FileSystem fs,
Path directory, DirScanPool pool, Map<String, Object> params) {
this("HFileCleaner", period, stopper, conf, fs, directory, MASTER_HFILE_CLEANER_PLUGINS, pool,
params);
params, null);
}

public HFileCleaner(final int period, final Stoppable stopper, Configuration conf, FileSystem fs,
Path directory, DirScanPool pool, Map<String, Object> params, List<Path> excludePaths) {
this("HFileCleaner", period, stopper, conf, fs, directory, MASTER_HFILE_CLEANER_PLUGINS, pool,
params, excludePaths);
}

/**
@@ -134,8 +150,9 @@ public HFileCleaner(final int period, final Stoppable stopper, Configuration conf,
* @param params params could be used in subclass of BaseHFileCleanerDelegate
*/
public HFileCleaner(String name, int period, Stoppable stopper, Configuration conf, FileSystem fs,
Path directory, String confKey, DirScanPool pool, Map<String, Object> params) {
super(name, period, stopper, conf, fs, directory, confKey, pool, params);
Path directory, String confKey, DirScanPool pool, Map<String, Object> params,
List<Path> excludePaths) {
super(name, period, stopper, conf, fs, directory, confKey, pool, params, excludePaths);
throttlePoint =
conf.getInt(HFILE_DELETE_THROTTLE_THRESHOLD, DEFAULT_HFILE_DELETE_THROTTLE_THRESHOLD);
largeQueueInitSize =
hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/LogCleaner.java
@@ -75,7 +75,7 @@ public class LogCleaner extends CleanerChore<BaseLogCleanerDelegate>
public LogCleaner(final int period, final Stoppable stopper, Configuration conf, FileSystem fs,
Path oldLogDir, DirScanPool pool, Map<String, Object> params) {
super("LogsCleaner", period, stopper, conf, fs, oldLogDir, HBASE_MASTER_LOGCLEANER_PLUGINS,
pool, params);
pool, params, null);
this.pendingDelete = new LinkedBlockingQueue<>();
int size = conf.getInt(OLD_WALS_CLEANER_THREAD_SIZE, DEFAULT_OLD_WALS_CLEANER_THREAD_SIZE);
this.oldWALsCleaner = createOldWalsCleaner(size);