diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 1bdf6a225c62..1c5940ca9aed 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -2074,6 +2074,7 @@ private void registerConfigurationObservers() { } // Registering the compactSplitThread object with the ConfigurationManager. configurationManager.registerObserver(this.compactSplitThread); + configurationManager.registerObserver(this.cacheFlusher); configurationManager.registerObserver(this.rpcServices); configurationManager.registerObserver(this); } @@ -2454,7 +2455,7 @@ protected void stopServiceThreads() { bootstrapNodeManager.stop(); } if (this.cacheFlusher != null) { - this.cacheFlusher.join(); + this.cacheFlusher.shutdown(); } if (this.walRoller != null) { this.walRoller.close(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java index 44b19192542b..301b97e9a50d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.lang.Thread.UncaughtExceptionHandler; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.ConcurrentModificationException; import java.util.HashMap; @@ -36,12 +37,14 @@ import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.LongAdder; import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.DroppedSnapshotException; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.client.RegionReplicaUtil; +import org.apache.hadoop.hbase.conf.ConfigurationObserver; import org.apache.hadoop.hbase.regionserver.HRegion.FlushResult; import org.apache.hadoop.hbase.trace.TraceUtil; import org.apache.hadoop.hbase.util.Bytes; @@ -63,7 +66,7 @@ * @see FlushRequester */ @InterfaceAudience.Private -public class MemStoreFlusher implements FlushRequester { +public class MemStoreFlusher implements FlushRequester, ConfigurationObserver { private static final Logger LOG = LoggerFactory.getLogger(MemStoreFlusher.class); private Configuration conf; @@ -81,7 +84,12 @@ public class MemStoreFlusher implements FlushRequester { private long blockingWaitTime; private final LongAdder updatesBlockedMsHighWater = new LongAdder(); - private final FlushHandler[] flushHandlers; + private FlushHandler[] flushHandlers; + + private final AtomicInteger flusherIdGen = new AtomicInteger(); + + private ThreadFactory flusherThreadFactory; + private List flushRequestListeners = new ArrayList<>(1); /** @@ -117,14 +125,9 @@ public MemStoreFlusher(final Configuration conf, final HRegionServer server) { this.server = server; this.threadWakeFrequency = conf.getLong(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000); this.blockingWaitTime = conf.getInt("hbase.hstore.blockingWaitTime", 90000); - int handlerCount = conf.getInt("hbase.hstore.flusher.count", 2); + int handlerCount = 0; if (server != null) { - if (handlerCount < 1) { - LOG.warn( - "hbase.hstore.flusher.count was configed to {} which is less than 1, " + "corrected to 1", - handlerCount); - handlerCount = 1; - } + handlerCount = getHandlerCount(conf); LOG.info("globalMemStoreLimit=" + TraditionalBinaryPrefix .long2String(this.server.getRegionServerAccounting().getGlobalMemStoreLimit(), "", 1) @@ -305,13 +308,15 @@ private static long getMemStoreDataSize(HRegion r) { private class FlushHandler extends Thread { + private final AtomicBoolean running = new AtomicBoolean(true); + private FlushHandler(String name) { super(name); } @Override public void run() { - while (!server.isStopped()) { + while (!server.isStopped() && running.get()) { FlushQueueEntry fqe = null; try { wakeupPending.set(false); // allow someone to wake us up again @@ -356,15 +361,24 @@ public void run() { } } } - synchronized (regionsInQueue) { - regionsInQueue.clear(); - flushQueue.clear(); - } - // Signal anyone waiting, so they see the close flag - wakeUpIfBlocking(); + if (server.isStopped()) { + synchronized (regionsInQueue) { + regionsInQueue.clear(); + flushQueue.clear(); + } + + // Signal anyone waiting, so they see the close flag + wakeUpIfBlocking(); + } LOG.info(getName() + " exiting"); } + + public void shutdown() { + if (!running.compareAndSet(true, false)) { + LOG.warn("{} is already signaled to shutdown", getName()); + } + } } private void wakeupFlushThread() { @@ -497,8 +511,10 @@ public int getFlushQueueSize() { void interruptIfNecessary() { lock.writeLock().lock(); try { - for (FlushHandler flushHander : flushHandlers) { - if (flushHander != null) flushHander.interrupt(); + for (FlushHandler flushHandler : flushHandlers) { + if (flushHandler != null) { + flushHandler.interrupt(); + } } } finally { lock.writeLock().unlock(); @@ -506,30 +522,40 @@ void interruptIfNecessary() { } synchronized void start(UncaughtExceptionHandler eh) { - ThreadFactory flusherThreadFactory = new ThreadFactoryBuilder() - .setNameFormat(server.getServerName().toShortString() + "-MemStoreFlusher-pool-%d") - .setDaemon(true).setUncaughtExceptionHandler(eh).build(); - for (int i = 0; i < flushHandlers.length; i++) { - flushHandlers[i] = new FlushHandler("MemStoreFlusher." + i); - flusherThreadFactory.newThread(flushHandlers[i]); - flushHandlers[i].start(); + this.flusherThreadFactory = + new ThreadFactoryBuilder().setDaemon(true).setUncaughtExceptionHandler(eh).build(); + lock.readLock().lock(); + try { + startFlushHandlerThreads(flushHandlers, 0, flushHandlers.length); + } finally { + lock.readLock().unlock(); } } boolean isAlive() { - for (FlushHandler flushHander : flushHandlers) { - if (flushHander != null && flushHander.isAlive()) { - return true; + lock.readLock().lock(); + try { + for (FlushHandler flushHandler : flushHandlers) { + if (flushHandler != null && flushHandler.isAlive()) { + return true; + } } + return false; + } finally { + lock.readLock().unlock(); } - return false; } - void join() { - for (FlushHandler flushHander : flushHandlers) { - if (flushHander != null) { - Threads.shutdown(flushHander); + void shutdown() { + lock.readLock().lock(); + try { + for (FlushHandler flushHandler : flushHandlers) { + if (flushHandler != null) { + Threads.shutdown(flushHandler); + } } + } finally { + lock.readLock().unlock(); } } @@ -924,4 +950,60 @@ public boolean equals(Object obj) { return compareTo(other) == 0; } } + + private int getHandlerCount(Configuration conf) { + int handlerCount = conf.getInt("hbase.hstore.flusher.count", 2); + if (handlerCount < 1) { + LOG.warn( + "hbase.hstore.flusher.count was configed to {} which is less than 1, " + "corrected to 1", + handlerCount); + handlerCount = 1; + } + return handlerCount; + } + + @Override + public void onConfigurationChange(Configuration newConf) { + int newHandlerCount = getHandlerCount(newConf); + if (newHandlerCount != flushHandlers.length) { + LOG.info("update hbase.hstore.flusher.count from {} to {}", flushHandlers.length, + newHandlerCount); + lock.writeLock().lock(); + try { + FlushHandler[] newFlushHandlers = Arrays.copyOf(flushHandlers, newHandlerCount); + if (newHandlerCount > flushHandlers.length) { + startFlushHandlerThreads(newFlushHandlers, flushHandlers.length, newFlushHandlers.length); + } else { + stopFlushHandlerThreads(flushHandlers, newHandlerCount, flushHandlers.length); + } + flusherIdGen.compareAndSet(flushHandlers.length, newFlushHandlers.length); + this.flushHandlers = newFlushHandlers; + } finally { + lock.writeLock().unlock(); + } + } + } + + private void startFlushHandlerThreads(FlushHandler[] flushHandlers, int start, int end) { + if (flusherThreadFactory != null) { + for (int i = start; i < end; i++) { + flushHandlers[i] = new FlushHandler("MemStoreFlusher." + flusherIdGen.getAndIncrement()); + flusherThreadFactory.newThread(flushHandlers[i]); + flushHandlers[i].start(); + } + } + } + + private void stopFlushHandlerThreads(FlushHandler[] flushHandlers, int start, int end) { + for (int i = start; i < end; i++) { + flushHandlers[i].shutdown(); + if (LOG.isDebugEnabled()) { + LOG.debug("send shutdown signal to {}", flushHandlers[i].getName()); + } + } + } + + public int getFlusherCount() { + return flusherIdGen.get(); + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreFlusher.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreFlusher.java index 0bc7946e8a10..158dd91d9a06 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreFlusher.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreFlusher.java @@ -94,4 +94,26 @@ public void testNotReplaceDelayedFlushEntryWhichExpired() { assertEquals(1, msf.getFlushQueueSize()); assertTrue(msf.regionsInQueue.get(r).isDelay()); } + + @Test + public void testChangeFlusherCount() { + Configuration conf = new Configuration(); + conf.set("hbase.hstore.flusher.count", "0"); + HRegionServer rs = mock(HRegionServer.class); + doReturn(false).when(rs).isStopped(); + doReturn(new RegionServerAccounting(conf)).when(rs).getRegionServerAccounting(); + + msf = new MemStoreFlusher(conf, rs); + msf.start(Threads.LOGGING_EXCEPTION_HANDLER); + + Configuration newConf = new Configuration(); + + newConf.set("hbase.hstore.flusher.count", "3"); + msf.onConfigurationChange(newConf); + assertEquals(3, msf.getFlusherCount()); + + newConf.set("hbase.hstore.flusher.count", "0"); + msf.onConfigurationChange(newConf); + assertEquals(1, msf.getFlusherCount()); + } }