diff --git a/hbase-common/src/main/resources/hbase-default.xml b/hbase-common/src/main/resources/hbase-default.xml
index 557608f07a1c..5b7d29c0b236 100644
--- a/hbase-common/src/main/resources/hbase-default.xml
+++ b/hbase-common/src/main/resources/hbase-default.xml
@@ -151,7 +151,18 @@ possible configurations would overwhelm and obscure the important.
     so put the cleaner that prunes the most files in front. To
     implement your own BaseHFileCleanerDelegate, just put it in HBase's classpath
     and add the fully qualified class name here. Always add the above
-    default log cleaners in the list as they will be overwritten in
+    default hfile cleaners in the list as they will be overwritten in
     hbase-site.xml.</description>
   </property>
+  <property>
+    <name>hbase.procedure.store.region.hfilecleaner.plugins</name>
+    <value>org.apache.hadoop.hbase.master.cleaner.TimeToLiveHFileCleaner</value>
+    <description>A comma-separated list of BaseHFileCleanerDelegate invoked by
+      the RegionProcedureStore HFileCleaner service. These HFile cleaners are
+      called in order, so put the cleaner that prunes the most files in front. To
+      implement your own BaseHFileCleanerDelegate, just put it in HBase's classpath
+      and add the fully qualified class name here. Always add the above
+      default hfile cleaners in the list as they will be overwritten in
+      hbase-site.xml.</description>
+  </property>
   <property>
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStorePerformanceEvaluation.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStorePerformanceEvaluation.java
index b4888c5b1628..cb31f02c429c 100644
--- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStorePerformanceEvaluation.java
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStorePerformanceEvaluation.java
@@ -123,11 +123,15 @@ private void setUpProcedureStore() throws IOException {
 
   protected abstract T createProcedureStore(Path storeDir) throws IOException;
 
+  protected void postStop(T store) throws IOException {
+  }
+
   private void tearDownProcedureStore() {
     Path storeDir = null;
     try {
       if (store != null) {
         store.stop(false);
+        postStop(store);
       }
       FileSystem fs = FileSystem.get(conf);
       storeDir = fs.makeQualified(new Path(outputPath));
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index b5f19d85d67b..05ab1a4ee177 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -1460,8 +1460,6 @@ private void startServiceThreads() throws IOException {
     this.executorService.startExecutorService(ExecutorType.MASTER_TABLE_OPERATIONS, 1);
     startProcedureExecutor();
 
-    // Create cleaner thread pool
-    cleanerPool = new DirScanPool(conf);
     // Start log cleaner thread
     int cleanerInterval =
       conf.getInt(HBASE_MASTER_CLEANER_INTERVAL, DEFAULT_HBASE_MASTER_CLEANER_INTERVAL);
@@ -1567,8 +1565,10 @@ protected void stopServiceThreads() {
 
   private void createProcedureExecutor() throws IOException {
     MasterProcedureEnv procEnv = new MasterProcedureEnv(this);
-    procedureStore =
-      new RegionProcedureStore(this, new MasterProcedureEnv.FsUtilsLeaseRecovery(this));
+    // Create cleaner thread pool
+    cleanerPool = new DirScanPool(conf);
+    procedureStore = new RegionProcedureStore(this, cleanerPool,
+      new MasterProcedureEnv.FsUtilsLeaseRecovery(this));
     procedureStore.registerListener(new ProcedureStoreListener() {
 
       @Override
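The new hbase.procedure.store.region.hfilecleaner.plugins property above tells operators they can plug in their own BaseHFileCleanerDelegate. For reference, a minimal sketch of such a delegate, assuming only the isFileDeletable(FileStatus) hook from the delegate contract; the class name and the one-hour age check are invented for illustration:

```java
package org.example;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.hbase.master.cleaner.BaseHFileCleanerDelegate;

// Hypothetical delegate: only allows deletion of archived hfiles older than one hour.
// To activate it, append the fully qualified class name to
// hbase.procedure.store.region.hfilecleaner.plugins after the default TimeToLiveHFileCleaner.
public class KeepRecentHFileCleaner extends BaseHFileCleanerDelegate {

  private static final long ONE_HOUR_MS = 60L * 60 * 1000;

  @Override
  public boolean isFileDeletable(FileStatus fStat) {
    long age = System.currentTimeMillis() - fStat.getModificationTime();
    return age > ONE_HOUR_MS;
  }
}
```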
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/HFileCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/HFileCleaner.java
index 4b50ab40d10e..1747da164b62 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/HFileCleaner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/HFileCleaner.java
@@ -117,25 +117,42 @@ public HFileCleaner(final int period, final Stoppable stopper, Configuration con
    */
   public HFileCleaner(final int period, final Stoppable stopper, Configuration conf, FileSystem fs,
     Path directory, DirScanPool pool, Map<String, Object> params) {
-    super("HFileCleaner", period, stopper, conf, fs, directory, MASTER_HFILE_CLEANER_PLUGINS, pool,
+    this("HFileCleaner", period, stopper, conf, fs, directory, MASTER_HFILE_CLEANER_PLUGINS, pool,
       params);
+
+  }
+
+  /**
+   * For creating a customized HFileCleaner.
+   * @param name name of the chore being run
+   * @param period the period of time to sleep between each run
+   * @param stopper the stopper
+   * @param conf configuration to use
+   * @param fs handle to the FS
+   * @param directory directory to be cleaned
+   * @param confKey configuration key for the classes to instantiate
+   * @param pool the thread pool used to scan directories
+   * @param params params that could be used in a subclass of BaseHFileCleanerDelegate
+   */
+  public HFileCleaner(String name, int period, Stoppable stopper, Configuration conf, FileSystem fs,
+    Path directory, String confKey, DirScanPool pool, Map<String, Object> params) {
+    super(name, period, stopper, conf, fs, directory, confKey, pool, params);
     throttlePoint =
-        conf.getInt(HFILE_DELETE_THROTTLE_THRESHOLD, DEFAULT_HFILE_DELETE_THROTTLE_THRESHOLD);
+      conf.getInt(HFILE_DELETE_THROTTLE_THRESHOLD, DEFAULT_HFILE_DELETE_THROTTLE_THRESHOLD);
     largeQueueInitSize =
-        conf.getInt(LARGE_HFILE_QUEUE_INIT_SIZE, DEFAULT_LARGE_HFILE_QUEUE_INIT_SIZE);
+      conf.getInt(LARGE_HFILE_QUEUE_INIT_SIZE, DEFAULT_LARGE_HFILE_QUEUE_INIT_SIZE);
     smallQueueInitSize =
-        conf.getInt(SMALL_HFILE_QUEUE_INIT_SIZE, DEFAULT_SMALL_HFILE_QUEUE_INIT_SIZE);
+      conf.getInt(SMALL_HFILE_QUEUE_INIT_SIZE, DEFAULT_SMALL_HFILE_QUEUE_INIT_SIZE);
     largeFileQueue = new StealJobQueue<>(largeQueueInitSize, smallQueueInitSize, COMPARATOR);
     smallFileQueue = largeFileQueue.getStealFromQueue();
     largeFileDeleteThreadNumber =
-        conf.getInt(LARGE_HFILE_DELETE_THREAD_NUMBER, DEFAULT_LARGE_HFILE_DELETE_THREAD_NUMBER);
+      conf.getInt(LARGE_HFILE_DELETE_THREAD_NUMBER, DEFAULT_LARGE_HFILE_DELETE_THREAD_NUMBER);
     smallFileDeleteThreadNumber =
-        conf.getInt(SMALL_HFILE_DELETE_THREAD_NUMBER, DEFAULT_SMALL_HFILE_DELETE_THREAD_NUMBER);
+      conf.getInt(SMALL_HFILE_DELETE_THREAD_NUMBER, DEFAULT_SMALL_HFILE_DELETE_THREAD_NUMBER);
     cleanerThreadTimeoutMsec =
-        conf.getLong(HFILE_DELETE_THREAD_TIMEOUT_MSEC, DEFAULT_HFILE_DELETE_THREAD_TIMEOUT_MSEC);
-    cleanerThreadCheckIntervalMsec =
-        conf.getLong(HFILE_DELETE_THREAD_CHECK_INTERVAL_MSEC,
-            DEFAULT_HFILE_DELETE_THREAD_CHECK_INTERVAL_MSEC);
+      conf.getLong(HFILE_DELETE_THREAD_TIMEOUT_MSEC, DEFAULT_HFILE_DELETE_THREAD_TIMEOUT_MSEC);
+    cleanerThreadCheckIntervalMsec = conf.getLong(HFILE_DELETE_THREAD_CHECK_INTERVAL_MSEC,
+      DEFAULT_HFILE_DELETE_THREAD_CHECK_INTERVAL_MSEC);
     startHFileDeleteThreads();
   }
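The refactoring above turns the original constructor into a thin delegate and exposes a new overload where the chore name and the plugin configuration key are caller-supplied, which is what lets the RegionProcedureStore below run its own cleaner instance. A hedged sketch of how a caller might use the new overload; the chore name, one-minute period, and config key are illustrative, only the constructor signature comes from this patch:

```java
import java.util.Collections;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.master.cleaner.DirScanPool;
import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;

final class CustomHFileCleanerExample {

  // Stand up an independent cleaner chore over an arbitrary archive directory.
  // "my.hfilecleaner.plugins" is a made-up key, not one shipped with HBase.
  static HFileCleaner scheduleCleaner(ChoreService choreService, Stoppable stopper,
      Configuration conf, FileSystem fs, Path archiveDir, DirScanPool pool) {
    HFileCleaner cleaner = new HFileCleaner("MyHFileCleaner", 60_000, stopper, conf, fs,
      archiveDir, "my.hfilecleaner.plugins", pool, Collections.emptyMap());
    choreService.scheduleChore(cleaner);
    return cleaner;
  }
}
```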
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/store/region/RegionFlusherAndCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/store/region/RegionFlusherAndCompactor.java
index 8e3ffa375ef7..57e62ddf6541 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/store/region/RegionFlusherAndCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/store/region/RegionFlusherAndCompactor.java
@@ -120,8 +120,8 @@ class RegionFlusherAndCompactor implements Closeable {
     flushThread.start();
     compactExecutor = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder()
       .setNameFormat("Procedure-Region-Store-Compactor").setDaemon(true).build());
-    LOG.info("Constructor flushSize={}, flushPerChanges={}, flushIntervalMs={}, " +
-      "compactMin={}", flushSize, flushPerChanges, flushIntervalMs, compactMin);
+    LOG.info("Constructor flushSize={}, flushPerChanges={}, flushIntervalMs={}, compactMin={}",
+      flushSize, flushPerChanges, flushIntervalMs, compactMin);
   }
 
   // inject our flush related configurations
@@ -139,6 +139,7 @@ static void setupConf(Configuration conf) {
   private void compact() {
     try {
       region.compact(true);
+      Iterables.getOnlyElement(region.getStores()).closeAndArchiveCompactedFiles();
     } catch (IOException e) {
       LOG.error("Failed to compact procedure store region", e);
     }
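The one-line change to compact() is the producer half of this patch: right after a major compaction, the compacted-away hfiles of the store's single column family are moved to the archive directory, where the new cleaner chore (wired up below) deletes them once the delegates agree. A condensed sketch of the same pattern, assuming a region with exactly one store as the procedure store region has; with a multi-family region getOnlyElement() would throw:

```java
import java.io.IOException;

import org.apache.hadoop.hbase.regionserver.HRegion;

import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;

final class CompactAndArchiveSketch {

  // Major-compact the single store, then move the now-obsolete hfiles to the
  // archive directory so a cleaner chore can reclaim them later.
  static void compactAndArchive(HRegion region) throws IOException {
    region.compact(true);
    Iterables.getOnlyElement(region.getStores()).closeAndArchiveCompactedFiles();
  }
}
```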
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/store/region/RegionProcedureStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/store/region/RegionProcedureStore.java
index a89a9aa9fd7c..7552ca152894 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/store/region/RegionProcedureStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/store/region/RegionProcedureStore.java
@@ -25,6 +25,8 @@
 import java.io.UncheckedIOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import org.apache.commons.lang3.mutable.MutableLong;
 import org.apache.hadoop.conf.Configuration;
@@ -35,6 +37,7 @@
 import org.apache.hadoop.hbase.HBaseIOException;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.backup.HFileArchiver;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Mutation;
@@ -45,6 +48,9 @@
 import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
 import org.apache.hadoop.hbase.log.HBaseMarkers;
+import org.apache.hadoop.hbase.master.HMaster;
+import org.apache.hadoop.hbase.master.cleaner.DirScanPool;
+import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
 import org.apache.hadoop.hbase.procedure2.Procedure;
 import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
 import org.apache.hadoop.hbase.procedure2.store.LeaseRecovery;
@@ -57,6 +63,7 @@
 import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.CommonFSUtils;
+import org.apache.hadoop.hbase.util.HFileArchiveUtil;
 import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
 import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.hadoop.hbase.wal.WALFactory;
@@ -110,7 +117,7 @@ public class RegionProcedureStore extends ProcedureStoreBase {
 
   static final String MASTER_PROCEDURE_DIR = "MasterProcs";
 
-  static final String LOGCLEANER_PLUGINS = "hbase.procedure.store.region.logcleaner.plugins";
+  static final String HFILECLEANER_PLUGINS = "hbase.procedure.store.region.hfilecleaner.plugins";
 
   private static final String REPLAY_EDITS_DIR = "recovered.wals";
 
@@ -129,22 +136,31 @@ public class RegionProcedureStore extends ProcedureStoreBase {
 
   private final Server server;
 
+  private final DirScanPool cleanerPool;
+
   private final LeaseRecovery leaseRecovery;
 
+  // Used to delete the compacted hfiles. Since we put all data on the WAL file system, it is
+  // not possible to move the compacted hfiles to the global hfile archive directory, so we
+  // have to do it ourselves.
+  private HFileCleaner cleaner;
+
   private WALFactory walFactory;
 
   @VisibleForTesting
   HRegion region;
 
-  private RegionFlusherAndCompactor flusherAndCompactor;
+  @VisibleForTesting
+  RegionFlusherAndCompactor flusherAndCompactor;
 
   @VisibleForTesting
   RegionProcedureStoreWALRoller walRoller;
 
   private int numThreads;
 
-  public RegionProcedureStore(Server server, LeaseRecovery leaseRecovery) {
+  public RegionProcedureStore(Server server, DirScanPool cleanerPool, LeaseRecovery leaseRecovery) {
     this.server = server;
+    this.cleanerPool = cleanerPool;
     this.leaseRecovery = leaseRecovery;
   }
 
@@ -184,6 +200,9 @@ public void stop(boolean abort) {
       return;
     }
     LOG.info("Stopping the Region Procedure Store, isAbort={}", abort);
+    if (cleaner != null) {
+      cleaner.cancel(abort);
+    }
     if (flusherAndCompactor != null) {
       flusherAndCompactor.close();
     }
@@ -360,11 +379,11 @@ public void handleCorrupted(ProcedureIterator procIter) throws IOException {
     } else if (maxProcIdSet.longValue() < maxProcIdFromProcs.longValue()) {
       LOG.warn("The WALProcedureStore max pid is less than the max pid of all loaded procedures");
     }
+    store.stop(false);
     if (!fs.delete(procWALDir, true)) {
-      throw new IOException("Failed to delete the WALProcedureStore migrated proc wal directory " +
-        procWALDir);
+      throw new IOException(
+        "Failed to delete the WALProcedureStore migrated proc wal directory " + procWALDir);
     }
-    store.stop(true);
     LOG.info("Migration of WALProcedureStore finished");
   }
 
@@ -400,6 +419,16 @@ public void recoverLease() throws IOException {
     }
     flusherAndCompactor = new RegionFlusherAndCompactor(conf, server, region);
     walRoller.setFlusherAndCompactor(flusherAndCompactor);
+    int cleanerInterval =
+      conf.getInt(HMaster.HBASE_MASTER_CLEANER_INTERVAL, HMaster.DEFAULT_HBASE_MASTER_CLEANER_INTERVAL);
+    Path archiveDir = HFileArchiveUtil.getArchivePath(conf);
+    if (!fs.mkdirs(archiveDir)) {
+      LOG.warn("Failed to create archive directory {}. Usually this should not happen but it will" +
+        " be created again when we actually archive the hfiles later, so continue", archiveDir);
+    }
+    cleaner = new HFileCleaner("RegionProcedureStoreHFileCleaner", cleanerInterval, server, conf,
+      fs, archiveDir, HFILECLEANER_PLUGINS, cleanerPool, Collections.emptyMap());
+    server.getChoreService().scheduleChore(cleaner);
     tryMigrate(fs);
   }
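Since the store now schedules its own HFileCleaner over the archive directory, the delegate list can be tuned through the new configuration key. A hedged example of extending it programmatically; KeepRecentHFileCleaner is the hypothetical delegate sketched earlier, and note that setting the key replaces the default value, so the shipped TimeToLiveHFileCleaner has to be repeated first, as the hbase-default.xml description advises:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

final class ProcedureStoreCleanerConfig {

  static Configuration withExtraDelegate() {
    Configuration conf = HBaseConfiguration.create();
    // Keep the default TTL cleaner first, then append the custom delegate.
    conf.set("hbase.procedure.store.region.hfilecleaner.plugins",
      "org.apache.hadoop.hbase.master.cleaner.TimeToLiveHFileCleaner,"
        + "org.example.KeepRecentHFileCleaner");
    return conf;
  }
}
```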
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/RegionProcedureStorePerformanceEvaluation.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/RegionProcedureStorePerformanceEvaluation.java
index f3ab2e52fa65..92be8971f540 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/RegionProcedureStorePerformanceEvaluation.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/RegionProcedureStorePerformanceEvaluation.java
@@ -29,6 +29,7 @@
 import org.apache.hadoop.hbase.client.AsyncClusterConnection;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.io.util.MemorySizeUtil;
+import org.apache.hadoop.hbase.master.cleaner.DirScanPool;
 import org.apache.hadoop.hbase.procedure2.store.ProcedureStorePerformanceEvaluation;
 import org.apache.hadoop.hbase.regionserver.ChunkCreator;
 import org.apache.hadoop.hbase.regionserver.MemStoreLAB;
@@ -46,21 +47,29 @@ private static final class MockServer implements Server {
     private final ServerName serverName =
       ServerName.valueOf("localhost", 12345, System.currentTimeMillis());
 
+    private final ChoreService choreService;
+
+    private volatile boolean abort = false;
+
     public MockServer(Configuration conf) {
       this.conf = conf;
+      this.choreService = new ChoreService("Cleaner-Chore-Service");
     }
 
     @Override
     public void abort(String why, Throwable e) {
+      abort = true;
+      choreService.shutdown();
     }
 
     @Override
     public boolean isAborted() {
-      return false;
+      return abort;
     }
 
     @Override
     public void stop(String why) {
+      choreService.shutdown();
     }
 
     @Override
@@ -105,10 +114,12 @@ public CoordinatedStateManager getCoordinatedStateManager() {
 
     @Override
     public ChoreService getChoreService() {
-      throw new UnsupportedOperationException();
+      return choreService;
     }
   }
 
+  private DirScanPool cleanerPool;
+
   @Override
   protected RegionProcedureStore createProcedureStore(Path storeDir) throws IOException {
     Pair<Long, MemoryType> pair = MemorySizeUtil.getGlobalMemStoreSize(conf);
@@ -123,7 +134,8 @@ protected RegionProcedureStore createProcedureStore(Path storeDir) throws IOExce
       initialCountPercentage, null);
     conf.setBoolean(RegionProcedureStore.USE_HSYNC_KEY, "hsync".equals(syncType));
     CommonFSUtils.setRootDir(conf, storeDir);
-    return new RegionProcedureStore(new MockServer(conf), (fs, apth) -> {
+    cleanerPool = new DirScanPool(conf);
+    return new RegionProcedureStore(new MockServer(conf), cleanerPool, (fs, apth) -> {
     });
   }
 
@@ -138,6 +150,11 @@ protected void printRawFormatResult(long timeTakenNs) {
   protected void preWrite(long procId) throws IOException {
   }
 
+  @Override
+  protected void postStop(RegionProcedureStore store) throws IOException {
+    cleanerPool.shutdownNow();
+  }
+
   public static void main(String[] args) throws IOException {
     RegionProcedureStorePerformanceEvaluation tool =
       new RegionProcedureStorePerformanceEvaluation();
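The postStop hook introduced in the base class exists for exactly the ordering problem this tool now has: the DirScanPool is shared with the cleaner chore, so it must be created before the store and shut down only after the store has stopped. A condensed lifecycle sketch with the names from this patch; starting and using the store is elided:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.master.cleaner.DirScanPool;
import org.apache.hadoop.hbase.procedure2.store.LeaseRecovery;
import org.apache.hadoop.hbase.procedure2.store.region.RegionProcedureStore;

final class StoreLifecycleSketch {

  static void runOnce(Server server, Configuration conf, LeaseRecovery leaseRecovery) {
    DirScanPool cleanerPool = new DirScanPool(conf);
    RegionProcedureStore store = new RegionProcedureStore(server, cleanerPool, leaseRecovery);
    // ... start and use the store ...
    store.stop(false);         // cancels the RegionProcedureStoreHFileCleaner chore
    cleanerPool.shutdownNow(); // only now is it safe to tear down the shared scan pool
  }
}
```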
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/RegionProcedureStoreTestBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/RegionProcedureStoreTestBase.java
index 6f0780572677..c5694d2be870 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/RegionProcedureStoreTestBase.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/RegionProcedureStoreTestBase.java
@@ -18,8 +18,11 @@
 package org.apache.hadoop.hbase.procedure2.store.region;
 
 import java.io.IOException;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.ChoreService;
 import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
+import org.apache.hadoop.hbase.master.cleaner.DirScanPool;
 import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.LoadCounter;
 import org.apache.hadoop.hbase.regionserver.MemStoreLAB;
 import org.apache.hadoop.hbase.util.CommonFSUtils;
@@ -32,18 +35,31 @@ public class RegionProcedureStoreTestBase {
 
   protected RegionProcedureStore store;
 
+  protected ChoreService choreService;
+
+  protected DirScanPool cleanerPool;
+
+  protected void configure(Configuration conf) {
+  }
+
   @Before
   public void setUp() throws IOException {
     htu = new HBaseCommonTestingUtility();
     htu.getConfiguration().setBoolean(MemStoreLAB.USEMSLAB_KEY, false);
+    configure(htu.getConfiguration());
     Path testDir = htu.getDataTestDir();
     CommonFSUtils.setWALRootDir(htu.getConfiguration(), testDir);
-    store = RegionProcedureStoreTestHelper.createStore(htu.getConfiguration(), new LoadCounter());
+    choreService = new ChoreService(getClass().getSimpleName());
+    cleanerPool = new DirScanPool(htu.getConfiguration());
+    store = RegionProcedureStoreTestHelper.createStore(htu.getConfiguration(), choreService,
+      cleanerPool, new LoadCounter());
   }
 
   @After
   public void tearDown() throws IOException {
     store.stop(true);
+    cleanerPool.shutdownNow();
+    choreService.shutdown();
     htu.cleanupTestDir();
   }
 }
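The new configure(Configuration) template method runs before the store is created, so a subclass can tune the store under test without duplicating setUp(); TestRegionProcedureStoreCompaction below is its first user. A trivial illustrative subclass (the class name and threshold are invented):

```java
package org.apache.hadoop.hbase.procedure2.store.region;

import org.apache.hadoop.conf.Configuration;

// Hypothetical test subclass: lower the compaction trigger so compactions happen quickly.
public class TestWithAggressiveCompaction extends RegionProcedureStoreTestBase {

  @Override
  protected void configure(Configuration conf) {
    conf.setInt(RegionFlusherAndCompactor.COMPACT_MIN_KEY, 2); // illustrative value
  }
}
```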
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/RegionProcedureStoreTestHelper.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/RegionProcedureStoreTestHelper.java
index d550e7fbe4bd..5497b8a5439b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/RegionProcedureStoreTestHelper.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/RegionProcedureStoreTestHelper.java
@@ -24,8 +24,10 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.ChoreService;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.master.cleaner.DirScanPool;
 import org.apache.hadoop.hbase.procedure2.store.LeaseRecovery;
 import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureLoader;
 
@@ -34,13 +36,14 @@ final class RegionProcedureStoreTestHelper {
   private RegionProcedureStoreTestHelper() {
   }
 
-  static RegionProcedureStore createStore(Configuration conf, ProcedureLoader loader)
-      throws IOException {
+  static RegionProcedureStore createStore(Configuration conf, ChoreService choreService,
+    DirScanPool cleanerPool, ProcedureLoader loader) throws IOException {
     Server server = mock(Server.class);
     when(server.getConfiguration()).thenReturn(conf);
     when(server.getServerName())
       .thenReturn(ServerName.valueOf("localhost", 12345, System.currentTimeMillis()));
-    RegionProcedureStore store = new RegionProcedureStore(server, new LeaseRecovery() {
+    when(server.getChoreService()).thenReturn(choreService);
+    RegionProcedureStore store = new RegionProcedureStore(server, cleanerPool, new LeaseRecovery() {
 
       @Override
       public void recoverFileLease(FileSystem fs, Path path) throws IOException {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/TestRegionProcedureStoreCompaction.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/TestRegionProcedureStoreCompaction.java
new file mode 100644
index 000000000000..15682bb8cff7
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/TestRegionProcedureStoreCompaction.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.procedure2.store.region;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.master.HMaster;
+import org.apache.hadoop.hbase.master.cleaner.TimeToLiveHFileCleaner;
+import org.apache.hadoop.hbase.testclassification.MasterTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.HFileArchiveUtil;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
+
+@Category({ MasterTests.class, MediumTests.class })
+public class TestRegionProcedureStoreCompaction extends RegionProcedureStoreTestBase {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestRegionProcedureStoreCompaction.class);
+
+  private int compactMin = 4;
+
+  @Override
+  protected void configure(Configuration conf) {
+    conf.setInt(RegionFlusherAndCompactor.COMPACT_MIN_KEY, compactMin);
+    conf.setInt(HMaster.HBASE_MASTER_CLEANER_INTERVAL, 500);
+    conf.setLong(TimeToLiveHFileCleaner.TTL_CONF_KEY, 5000);
+  }
+
+  private int getStorefilesCount() {
+    return Iterables.getOnlyElement(store.region.getStores()).getStorefilesCount();
+  }
+
+  @Test
+  public void test() throws IOException, InterruptedException {
+    for (int i = 0; i < compactMin - 1; i++) {
+      store.insert(new RegionProcedureStoreTestProcedure(), null);
+      store.region.flush(true);
+    }
+    assertEquals(compactMin - 1, getStorefilesCount());
+    store.insert(new RegionProcedureStoreTestProcedure(), null);
+    store.flusherAndCompactor.requestFlush();
+    htu.waitFor(15000, () -> getStorefilesCount() == 1);
+    Path storeArchiveDir = HFileArchiveUtil.getStoreArchivePathForRootDir(
+      new Path(htu.getDataTestDir(), RegionProcedureStore.MASTER_PROCEDURE_DIR),
+      store.region.getRegionInfo(), RegionProcedureStore.FAMILY);
+    FileSystem fs = storeArchiveDir.getFileSystem(htu.getConfiguration());
+    // after compaction, the old hfiles should have been moved to the archive directory
+    htu.waitFor(15000, () -> {
+      try {
+        FileStatus[] fses = fs.listStatus(storeArchiveDir);
+        return fses != null && fses.length == compactMin;
+      } catch (FileNotFoundException e) {
+        return false;
+      }
+    });
+    // ttl has not expired, so the cleaner should not delete any files yet
+    Thread.sleep(1000);
+    FileStatus[] compactedHFiles = fs.listStatus(storeArchiveDir);
+    assertEquals(4, compactedHFiles.length);
+    Thread.sleep(2000);
+    // touch one file
+    long currentTime = System.currentTimeMillis();
+    fs.setTimes(compactedHFiles[0].getPath(), currentTime, currentTime);
+    Thread.sleep(3000);
+    // only the touched file is still there after clean up
+    FileStatus[] remainingHFiles = fs.listStatus(storeArchiveDir);
+    assertEquals(1, remainingHFiles.length);
+    assertEquals(compactedHFiles[0].getPath(), remainingHFiles[0].getPath());
+    Thread.sleep(6000);
+    // the touched file should also be cleaned up and then the cleaner will delete the parent
+    // directory since it is empty
+    assertFalse(fs.exists(storeArchiveDir));
+  }
+}
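The sleeps in the new test are calibrated against the values set in configure(): with a 500 ms cleaner interval and a 5000 ms TTL, the archived files are still too young to delete after ~1 s, the untouched ones expire within ~6 s, and fs.setTimes() resets the age of the touched file. Roughly the per-file check TimeToLiveHFileCleaner applies, paraphrased rather than quoted from its source:

```java
import org.apache.hadoop.fs.FileStatus;

final class TtlCheckSketch {

  // A file becomes deletable once its modification time is more than ttlMs in the
  // past, which is why touching a file in the test above delays its deletion.
  static boolean isExpired(FileStatus file, long ttlMs) {
    long age = System.currentTimeMillis() - file.getModificationTime();
    return age > ttlMs;
  }
}
```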
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/TestRegionProcedureStoreMigration.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/TestRegionProcedureStoreMigration.java
index 475ae595d265..ede1bdf6ce90 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/TestRegionProcedureStoreMigration.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/TestRegionProcedureStoreMigration.java
@@ -30,8 +30,10 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.ChoreService;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
+import org.apache.hadoop.hbase.master.cleaner.DirScanPool;
 import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.LoadCounter;
 import org.apache.hadoop.hbase.procedure2.store.LeaseRecovery;
 import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureIterator;
@@ -61,6 +63,10 @@ public class TestRegionProcedureStoreMigration {
 
   private WALProcedureStore walStore;
 
+  private ChoreService choreService;
+
+  private DirScanPool cleanerPool;
+
   @Before
   public void setUp() throws IOException {
     htu = new HBaseCommonTestingUtility();
@@ -77,6 +83,8 @@ public void recoverFileLease(FileSystem fs, Path path) throws IOException {
     walStore.start(1);
     walStore.recoverLease();
     walStore.load(new LoadCounter());
+    choreService = new ChoreService(getClass().getSimpleName());
+    cleanerPool = new DirScanPool(htu.getConfiguration());
   }
 
   @After
@@ -85,6 +93,8 @@ public void tearDown() throws IOException {
       store.stop(true);
     }
     walStore.stop(true);
+    cleanerPool.shutdownNow();
+    choreService.shutdown();
     htu.cleanupTestDir();
   }
 
@@ -103,8 +113,8 @@ public void test() throws IOException {
     SortedSet<RegionProcedureStoreTestProcedure> loadedProcs =
       new TreeSet<>((p1, p2) -> Long.compare(p1.getProcId(), p2.getProcId()));
     MutableLong maxProcIdSet = new MutableLong(0);
-    store =
-      RegionProcedureStoreTestHelper.createStore(htu.getConfiguration(), new ProcedureLoader() {
+    store = RegionProcedureStoreTestHelper.createStore(htu.getConfiguration(), choreService,
+      cleanerPool, new ProcedureLoader() {
 
       @Override
       public void setMaxProcId(long maxProcId) {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/TestRegionProcedureStoreWALCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/TestRegionProcedureStoreWALCleaner.java
index 826c7637bb37..db499427df5a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/TestRegionProcedureStoreWALCleaner.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/TestRegionProcedureStoreWALCleaner.java
@@ -95,7 +95,8 @@ public boolean isStopped() {
       }
     }, conf, fs, globalWALArchiveDir, dirScanPool);
     choreService.scheduleChore(logCleaner);
-    store = RegionProcedureStoreTestHelper.createStore(conf, new LoadCounter());
+    store = RegionProcedureStoreTestHelper.createStore(conf, choreService, dirScanPool,
+      new LoadCounter());
   }
 
   @After