Skip to content

Commit

Permalink
HBASE-27855 Support dynamic adjustment of flusher count (#5247)
Browse files Browse the repository at this point in the history
Co-authored-by: huiruan <876107431@qq.com>
Signed-off-by: Duo Zhang <zhangduo@apache.org>
  • Loading branch information
frostruan and huiruan committed Jun 6, 2023
1 parent 8c83906 commit 40976b0
Show file tree
Hide file tree
Showing 3 changed files with 139 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2074,6 +2074,7 @@ private void registerConfigurationObservers() {
}
// Registering the compactSplitThread object with the ConfigurationManager.
configurationManager.registerObserver(this.compactSplitThread);
configurationManager.registerObserver(this.cacheFlusher);
configurationManager.registerObserver(this.rpcServices);
configurationManager.registerObserver(this);
}
Expand Down Expand Up @@ -2454,7 +2455,7 @@ protected void stopServiceThreads() {
bootstrapNodeManager.stop();
}
if (this.cacheFlusher != null) {
this.cacheFlusher.join();
this.cacheFlusher.shutdown();
}
if (this.walRoller != null) {
this.walRoller.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.io.IOException;
import java.lang.Thread.UncaughtExceptionHandler;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.ConcurrentModificationException;
import java.util.HashMap;
Expand All @@ -36,12 +37,14 @@
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.DroppedSnapshotException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
import org.apache.hadoop.hbase.regionserver.HRegion.FlushResult;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.util.Bytes;
Expand All @@ -63,7 +66,7 @@
* @see FlushRequester
*/
@InterfaceAudience.Private
public class MemStoreFlusher implements FlushRequester {
public class MemStoreFlusher implements FlushRequester, ConfigurationObserver {
private static final Logger LOG = LoggerFactory.getLogger(MemStoreFlusher.class);

private Configuration conf;
Expand All @@ -81,7 +84,12 @@ public class MemStoreFlusher implements FlushRequester {
private long blockingWaitTime;
private final LongAdder updatesBlockedMsHighWater = new LongAdder();

private final FlushHandler[] flushHandlers;
private FlushHandler[] flushHandlers;

private final AtomicInteger flusherIdGen = new AtomicInteger();

private ThreadFactory flusherThreadFactory;

private List<FlushRequestListener> flushRequestListeners = new ArrayList<>(1);

/**
Expand Down Expand Up @@ -117,14 +125,9 @@ public MemStoreFlusher(final Configuration conf, final HRegionServer server) {
this.server = server;
this.threadWakeFrequency = conf.getLong(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
this.blockingWaitTime = conf.getInt("hbase.hstore.blockingWaitTime", 90000);
int handlerCount = conf.getInt("hbase.hstore.flusher.count", 2);
int handlerCount = 0;
if (server != null) {
if (handlerCount < 1) {
LOG.warn(
"hbase.hstore.flusher.count was configed to {} which is less than 1, " + "corrected to 1",
handlerCount);
handlerCount = 1;
}
handlerCount = getHandlerCount(conf);
LOG.info("globalMemStoreLimit="
+ TraditionalBinaryPrefix
.long2String(this.server.getRegionServerAccounting().getGlobalMemStoreLimit(), "", 1)
Expand Down Expand Up @@ -305,13 +308,15 @@ private static long getMemStoreDataSize(HRegion r) {

private class FlushHandler extends Thread {

private final AtomicBoolean running = new AtomicBoolean(true);

private FlushHandler(String name) {
super(name);
}

@Override
public void run() {
while (!server.isStopped()) {
while (!server.isStopped() && running.get()) {
FlushQueueEntry fqe = null;
try {
wakeupPending.set(false); // allow someone to wake us up again
Expand Down Expand Up @@ -356,15 +361,24 @@ public void run() {
}
}
}
synchronized (regionsInQueue) {
regionsInQueue.clear();
flushQueue.clear();
}

// Signal anyone waiting, so they see the close flag
wakeUpIfBlocking();
if (server.isStopped()) {
synchronized (regionsInQueue) {
regionsInQueue.clear();
flushQueue.clear();
}

// Signal anyone waiting, so they see the close flag
wakeUpIfBlocking();
}
LOG.info(getName() + " exiting");
}

public void shutdown() {
if (!running.compareAndSet(true, false)) {
LOG.warn("{} is already signaled to shutdown", getName());
}
}
}

private void wakeupFlushThread() {
Expand Down Expand Up @@ -497,39 +511,51 @@ public int getFlushQueueSize() {
void interruptIfNecessary() {
lock.writeLock().lock();
try {
for (FlushHandler flushHander : flushHandlers) {
if (flushHander != null) flushHander.interrupt();
for (FlushHandler flushHandler : flushHandlers) {
if (flushHandler != null) {
flushHandler.interrupt();
}
}
} finally {
lock.writeLock().unlock();
}
}

synchronized void start(UncaughtExceptionHandler eh) {
ThreadFactory flusherThreadFactory = new ThreadFactoryBuilder()
.setNameFormat(server.getServerName().toShortString() + "-MemStoreFlusher-pool-%d")
.setDaemon(true).setUncaughtExceptionHandler(eh).build();
for (int i = 0; i < flushHandlers.length; i++) {
flushHandlers[i] = new FlushHandler("MemStoreFlusher." + i);
flusherThreadFactory.newThread(flushHandlers[i]);
flushHandlers[i].start();
this.flusherThreadFactory =
new ThreadFactoryBuilder().setDaemon(true).setUncaughtExceptionHandler(eh).build();
lock.readLock().lock();
try {
startFlushHandlerThreads(flushHandlers, 0, flushHandlers.length);
} finally {
lock.readLock().unlock();
}
}

boolean isAlive() {
for (FlushHandler flushHander : flushHandlers) {
if (flushHander != null && flushHander.isAlive()) {
return true;
lock.readLock().lock();
try {
for (FlushHandler flushHandler : flushHandlers) {
if (flushHandler != null && flushHandler.isAlive()) {
return true;
}
}
return false;
} finally {
lock.readLock().unlock();
}
return false;
}

void join() {
for (FlushHandler flushHander : flushHandlers) {
if (flushHander != null) {
Threads.shutdown(flushHander);
void shutdown() {
lock.readLock().lock();
try {
for (FlushHandler flushHandler : flushHandlers) {
if (flushHandler != null) {
Threads.shutdown(flushHandler);
}
}
} finally {
lock.readLock().unlock();
}
}

Expand Down Expand Up @@ -924,4 +950,60 @@ public boolean equals(Object obj) {
return compareTo(other) == 0;
}
}

private int getHandlerCount(Configuration conf) {
int handlerCount = conf.getInt("hbase.hstore.flusher.count", 2);
if (handlerCount < 1) {
LOG.warn(
"hbase.hstore.flusher.count was configed to {} which is less than 1, " + "corrected to 1",
handlerCount);
handlerCount = 1;
}
return handlerCount;
}

@Override
public void onConfigurationChange(Configuration newConf) {
int newHandlerCount = getHandlerCount(newConf);
if (newHandlerCount != flushHandlers.length) {
LOG.info("update hbase.hstore.flusher.count from {} to {}", flushHandlers.length,
newHandlerCount);
lock.writeLock().lock();
try {
FlushHandler[] newFlushHandlers = Arrays.copyOf(flushHandlers, newHandlerCount);
if (newHandlerCount > flushHandlers.length) {
startFlushHandlerThreads(newFlushHandlers, flushHandlers.length, newFlushHandlers.length);
} else {
stopFlushHandlerThreads(flushHandlers, newHandlerCount, flushHandlers.length);
}
flusherIdGen.compareAndSet(flushHandlers.length, newFlushHandlers.length);
this.flushHandlers = newFlushHandlers;
} finally {
lock.writeLock().unlock();
}
}
}

private void startFlushHandlerThreads(FlushHandler[] flushHandlers, int start, int end) {
if (flusherThreadFactory != null) {
for (int i = start; i < end; i++) {
flushHandlers[i] = new FlushHandler("MemStoreFlusher." + flusherIdGen.getAndIncrement());
flusherThreadFactory.newThread(flushHandlers[i]);
flushHandlers[i].start();
}
}
}

private void stopFlushHandlerThreads(FlushHandler[] flushHandlers, int start, int end) {
for (int i = start; i < end; i++) {
flushHandlers[i].shutdown();
if (LOG.isDebugEnabled()) {
LOG.debug("send shutdown signal to {}", flushHandlers[i].getName());
}
}
}

public int getFlusherCount() {
return flusherIdGen.get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -94,4 +94,26 @@ public void testNotReplaceDelayedFlushEntryWhichExpired() {
assertEquals(1, msf.getFlushQueueSize());
assertTrue(msf.regionsInQueue.get(r).isDelay());
}

@Test
public void testChangeFlusherCount() {
Configuration conf = new Configuration();
conf.set("hbase.hstore.flusher.count", "0");
HRegionServer rs = mock(HRegionServer.class);
doReturn(false).when(rs).isStopped();
doReturn(new RegionServerAccounting(conf)).when(rs).getRegionServerAccounting();

msf = new MemStoreFlusher(conf, rs);
msf.start(Threads.LOGGING_EXCEPTION_HANDLER);

Configuration newConf = new Configuration();

newConf.set("hbase.hstore.flusher.count", "3");
msf.onConfigurationChange(newConf);
assertEquals(3, msf.getFlusherCount());

newConf.set("hbase.hstore.flusher.count", "0");
msf.onConfigurationChange(newConf);
assertEquals(1, msf.getFlusherCount());
}
}

0 comments on commit 40976b0

Please sign in to comment.