Skip to content

Commit

Permalink
HBASE-25528: Dedicated merge dispatch threadpool on master (#2909)
Browse files Browse the repository at this point in the history
Adds "hbase.master.executor.merge.dispatch.threads" and defaults to 2.

Also adds additional logging that includes the number of split plans
and merge plans computed for each normalizer run.

(cherry picked from commit 36b4698)

Signed-off-by: Andrew Purtell <apurtell@apache.org>
Signed-off-by: Michael Stack <stack@apache.org>
  • Loading branch information
bharathv committed Feb 2, 2021
1 parent 3f0829a commit f52b457
Show file tree
Hide file tree
Showing 5 changed files with 20 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ public enum EventType {
* C_M_MERGE_REGION<br>
* Client asking Master to merge regions.
*/
C_M_MERGE_REGION (30, ExecutorType.MASTER_TABLE_OPERATIONS),
C_M_MERGE_REGION (30, ExecutorType.MASTER_MERGE_OPERATIONS),
/**
* Messages originating from Client to Master.<br>
* C_M_DELETE_TABLE<br>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ public enum ExecutorType {
MASTER_META_SERVER_OPERATIONS (6),
M_LOG_REPLAY_OPS (7),
MASTER_SNAPSHOT_OPERATIONS (8),
MASTER_MERGE_OPERATIONS (9),

// RegionServer executor services
RS_OPEN_REGION (20),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1397,6 +1397,13 @@ public static enum Modify {
"hbase.master.executor.serverops.threads";
public static final int MASTER_SERVER_OPERATIONS_THREADS_DEFAULT = 5;

/**
* Number of threads used to dispatch merge operations to the regionservers.
*/
public static final String MASTER_MERGE_DISPATCH_THREADS =
"hbase.master.executor.merge.dispatch.threads";
public static final int MASTER_MERGE_DISPATCH_THREADS_DEFAULT = 2;

public static final String MASTER_META_SERVER_OPERATIONS_THREADS =
"hbase.master.executor.meta.serverops.threads";
public static final int MASTER_META_SERVER_OPERATIONS_THREADS_DEFAULT = 5;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1269,6 +1269,9 @@ private void startServiceThreads() throws IOException {
HConstants.MASTER_LOG_REPLAY_OPS_THREADS, HConstants.MASTER_LOG_REPLAY_OPS_THREADS_DEFAULT));
this.service.startExecutorService(ExecutorType.MASTER_SNAPSHOT_OPERATIONS, conf.getInt(
SnapshotManager.SNAPSHOT_POOL_THREADS_KEY, SnapshotManager.SNAPSHOT_POOL_THREADS_DEFAULT));
this.service.startExecutorService(ExecutorType.MASTER_MERGE_OPERATIONS, conf.getInt(
HConstants.MASTER_MERGE_DISPATCH_THREADS,
HConstants.MASTER_MERGE_DISPATCH_THREADS_DEFAULT));

// We depend on there being only one instance of this executor running
// at a time. To do concurrency, would need fencing of enable/disable of
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,8 @@ public List<NormalizationPlan> computePlanForTable(TableName table) throws HBase
LOG.debug("Table " + table + ", average region size: " + avgRegionSize);

int candidateIdx = 0;
int splitCount = 0;
int mergeCount = 0;
while (candidateIdx < tableRegions.size()) {
HRegionInfo hri = tableRegions.get(candidateIdx);
long regionSize = getRegionSize(hri);
Expand All @@ -193,6 +195,7 @@ public List<NormalizationPlan> computePlanForTable(TableName table) throws HBase
LOG.info("Table " + table + ", large region " + hri.getRegionNameAsString() + " has size "
+ regionSize + ", more than twice avg size, splitting");
plans.add(new SplitNormalizationPlan(hri, null));
splitCount++;
}
} else {
if (candidateIdx == tableRegions.size()-1) {
Expand All @@ -206,6 +209,7 @@ public List<NormalizationPlan> computePlanForTable(TableName table) throws HBase
+ " plus its neighbor size: " + regionSize2
+ ", less than the avg size " + avgRegionSize + ", merging them");
plans.add(new MergeNormalizationPlan(hri, hri2));
mergeCount++;
candidateIdx++;
}
}
Expand All @@ -217,6 +221,10 @@ public List<NormalizationPlan> computePlanForTable(TableName table) throws HBase
return null;
}
Collections.sort(plans, planComparator);
if (LOG.isDebugEnabled()) {
LOG.debug(String.format("Computed normalization plans for table %s. Total plans %d, split " +
"plans %d, merge plans %d", table, plans.size(), splitCount, mergeCount));
}
return plans;
}

Expand Down

0 comments on commit f52b457

Please sign in to comment.