HBASE-23680 RegionProcedureStore missing cleaning of hfile archive
Apache9 committed Jan 16, 2020
1 parent 19d3bed commit 936c952
Showing 12 changed files with 244 additions and 33 deletions.
13 changes: 12 additions & 1 deletion hbase-common/src/main/resources/hbase-default.xml
@@ -151,7 +151,18 @@ possible configurations would overwhelm and obscure the important.
so put the cleaner that prunes the most files in front. To
implement your own BaseHFileCleanerDelegate, just put it in HBase's classpath
and add the fully qualified class name here. Always add the above
default log cleaners in the list as they will be overwritten in
default hfile cleaners in the list as they will be overwritten in
hbase-site.xml.</description>
</property>
<property>
<name>hbase.procedure.store.region.hfilecleaner.plugins</name>
<value>org.apache.hadoop.hbase.master.cleaner.TimeToLiveHFileCleaner</value>
<description>A comma-separated list of BaseHFileCleanerDelegate invoked by
the RegionProcedureStore HFileCleaner service. These HFiles cleaners are
called in order, so put the cleaner that prunes the most files in front. To
implement your own BaseHFileCleanerDelegate, just put it in HBase's classpath
and add the fully qualified class name here. Always add the above
default hfile cleaners in the list as they will be overwritten in
hbase-site.xml.</description>
</property>
<property>
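
The new hbase.procedure.store.region.hfilecleaner.plugins property accepts the same delegate contract as the existing hbase.master.hfilecleaner.plugins list. As a minimal sketch of a custom delegate (the package and class name below are hypothetical, not part of this commit), an implementation only needs to extend BaseHFileCleanerDelegate and provide isFileDeletable:

package org.example.cleaner; // hypothetical package, not shipped with HBase

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.hbase.master.cleaner.BaseHFileCleanerDelegate;

/**
 * Sketch: keeps archived procedure-store HFiles around for at least one
 * hour before letting the HFileCleaner chore delete them.
 */
public class KeepForAnHourHFileCleaner extends BaseHFileCleanerDelegate {

  private static final long MIN_AGE_MS = 60L * 60 * 1000;

  @Override
  protected boolean isFileDeletable(FileStatus fStat) {
    // Deletable only once the file has sat in the archive for an hour.
    return System.currentTimeMillis() - fStat.getModificationTime() > MIN_AGE_MS;
  }
}

Such a class would be registered by appending its fully qualified name (here org.example.cleaner.KeepForAnHourHFileCleaner) to hbase.procedure.store.region.hfilecleaner.plugins in hbase-site.xml, after the default TimeToLiveHFileCleaner.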

ProcedureStorePerformanceEvaluation.java
@@ -123,11 +123,15 @@ private void setUpProcedureStore() throws IOException {

protected abstract T createProcedureStore(Path storeDir) throws IOException;

protected void postStop(T store) throws IOException {
}

private void tearDownProcedureStore() {
Path storeDir = null;
try {
if (store != null) {
store.stop(false);
postStop(store);
}
FileSystem fs = FileSystem.get(conf);
storeDir = fs.makeQualified(new Path(outputPath));

HMaster.java
@@ -1460,8 +1460,6 @@ private void startServiceThreads() throws IOException {
this.executorService.startExecutorService(ExecutorType.MASTER_TABLE_OPERATIONS, 1);
startProcedureExecutor();

// Create cleaner thread pool
cleanerPool = new DirScanPool(conf);
// Start log cleaner thread
int cleanerInterval =
conf.getInt(HBASE_MASTER_CLEANER_INTERVAL, DEFAULT_HBASE_MASTER_CLEANER_INTERVAL);
@@ -1567,8 +1565,10 @@ protected void stopServiceThreads() {

private void createProcedureExecutor() throws IOException {
MasterProcedureEnv procEnv = new MasterProcedureEnv(this);
procedureStore =
new RegionProcedureStore(this, new MasterProcedureEnv.FsUtilsLeaseRecovery(this));
// Create cleaner thread pool
cleanerPool = new DirScanPool(conf);
procedureStore = new RegionProcedureStore(this, cleanerPool,
new MasterProcedureEnv.FsUtilsLeaseRecovery(this));
procedureStore.registerListener(new ProcedureStoreListener() {

@Override

HFileCleaner.java
@@ -117,25 +117,42 @@ public HFileCleaner(final int period, final Stoppable stopper, Configuration con
*/
public HFileCleaner(final int period, final Stoppable stopper, Configuration conf, FileSystem fs,
Path directory, DirScanPool pool, Map<String, Object> params) {
super("HFileCleaner", period, stopper, conf, fs, directory, MASTER_HFILE_CLEANER_PLUGINS, pool,
this("HFileCleaner", period, stopper, conf, fs, directory, MASTER_HFILE_CLEANER_PLUGINS, pool,
params);

}

/**
* For creating customized HFileCleaner.
* @param name name of the chore being run
* @param period the period of time to sleep between each run
* @param stopper the stopper
* @param conf configuration to use
* @param fs handle to the FS
* @param directory directory to be cleaned
* @param confKey configuration key for the classes to instantiate
* @param pool the thread pool used to scan directories
* @param params params could be used in subclass of BaseHFileCleanerDelegate
*/
public HFileCleaner(String name, int period, Stoppable stopper, Configuration conf, FileSystem fs,
Path directory, String confKey, DirScanPool pool, Map<String, Object> params) {
super(name, period, stopper, conf, fs, directory, confKey, pool, params);
throttlePoint =
conf.getInt(HFILE_DELETE_THROTTLE_THRESHOLD, DEFAULT_HFILE_DELETE_THROTTLE_THRESHOLD);
conf.getInt(HFILE_DELETE_THROTTLE_THRESHOLD, DEFAULT_HFILE_DELETE_THROTTLE_THRESHOLD);
largeQueueInitSize =
conf.getInt(LARGE_HFILE_QUEUE_INIT_SIZE, DEFAULT_LARGE_HFILE_QUEUE_INIT_SIZE);
conf.getInt(LARGE_HFILE_QUEUE_INIT_SIZE, DEFAULT_LARGE_HFILE_QUEUE_INIT_SIZE);
smallQueueInitSize =
conf.getInt(SMALL_HFILE_QUEUE_INIT_SIZE, DEFAULT_SMALL_HFILE_QUEUE_INIT_SIZE);
conf.getInt(SMALL_HFILE_QUEUE_INIT_SIZE, DEFAULT_SMALL_HFILE_QUEUE_INIT_SIZE);
largeFileQueue = new StealJobQueue<>(largeQueueInitSize, smallQueueInitSize, COMPARATOR);
smallFileQueue = largeFileQueue.getStealFromQueue();
largeFileDeleteThreadNumber =
conf.getInt(LARGE_HFILE_DELETE_THREAD_NUMBER, DEFAULT_LARGE_HFILE_DELETE_THREAD_NUMBER);
conf.getInt(LARGE_HFILE_DELETE_THREAD_NUMBER, DEFAULT_LARGE_HFILE_DELETE_THREAD_NUMBER);
smallFileDeleteThreadNumber =
conf.getInt(SMALL_HFILE_DELETE_THREAD_NUMBER, DEFAULT_SMALL_HFILE_DELETE_THREAD_NUMBER);
conf.getInt(SMALL_HFILE_DELETE_THREAD_NUMBER, DEFAULT_SMALL_HFILE_DELETE_THREAD_NUMBER);
cleanerThreadTimeoutMsec =
conf.getLong(HFILE_DELETE_THREAD_TIMEOUT_MSEC, DEFAULT_HFILE_DELETE_THREAD_TIMEOUT_MSEC);
cleanerThreadCheckIntervalMsec =
conf.getLong(HFILE_DELETE_THREAD_CHECK_INTERVAL_MSEC,
DEFAULT_HFILE_DELETE_THREAD_CHECK_INTERVAL_MSEC);
conf.getLong(HFILE_DELETE_THREAD_TIMEOUT_MSEC, DEFAULT_HFILE_DELETE_THREAD_TIMEOUT_MSEC);
cleanerThreadCheckIntervalMsec = conf.getLong(HFILE_DELETE_THREAD_CHECK_INTERVAL_MSEC,
DEFAULT_HFILE_DELETE_THREAD_CHECK_INTERVAL_MSEC);
startHFileDeleteThreads();
}

RegionFlusherAndCompactor.java
@@ -120,8 +120,8 @@ class RegionFlusherAndCompactor implements Closeable {
flushThread.start();
compactExecutor = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder()
.setNameFormat("Procedure-Region-Store-Compactor").setDaemon(true).build());
LOG.info("Constructor flushSize={}, flushPerChanges={}, flushIntervalMs={}, " +
"compactMin={}", flushSize, flushPerChanges, flushIntervalMs, compactMin);
LOG.info("Constructor flushSize={}, flushPerChanges={}, flushIntervalMs={}, compactMin={}",
flushSize, flushPerChanges, flushIntervalMs, compactMin);
}

// inject our flush related configurations
@@ -139,6 +139,7 @@ static void setupConf(Configuration conf) {
private void compact() {
try {
region.compact(true);
Iterables.getOnlyElement(region.getStores()).closeAndArchiveCompactedFiles();
} catch (IOException e) {
LOG.error("Failed to compact procedure store region", e);
}

RegionProcedureStore.java
@@ -25,6 +25,8 @@
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import org.apache.commons.lang3.mutable.MutableLong;
import org.apache.hadoop.conf.Configuration;
@@ -35,6 +37,7 @@
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.HFileArchiver;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Mutation;
@@ -45,6 +48,9 @@
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.log.HBaseMarkers;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.cleaner.DirScanPool;
import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
import org.apache.hadoop.hbase.procedure2.store.LeaseRecovery;
@@ -57,6 +63,7 @@
import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.HFileArchiveUtil;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALFactory;
@@ -110,7 +117,7 @@ public class RegionProcedureStore extends ProcedureStoreBase {

static final String MASTER_PROCEDURE_DIR = "MasterProcs";

static final String LOGCLEANER_PLUGINS = "hbase.procedure.store.region.logcleaner.plugins";
static final String HFILECLEANER_PLUGINS = "hbase.procedure.store.region.hfilecleaner.plugins";

private static final String REPLAY_EDITS_DIR = "recovered.wals";

@@ -129,22 +136,31 @@ public class RegionProcedureStore extends ProcedureStoreBase {

private final Server server;

private final DirScanPool cleanerPool;

private final LeaseRecovery leaseRecovery;

// Used to delete the compacted hfiles. Since we put all data on WAL filesystem, it is not
// possible to move the compacted hfiles to the global hfile archive directory, we have to do it
// by ourselves.
private HFileCleaner cleaner;

private WALFactory walFactory;

@VisibleForTesting
HRegion region;

private RegionFlusherAndCompactor flusherAndCompactor;
@VisibleForTesting
RegionFlusherAndCompactor flusherAndCompactor;

@VisibleForTesting
RegionProcedureStoreWALRoller walRoller;

private int numThreads;

public RegionProcedureStore(Server server, LeaseRecovery leaseRecovery) {
public RegionProcedureStore(Server server, DirScanPool cleanerPool, LeaseRecovery leaseRecovery) {
this.server = server;
this.cleanerPool = cleanerPool;
this.leaseRecovery = leaseRecovery;
}

@@ -184,6 +200,9 @@ public void stop(boolean abort) {
return;
}
LOG.info("Stopping the Region Procedure Store, isAbort={}", abort);
if (cleaner != null) {
cleaner.cancel(abort);
}
if (flusherAndCompactor != null) {
flusherAndCompactor.close();
}
@@ -360,11 +379,11 @@ public void handleCorrupted(ProcedureIterator procIter) throws IOException {
} else if (maxProcIdSet.longValue() < maxProcIdFromProcs.longValue()) {
LOG.warn("The WALProcedureStore max pid is less than the max pid of all loaded procedures");
}
store.stop(false);
if (!fs.delete(procWALDir, true)) {
throw new IOException("Failed to delete the WALProcedureStore migrated proc wal directory " +
procWALDir);
throw new IOException(
"Failed to delete the WALProcedureStore migrated proc wal directory " + procWALDir);
}
store.stop(true);
LOG.info("Migration of WALProcedureStore finished");
}

@@ -400,6 +419,16 @@ public void recoverLease() throws IOException {
}
flusherAndCompactor = new RegionFlusherAndCompactor(conf, server, region);
walRoller.setFlusherAndCompactor(flusherAndCompactor);
int cleanerInterval =
conf.getInt(HMaster.HBASE_MASTER_CLEANER_INTERVAL, HMaster.DEFAULT_HBASE_MASTER_CLEANER_INTERVAL);
Path archiveDir = HFileArchiveUtil.getArchivePath(conf);
if (!fs.mkdirs(archiveDir)) {
LOG.warn("Failed to create archive directory {}. Usually this should not happen but it will" +
" be created again when we actually archive the hfiles later, so continue", archiveDir);
}
cleaner = new HFileCleaner("RegionProcedureStoreHFileCleaner", cleanerInterval, server, conf,
fs, archiveDir, HFILECLEANER_PLUGINS, cleanerPool, Collections.emptyMap());
server.getChoreService().scheduleChore(cleaner);
tryMigrate(fs);
}

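
With recoverLease now scheduling a dedicated HFileCleaner chore over the archive directory, the default plugin configured above, TimeToLiveHFileCleaner, decides when an archived HFile may actually be removed. A condensed sketch of that TTL check, assuming current upstream behavior (the real class also logs warnings for the corner cases; this is not the verbatim source):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;

// Condensed sketch of TimeToLiveHFileCleaner's deletability test.
final class TtlCheckSketch {
  static boolean isDeletable(Configuration conf, FileStatus file, long nowMs) {
    // Default TTL for archived HFiles is five minutes.
    long ttlMs = conf.getLong("hbase.master.hfilecleaner.ttl", 5 * 60 * 1000L);
    long ageMs = nowMs - file.getModificationTime();
    // A negative age means clock skew; keep the file rather than risk
    // deleting something that may still be referenced.
    return ageMs >= 0 && ageMs > ttlMs;
  }
}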

RegionProcedureStorePerformanceEvaluation.java
@@ -29,6 +29,7 @@
import org.apache.hadoop.hbase.client.AsyncClusterConnection;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.io.util.MemorySizeUtil;
import org.apache.hadoop.hbase.master.cleaner.DirScanPool;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStorePerformanceEvaluation;
import org.apache.hadoop.hbase.regionserver.ChunkCreator;
import org.apache.hadoop.hbase.regionserver.MemStoreLAB;
@@ -46,21 +47,29 @@ private static final class MockServer implements Server {
private final ServerName serverName =
ServerName.valueOf("localhost", 12345, System.currentTimeMillis());

private final ChoreService choreService;

private volatile boolean abort = false;

public MockServer(Configuration conf) {
this.conf = conf;
this.choreService = new ChoreService("Cleaner-Chore-Service");
}

@Override
public void abort(String why, Throwable e) {
abort = true;
choreService.shutdown();
}

@Override
public boolean isAborted() {
return false;
return abort;
}

@Override
public void stop(String why) {
choreService.shutdown();
}

@Override
@@ -105,10 +114,12 @@ public CoordinatedStateManager getCoordinatedStateManager() {

@Override
public ChoreService getChoreService() {
throw new UnsupportedOperationException();
return choreService;
}
}

private DirScanPool cleanerPool;

@Override
protected RegionProcedureStore createProcedureStore(Path storeDir) throws IOException {
Pair<Long, MemoryType> pair = MemorySizeUtil.getGlobalMemStoreSize(conf);
@@ -123,7 +134,8 @@ protected RegionProcedureStore createProcedureStore(Path storeDir) throws IOExce
initialCountPercentage, null);
conf.setBoolean(RegionProcedureStore.USE_HSYNC_KEY, "hsync".equals(syncType));
CommonFSUtils.setRootDir(conf, storeDir);
return new RegionProcedureStore(new MockServer(conf), (fs, apth) -> {
cleanerPool = new DirScanPool(conf);
return new RegionProcedureStore(new MockServer(conf), cleanerPool, (fs, apth) -> {
});
}

@@ -138,6 +150,11 @@ protected void printRawFormatResult(long timeTakenNs) {
protected void preWrite(long procId) throws IOException {
}

@Override
protected void postStop(RegionProcedureStore store) throws IOException {
cleanerPool.shutdownNow();
}

public static void main(String[] args) throws IOException {
RegionProcedureStorePerformanceEvaluation tool =
new RegionProcedureStorePerformanceEvaluation();

RegionProcedureStoreTestBase.java
@@ -18,8 +18,11 @@
package org.apache.hadoop.hbase.procedure2.store.region;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
import org.apache.hadoop.hbase.master.cleaner.DirScanPool;
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.LoadCounter;
import org.apache.hadoop.hbase.regionserver.MemStoreLAB;
import org.apache.hadoop.hbase.util.CommonFSUtils;
@@ -32,18 +35,31 @@ public class RegionProcedureStoreTestBase {

protected RegionProcedureStore store;

protected ChoreService choreService;

protected DirScanPool cleanerPool;

protected void configure(Configuration conf) {
}

@Before
public void setUp() throws IOException {
htu = new HBaseCommonTestingUtility();
htu.getConfiguration().setBoolean(MemStoreLAB.USEMSLAB_KEY, false);
configure(htu.getConfiguration());
Path testDir = htu.getDataTestDir();
CommonFSUtils.setWALRootDir(htu.getConfiguration(), testDir);
store = RegionProcedureStoreTestHelper.createStore(htu.getConfiguration(), new LoadCounter());
choreService = new ChoreService(getClass().getSimpleName());
cleanerPool = new DirScanPool(htu.getConfiguration());
store = RegionProcedureStoreTestHelper.createStore(htu.getConfiguration(), choreService,
cleanerPool, new LoadCounter());
}

@After
public void tearDown() throws IOException {
store.stop(true);
cleanerPool.shutdownNow();
choreService.shutdown();
htu.cleanupTestDir();
}
}

RegionProcedureStoreTestHelper.java
@@ -24,8 +24,10 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.master.cleaner.DirScanPool;
import org.apache.hadoop.hbase.procedure2.store.LeaseRecovery;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureLoader;

@@ -34,13 +36,14 @@ final class RegionProcedureStoreTestHelper {
private RegionProcedureStoreTestHelper() {
}

static RegionProcedureStore createStore(Configuration conf, ProcedureLoader loader)
throws IOException {
static RegionProcedureStore createStore(Configuration conf, ChoreService choreService,
DirScanPool cleanerPool, ProcedureLoader loader) throws IOException {
Server server = mock(Server.class);
when(server.getConfiguration()).thenReturn(conf);
when(server.getServerName())
.thenReturn(ServerName.valueOf("localhost", 12345, System.currentTimeMillis()));
RegionProcedureStore store = new RegionProcedureStore(server, new LeaseRecovery() {
when(server.getChoreService()).thenReturn(choreService);
RegionProcedureStore store = new RegionProcedureStore(server, cleanerPool, new LeaseRecovery() {

@Override
public void recoverFileLease(FileSystem fs, Path path) throws IOException {