Skip to content

Commit

Permalink
HBASE-25116 RegionMonitor support RegionTask count normalize
Browse files Browse the repository at this point in the history
  • Loading branch information
niuyulin committed Oct 12, 2020
1 parent 7731856 commit ffd024d
Show file tree
Hide file tree
Showing 3 changed files with 121 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1504,6 +1504,11 @@ public enum OperationStatusCode {
public static final String HBASE_CANARY_READ_RAW_SCAN_KEY = "hbase.canary.read.raw.enabled";

public static final String HBASE_CANARY_READ_ALL_CF = "hbase.canary.read.all.column.famliy";

public static final String HBASE_CANARY_REGION_MONITOR_TASK_COUNT_MAX = "hbase.canary.region.monitor.task.count.max";

public static final String HBASE_CANARY_REGION_MONITOR_TASK_COUNT_MIN = "hbase.canary.region.monitor.task.count.min";

/**
* Configuration keys for programmatic JAAS configuration for secured ZK interaction
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -414,7 +414,7 @@ public ConcurrentMap<String, List<RegionTaskResult>> getRegionMap() {
}

public int getTotalExpectedRegions() {
return this.regionMap.size();
return this.regionMap.values().stream().mapToInt(List::size).sum();
}
}

Expand Down Expand Up @@ -1390,6 +1390,8 @@ private static class RegionMonitor extends Monitor {
private int checkPeriod;
private boolean rawScanEnabled;
private boolean readAllCF;
private int maxTaskCount;
private int minTaskCount;

/**
* This is a timeout per table. If read of each region in the table aggregated takes longer
Expand Down Expand Up @@ -1422,6 +1424,10 @@ public RegionMonitor(Connection connection, String[] monitorTargets, boolean use
this.configuredReadTableTimeouts = new HashMap<>(configuredReadTableTimeouts);
this.configuredWriteTableTimeout = configuredWriteTableTimeout;
this.readAllCF = conf.getBoolean(HConstants.HBASE_CANARY_READ_ALL_CF, true);
this.minTaskCount = conf.getInt(HConstants.HBASE_CANARY_REGION_MONITOR_TASK_COUNT_MIN, 1000);
this.minTaskCount = this.minTaskCount > 0 ? this.minTaskCount : 1000;
this.maxTaskCount = conf.getInt(HConstants.HBASE_CANARY_REGION_MONITOR_TASK_COUNT_MAX, 10000);
this.maxTaskCount = this.maxTaskCount > 0 ? this.maxTaskCount : 10000;
}

private RegionStdOutSink getSink() {
Expand All @@ -1435,7 +1441,7 @@ private RegionStdOutSink getSink() {
public void run() {
if (this.initAdmin()) {
try {
List<Future<Void>> taskFutures = new LinkedList<>();
List<RegionTask> readTasks = new LinkedList<>();
RegionStdOutSink regionSink = this.getSink();
regionSink.resetFailuresCountDetails();
if (this.targets != null && this.targets.length > 0) {
Expand All @@ -1452,14 +1458,16 @@ public void run() {
this.initialized = true;
for (String table : tables) {
LongAdder readLatency = regionSink.initializeAndGetReadLatencyForTable(table);
taskFutures.addAll(CanaryTool.sniff(admin, regionSink, table, executor, TaskType.READ,
readTasks.addAll(CanaryTool.sniff(admin, regionSink, table, executor, TaskType.READ,
this.rawScanEnabled, readLatency, readAllCF));
}
} else {
taskFutures.addAll(sniff(TaskType.READ, regionSink));
readTasks.addAll(sniff(TaskType.READ, regionSink));
}

List<Future<Void>> taskFutures =
new LinkedList<>(executor.invokeAll(normalizeTaskCount(readTasks)));
if (writeSniffing) {
List<RegionTask> writeTasks = new LinkedList<>();
if (EnvironmentEdgeManager.currentTime() - lastCheckTime > checkPeriod) {
try {
checkWriteTableDistribution();
Expand All @@ -1471,9 +1479,10 @@ public void run() {
// sniff canary table with write operation
regionSink.initializeWriteLatency();
LongAdder writeTableLatency = regionSink.getWriteLatency();
taskFutures
.addAll(CanaryTool.sniff(admin, regionSink, admin.getDescriptor(writeTableName),
executor, TaskType.WRITE, this.rawScanEnabled, writeTableLatency, readAllCF));
writeTasks.addAll(normalizeTaskCount(
CanaryTool.sniff(admin, regionSink, admin.getDescriptor(writeTableName), executor,
TaskType.WRITE, this.rawScanEnabled, writeTableLatency, readAllCF)));
taskFutures.addAll(executor.invokeAll(writeTasks));
}

for (Future<Void> future : taskFutures) {
Expand Down Expand Up @@ -1522,6 +1531,39 @@ public void run() {
this.done = true;
}


/**
* If tasks count larger than "hbase.canary.region.monitor.task.count.max", we will randomly
* trim tasks for each table, according to the raito of the table region count in whole tasks
* region count. If tasks count less than "hbase.canary.region.monitor.task.count.min", we will
* repeat fill tasks
* @return taskList after normalized
*/
private List<RegionTask> normalizeTaskCount(List<RegionTask> taskList) {
if (taskList.size() > maxTaskCount) {
List<RegionTask> limitedCountTasks = new ArrayList();
double ratio = 1.0 * maxTaskCount / taskList.size();
Map<TableName, List<RegionTask>> table2RegionTaskMap = new HashMap();
taskList.forEach(task -> table2RegionTaskMap
.computeIfAbsent(task.region.getTable(), k -> new ArrayList<>()).add(task));

table2RegionTaskMap.values().forEach(regionTasks -> {
int count = (int) (ratio * regionTasks.size());
//at least one region task for each table
count = count > 0 ? count : 1;
Collections.shuffle(regionTasks);
limitedCountTasks.addAll(regionTasks.subList(0, count));
});
return limitedCountTasks;
} else if (taskList.size() < minTaskCount && taskList.size() > 0) {
List<RegionTask> expandTaskList = new ArrayList<>();
while (expandTaskList.size() < minTaskCount) {
expandTaskList.addAll(taskList);
}
return expandTaskList;
}
return taskList;
}
/**
* @return List of tables to use in test.
*/
Expand Down Expand Up @@ -1569,20 +1611,20 @@ private String[] generateMonitorTables(String[] monitorTargets) throws IOExcepti
/*
* Canary entry point to monitor all the tables.
*/
private List<Future<Void>> sniff(TaskType taskType, RegionStdOutSink regionSink)
private List<RegionTask> sniff(TaskType taskType, RegionStdOutSink regionSink)
throws Exception {
LOG.debug("Reading list of tables");
List<Future<Void>> taskFutures = new LinkedList<>();
List<RegionTask> tasks = new LinkedList<>();
for (TableDescriptor td: admin.listTableDescriptors()) {
if (admin.tableExists(td.getTableName()) && admin.isTableEnabled(td.getTableName()) &&
(!td.getTableName().equals(writeTableName))) {
LongAdder readLatency =
regionSink.initializeAndGetReadLatencyForTable(td.getTableName().getNameAsString());
taskFutures.addAll(CanaryTool.sniff(admin, sink, td, executor, taskType,
tasks.addAll(CanaryTool.sniff(admin, sink, td, executor, taskType,
this.rawScanEnabled, readLatency, readAllCF));
}
}
return taskFutures;
return tasks;
}

private void checkWriteTableDistribution() throws IOException {
Expand Down Expand Up @@ -1644,7 +1686,7 @@ private void createWriteTable(int numberOfServers) throws IOException {
* Canary entry point for specified table.
* @throws Exception exception
*/
private static List<Future<Void>> sniff(final Admin admin, final Sink sink, String tableName,
private static List<RegionTask> sniff(final Admin admin, final Sink sink, String tableName,
ExecutorService executor, TaskType taskType, boolean rawScanEnabled, LongAdder readLatency,
boolean readAllCF) throws Exception {
LOG.debug("Checking table is enabled and getting table descriptor for table {}", tableName);
Expand All @@ -1660,7 +1702,7 @@ private static List<Future<Void>> sniff(final Admin admin, final Sink sink, Stri
/*
* Loops over regions of this table, and outputs information about the state.
*/
private static List<Future<Void>> sniff(final Admin admin, final Sink sink,
private static List<RegionTask> sniff(final Admin admin, final Sink sink,
TableDescriptor tableDesc, ExecutorService executor, TaskType taskType,
boolean rawScanEnabled, LongAdder rwLatency, boolean readAllCF) throws Exception {
LOG.debug("Reading list of regions for table {}", tableDesc.getTableName());
Expand All @@ -1680,7 +1722,7 @@ private static List<Future<Void>> sniff(final Admin admin, final Sink sink,
Map<String, List<RegionTaskResult>> regionMap = ((RegionStdOutSink) sink).getRegionMap();
regionMap.put(region.getRegionNameAsString(), new ArrayList<RegionTaskResult>());
}
return executor.invokeAll(tasks);
return tasks;
}
} catch (TableNotFoundException e) {
return Collections.EMPTY_LIST;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.isA;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.atMost;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
Expand Down Expand Up @@ -153,12 +155,8 @@ public void testCanaryRegionTaskReadAllCF() throws Exception {
CanaryTool canary = new CanaryTool(executor, sink);
configuration.setBoolean(HConstants.HBASE_CANARY_READ_ALL_CF, readAllCF);
assertEquals(0, ToolRunner.run(configuration, canary, args));
// the test table has two column family. If readAllCF set true,
// we expect read count is double of region count
int expectedReadCount =
readAllCF ? 2 * sink.getTotalExpectedRegions() : sink.getTotalExpectedRegions();
assertEquals("canary region success count should equal total expected read count",
expectedReadCount, sink.getReadSuccessCount());
sink.getTotalExpectedRegions(), sink.getReadSuccessCount());
Map<String, List<CanaryTool.RegionTaskResult>> regionMap = sink.getRegionMap();
assertFalse("verify region map has size > 0", regionMap.isEmpty());

Expand Down Expand Up @@ -386,4 +384,60 @@ private void testZookeeperCanaryWithArgs(String[] args) throws Exception {
.get(HConstants.ZOOKEEPER_ZNODE_PARENT, HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT);
verify(sink, atLeastOnce()).publishReadTiming(eq(baseZnode), eq(hostPort), anyLong());
}

@Test
public void testLimitedTaskCount() throws Exception {
final TableName tableName = TableName.valueOf(name.getMethodName());
testingUtility.createMultiRegionTable(tableName, FAMILY, 100);
int[] taskCounts = { 10, 50, 100 };
ExecutorService executor = new ScheduledThreadPoolExecutor(1);
CanaryTool.RegionStdOutSink sink = spy(new CanaryTool.RegionStdOutSink());
CanaryTool canary = new CanaryTool(executor, sink);
String[] args = { "-writeSniffing", "-t", "10000", name.getMethodName() };
testingUtility.getConfiguration().setInt(HConstants.HBASE_CANARY_REGION_MONITOR_TASK_COUNT_MIN,
10);
assertEquals(0, ToolRunner.run(testingUtility.getConfiguration(), canary, args));
verify(sink, atLeast(100)).publishReadTiming(isA(ServerName.class), isA(RegionInfo.class),
isA(ColumnFamilyDescriptor.class), anyLong());

for (int taskCount : taskCounts) {
sink = spy(new CanaryTool.RegionStdOutSink());
canary = new CanaryTool(executor, sink);
testingUtility.getConfiguration()
.setInt(HConstants.HBASE_CANARY_REGION_MONITOR_TASK_COUNT_MAX, taskCount);
assertEquals(0, ToolRunner.run(testingUtility.getConfiguration(), canary, null));
verify(sink, atMost(taskCount)).publishReadTiming(isA(ServerName.class),
isA(RegionInfo.class), isA(ColumnFamilyDescriptor.class), anyLong());
verify(sink, atMost(taskCount)).publishWriteTiming(isA(ServerName.class),
isA(RegionInfo.class), isA(ColumnFamilyDescriptor.class), anyLong());
}
}

@Test
public void testExpandedTaskCount() throws Exception {
final TableName tableName = TableName.valueOf(name.getMethodName());
testingUtility.createMultiRegionTable(tableName, FAMILY, 100);
int[] taskCounts = { 200, 500, 1000 };
ExecutorService executor = new ScheduledThreadPoolExecutor(1);
CanaryTool.RegionStdOutSink sink = spy(new CanaryTool.RegionStdOutSink());
CanaryTool canary = new CanaryTool(executor, sink);
String[] args = { "-writeSniffing", "-t", "10000", name.getMethodName() };
testingUtility.getConfiguration().setInt(HConstants.HBASE_CANARY_REGION_MONITOR_TASK_COUNT_MAX,
10000);
assertEquals(0, ToolRunner.run(testingUtility.getConfiguration(), canary, args));
verify(sink, atLeast(100)).publishReadTiming(isA(ServerName.class), isA(RegionInfo.class),
isA(ColumnFamilyDescriptor.class), anyLong());

for (int taskCount : taskCounts) {
sink = spy(new CanaryTool.RegionStdOutSink());
canary = new CanaryTool(executor, sink);
testingUtility.getConfiguration()
.setInt(HConstants.HBASE_CANARY_REGION_MONITOR_TASK_COUNT_MIN, taskCount);
assertEquals(0, ToolRunner.run(testingUtility.getConfiguration(), canary, null));
verify(sink, atLeast(taskCount)).publishReadTiming(isA(ServerName.class),
isA(RegionInfo.class), isA(ColumnFamilyDescriptor.class), anyLong());
verify(sink, atLeast(taskCount)).publishWriteTiming(isA(ServerName.class),
isA(RegionInfo.class), isA(ColumnFamilyDescriptor.class), anyLong());
}
}
}

0 comments on commit ffd024d

Please sign in to comment.