Skip to content

Commit

Permalink
HBASE-17972 Remove mergePool from CompactSplitThread (Guangxu Cheng)
Browse files Browse the repository at this point in the history
  • Loading branch information
tedyu committed Apr 28, 2017
1 parent b401a35 commit 5411d3e
Show file tree
Hide file tree
Showing 2 changed files with 1 addition and 57 deletions.
Expand Up @@ -72,10 +72,6 @@ public class CompactSplitThread implements CompactionRequestor, PropagatingConfi
// Configuration key for split threads
public final static String SPLIT_THREADS = "hbase.regionserver.thread.split";
public final static int SPLIT_THREADS_DEFAULT = 1;

// Configuration keys for merge threads
public final static String MERGE_THREADS = "hbase.regionserver.thread.merge";
public final static int MERGE_THREADS_DEFAULT = 1;

public static final String REGION_SERVER_REGION_SPLIT_LIMIT =
"hbase.regionserver.regionSplitLimit";
Expand All @@ -87,7 +83,6 @@ public class CompactSplitThread implements CompactionRequestor, PropagatingConfi
private final ThreadPoolExecutor longCompactions;
private final ThreadPoolExecutor shortCompactions;
private final ThreadPoolExecutor splits;
private final ThreadPoolExecutor mergePool;

private volatile ThroughputController compactionThroughputController;

Expand Down Expand Up @@ -150,15 +145,6 @@ public Thread newThread(Runnable r) {
return new Thread(r, name);
}
});
int mergeThreads = conf.getInt(MERGE_THREADS, MERGE_THREADS_DEFAULT);
this.mergePool = (ThreadPoolExecutor) Executors.newFixedThreadPool(
mergeThreads, new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
String name = n + "-merges-" + System.currentTimeMillis();
return new Thread(r, name);
}
});

// compaction throughput controller
this.compactionThroughputController =
Expand All @@ -170,8 +156,7 @@ public String toString() {
return "compaction_queue=("
+ longCompactions.getQueue().size() + ":"
+ shortCompactions.getQueue().size() + ")"
+ ", split_queue=" + splits.getQueue().size()
+ ", merge_queue=" + mergePool.getQueue().size();
+ ", split_queue=" + splits.getQueue().size();
}

public String dumpQueue() {
Expand Down Expand Up @@ -205,15 +190,6 @@ public String dumpQueue() {
queueLists.append("\n");
}

queueLists.append("\n");
queueLists.append(" Region Merge Queue:\n");
lq = mergePool.getQueue();
it = lq.iterator();
while (it.hasNext()) {
queueLists.append(" " + it.next().toString());
queueLists.append("\n");
}

return queueLists.toString();
}

Expand Down Expand Up @@ -372,7 +348,6 @@ private CompactionContext selectCompaction(final Region r, final Store s,
*/
void interruptIfNecessary() {
splits.shutdown();
mergePool.shutdown();
longCompactions.shutdown();
shortCompactions.shutdown();
}
Expand All @@ -394,7 +369,6 @@ private void waitFor(ThreadPoolExecutor t, String name) {

void join() {
waitFor(splits, "Split Thread");
waitFor(mergePool, "Merge Thread");
waitFor(longCompactions, "Large Compaction Thread");
waitFor(shortCompactions, "Small Compaction Thread");
}
Expand Down Expand Up @@ -641,21 +615,6 @@ public void onConfigurationChange(Configuration newConf) {
}
}

int mergeThreads = newConf.getInt(MERGE_THREADS,
MERGE_THREADS_DEFAULT);
if (this.mergePool.getCorePoolSize() != mergeThreads) {
LOG.info("Changing the value of " + MERGE_THREADS +
" from " + this.mergePool.getCorePoolSize() + " to " +
mergeThreads);
if(this.mergePool.getCorePoolSize() < mergeThreads) {
this.mergePool.setMaximumPoolSize(mergeThreads);
this.mergePool.setCorePoolSize(mergeThreads);
} else {
this.mergePool.setCorePoolSize(mergeThreads);
this.mergePool.setMaximumPoolSize(mergeThreads);
}
}

ThroughputController old = this.compactionThroughputController;
if (old != null) {
old.stop("configuration change");
Expand All @@ -680,10 +639,6 @@ protected int getSplitThreadNum() {
return this.splits.getCorePoolSize();
}

protected int getMergeThreadNum() {
return this.mergePool.getCorePoolSize();
}

/**
* {@inheritDoc}
*/
Expand All @@ -705,11 +660,6 @@ public ThroughputController getCompactionThroughputController() {
return compactionThroughputController;
}

@VisibleForTesting
public long getCompletedMergeTaskCount() {
return mergePool.getCompletedTaskCount();
}

@VisibleForTesting
/**
* Shutdown the long compaction thread pool.
Expand Down
Expand Up @@ -80,7 +80,6 @@ private static void setupConf(Configuration conf) {
conf.setInt(CompactSplitThread.LARGE_COMPACTION_THREADS, 3);
conf.setInt(CompactSplitThread.SMALL_COMPACTION_THREADS, 4);
conf.setInt(CompactSplitThread.SPLIT_THREADS, 5);
conf.setInt(CompactSplitThread.MERGE_THREADS, 6);
}

@After
Expand Down Expand Up @@ -113,13 +112,11 @@ public void testThreadPoolSizeTuning() throws Exception {
assertEquals(3, regionServer.compactSplitThread.getLargeCompactionThreadNum());
assertEquals(4, regionServer.compactSplitThread.getSmallCompactionThreadNum());
assertEquals(5, regionServer.compactSplitThread.getSplitThreadNum());
assertEquals(6, regionServer.compactSplitThread.getMergeThreadNum());

// change bigger configurations and do online update
conf.setInt(CompactSplitThread.LARGE_COMPACTION_THREADS, 4);
conf.setInt(CompactSplitThread.SMALL_COMPACTION_THREADS, 5);
conf.setInt(CompactSplitThread.SPLIT_THREADS, 6);
conf.setInt(CompactSplitThread.MERGE_THREADS, 7);
try {
regionServer.compactSplitThread.onConfigurationChange(conf);
} catch (IllegalArgumentException iae) {
Expand All @@ -130,13 +127,11 @@ public void testThreadPoolSizeTuning() throws Exception {
assertEquals(4, regionServer.compactSplitThread.getLargeCompactionThreadNum());
assertEquals(5, regionServer.compactSplitThread.getSmallCompactionThreadNum());
assertEquals(6, regionServer.compactSplitThread.getSplitThreadNum());
assertEquals(7, regionServer.compactSplitThread.getMergeThreadNum());

// change smaller configurations and do online update
conf.setInt(CompactSplitThread.LARGE_COMPACTION_THREADS, 2);
conf.setInt(CompactSplitThread.SMALL_COMPACTION_THREADS, 3);
conf.setInt(CompactSplitThread.SPLIT_THREADS, 4);
conf.setInt(CompactSplitThread.MERGE_THREADS, 5);
try {
regionServer.compactSplitThread.onConfigurationChange(conf);
} catch (IllegalArgumentException iae) {
Expand All @@ -147,7 +142,6 @@ public void testThreadPoolSizeTuning() throws Exception {
assertEquals(2, regionServer.compactSplitThread.getLargeCompactionThreadNum());
assertEquals(3, regionServer.compactSplitThread.getSmallCompactionThreadNum());
assertEquals(4, regionServer.compactSplitThread.getSplitThreadNum());
assertEquals(5, regionServer.compactSplitThread.getMergeThreadNum());
} finally {
conn.close();
}
Expand Down

0 comments on commit 5411d3e

Please sign in to comment.