Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HBASE-27355 Separate meta read requests from master and client #4801

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.apache.hadoop.hbase.filter.RowFilter;
import org.apache.hadoop.hbase.filter.SubstringComparator;
import org.apache.hadoop.hbase.master.RegionState;
import org.apache.hadoop.hbase.regionserver.RSAnnotationReadingPriorityFunction;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.ExceptionUtil;
Expand Down Expand Up @@ -183,8 +184,7 @@ public static HRegionLocation getRegionLocation(Connection connection, byte[] re
} catch (Exception parseEx) {
// Ignore. This is used with tableName passed as regionName.
}
Get get = new Get(row);
get.addFamily(HConstants.CATALOG_FAMILY);
Get get = buildInternalCatalogFamilyGet(row);
Result r;
try (Table t = getMetaHTable(connection)) {
r = t.get(get);
Expand All @@ -210,8 +210,7 @@ public static HRegionLocation getRegionLocation(Connection connection, RegionInf
/** Returns the {@link HConstants#CATALOG_FAMILY} row from hbase:meta. */
public static Result getCatalogFamilyRow(Connection connection, RegionInfo ri)
throws IOException {
Get get = new Get(CatalogFamilyFormat.getMetaKeyForRegion(ri));
get.addFamily(HConstants.CATALOG_FAMILY);
Get get = buildInternalCatalogFamilyGet(CatalogFamilyFormat.getMetaKeyForRegion(ri));
try (Table t = getMetaHTable(connection)) {
return t.get(get);
}
Expand All @@ -225,13 +224,17 @@ public static Result getCatalogFamilyRow(Connection connection, RegionInfo ri)
*/
public static Result getRegionResult(Connection connection, byte[] regionName)
throws IOException {
Get get = new Get(regionName);
get.addFamily(HConstants.CATALOG_FAMILY);
Get get = buildInternalCatalogFamilyGet(regionName);
try (Table t = getMetaHTable(connection)) {
return t.get(get);
}
}

/**
 * Builds a {@link Get} for the given row that is restricted to
 * {@link HConstants#CATALOG_FAMILY} and carries
 * {@link RSAnnotationReadingPriorityFunction#INTERNAL_READ_QOS} as its request priority.
 * @param row row key to fetch
 * @return the configured Get
 */
private static Get buildInternalCatalogFamilyGet(byte[] row) {
  Get get = new Get(row);
  get.addFamily(HConstants.CATALOG_FAMILY);
  get.setPriority(RSAnnotationReadingPriorityFunction.INTERNAL_READ_QOS);
  return get;
}

/**
* Scans META table for a row whose key contains the specified <B>regionEncodedName</B>, returning
* a single related <code>Result</code> instance if any row is found, null otherwise.
Expand Down Expand Up @@ -340,6 +343,7 @@ private static Scan getMetaScan(Configuration conf, int rowUpperLimit) {
scan.setReadType(Scan.ReadType.PREAD);
}
scan.setCaching(scannerCaching);
scan.setPriority(RSAnnotationReadingPriorityFunction.INTERNAL_READ_QOS);
return scan;
}

Expand Down Expand Up @@ -387,7 +391,7 @@ public static void fullScanMetaAndPrint(Connection connection) throws IOExceptio
LOG.info("fullScanMetaAndPrint.Current Meta Row: " + r);
TableState state = CatalogFamilyFormat.getTableState(r);
if (state != null) {
LOG.info("fullScanMetaAndPrint.Table State={}" + state);
LOG.info("fullScanMetaAndPrint.Table State={}", state);
} else {
RegionLocations locations = CatalogFamilyFormat.getRegionLocations(r);
if (locations == null) {
Expand All @@ -409,6 +413,14 @@ public static void scanMetaForTableRegions(Connection connection,
scanMeta(connection, tableName, QueryType.REGION, Integer.MAX_VALUE, visitor);
}

/**
 * Scans the {@link QueryType#REGION} rows of the given table in META with the supplied request
 * priority.
 * @param connection connection we're using
 * @param visitor    Visitor invoked against each row.
 * @param tableName  table whose start and stop rows in META bound the scan
 * @param priority   priority assigned to the meta read request
 */
public static void scanMetaForTableRegions(Connection connection,
  ClientMetaTableAccessor.Visitor visitor, TableName tableName, int priority) throws IOException {
  final byte[] startRow =
    ClientMetaTableAccessor.getTableStartRowForMeta(tableName, QueryType.REGION);
  final byte[] stopRow =
    ClientMetaTableAccessor.getTableStopRowForMeta(tableName, QueryType.REGION);
  // No filter, and Integer.MAX_VALUE as the row limit.
  scanMeta(connection, startRow, stopRow, QueryType.REGION, null, Integer.MAX_VALUE, visitor,
    priority);
}

private static void scanMeta(Connection connection, TableName table, QueryType type, int maxRows,
final ClientMetaTableAccessor.Visitor visitor) throws IOException {
scanMeta(connection, ClientMetaTableAccessor.getTableStartRowForMeta(table, type),
Expand Down Expand Up @@ -463,6 +475,23 @@ public static void scanMeta(Connection connection, @Nullable final byte[] startR
public static void scanMeta(Connection connection, @Nullable final byte[] startRow,
  @Nullable final byte[] stopRow, QueryType type, @Nullable Filter filter, int maxRows,
  final ClientMetaTableAccessor.Visitor visitor) throws IOException {
  // This overload takes no priority argument; it delegates with the internal read QOS.
  final int internalReadPriority = RSAnnotationReadingPriorityFunction.INTERNAL_READ_QOS;
  scanMeta(connection, startRow, stopRow, type, filter, maxRows, visitor, internalReadPriority);
}

/**
 * Performs a scan of META table.
 * @param connection connection we're using
 * @param startRow Where to start the scan. Pass null to begin the scan at the first row.
 * @param stopRow Where to stop the scan. Pass null to scan all rows from the start row onwards.
 * @param type scanned part of meta
 * @param filter filter to set on the scan, may be null
 * @param maxRows maximum rows to return
 * @param visitor Visitor invoked against each row.
 * @param priority priority assigned to the meta read request
 */
public static void scanMeta(Connection connection, @Nullable final byte[] startRow,
@Nullable final byte[] stopRow, QueryType type, @Nullable Filter filter, int maxRows,
final ClientMetaTableAccessor.Visitor visitor, final int priority) throws IOException {
int rowUpperLimit = maxRows > 0 ? maxRows : Integer.MAX_VALUE;
Scan scan = getMetaScan(connection.getConfiguration(), rowUpperLimit);

Expand All @@ -479,10 +508,14 @@ public static void scanMeta(Connection connection, @Nullable final byte[] startR
scan.setFilter(filter);
}

scan.setPriority(priority);

if (LOG.isTraceEnabled()) {
LOG.trace("Scanning META" + " starting at row=" + Bytes.toStringBinary(startRow)
+ " stopping at row=" + Bytes.toStringBinary(stopRow) + " for max=" + rowUpperLimit
+ " with caching=" + scan.getCaching());
LOG.trace(
"Scanning META starting at row={} stopping at row={} for max={} with caching={} "
+ "priority={}",
Bytes.toStringBinary(startRow), Bytes.toStringBinary(stopRow), rowUpperLimit,
scan.getCaching(), priority);
}

int currentRow = 0;
Expand Down Expand Up @@ -584,8 +617,9 @@ public static TableState getTableState(Connection conn, TableName tableName) thr
return new TableState(tableName, TableState.State.ENABLED);
}
Table metaHTable = getMetaHTable(conn);
Get get = new Get(tableName.getName()).addColumn(HConstants.TABLE_FAMILY,
HConstants.TABLE_STATE_QUALIFIER);
Get get = new Get(tableName.getName())
.addColumn(HConstants.TABLE_FAMILY, HConstants.TABLE_STATE_QUALIFIER)
.setPriority(RSAnnotationReadingPriorityFunction.INTERNAL_READ_QOS);
Result result = metaHTable.get(get);
return CatalogFamilyFormat.getTableState(result);
}
Expand Down Expand Up @@ -881,7 +915,7 @@ private static void updateLocation(Connection connection, RegionInfo regionInfo,
addRegionInfo(put, regionInfo);
addLocation(put, sn, openSeqNum, regionInfo.getReplicaId());
putToMetaTable(connection, put);
LOG.info("Updated row {} with server=", regionInfo.getRegionNameAsString(), sn);
LOG.info("Updated row {} with server={}", regionInfo.getRegionNameAsString(), sn);
}

public static Put addRegionInfo(final Put p, final RegionInfo hri) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.regionserver.RSAnnotationReadingPriorityFunction;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;

Expand All @@ -32,7 +33,10 @@ public class MetaRWQueueRpcExecutor extends RWQueueRpcExecutor {
"hbase.ipc.server.metacallqueue.read.ratio";
public static final String META_CALL_QUEUE_SCAN_SHARE_CONF_KEY =
"hbase.ipc.server.metacallqueue.scan.ratio";
public static final float DEFAULT_META_CALL_QUEUE_READ_SHARE = 0.9f;
public static final String META_CALL_QUEUE_HANDLER_FACTOR_CONF_KEY =
"hbase.ipc.server.metacallqueue.handler.factor";
public static final float DEFAULT_META_CALL_QUEUE_READ_SHARE = 0.8f;
private static final float DEFAULT_META_CALL_QUEUE_SCAN_SHARE = 0.2f;

public MetaRWQueueRpcExecutor(final String name, final int handlerCount, final int maxQueueLength,
final PriorityFunction priority, final Configuration conf, final Abortable abortable) {
Expand All @@ -46,6 +50,23 @@ protected float getReadShare(final Configuration conf) {

@Override
protected float getScanShare(final Configuration conf) {
return conf.getFloat(META_CALL_QUEUE_SCAN_SHARE_CONF_KEY, 0);
return conf.getFloat(META_CALL_QUEUE_SCAN_SHARE_CONF_KEY, DEFAULT_META_CALL_QUEUE_SCAN_SHARE);
}

/**
 * Classifies the call as a write and/or an internal read and hands it to
 * {@code dispatchTo}.
 * <p>
 * A call counts as an internal read only when at least one scan queue exists and the priority in
 * its request header equals {@link RSAnnotationReadingPriorityFunction#INTERNAL_READ_QOS}. The
 * intent is that client system reads go to the read handlers while internal system reads go to
 * the scan handlers.
 * @param callTask the call to dispatch
 * @return whatever {@code dispatchTo} returns for the task
 */
@Override
public boolean dispatch(CallRunner callTask) {
  final RpcCall rpcCall = callTask.getRpcCall();
  final int priority = rpcCall.getHeader().getPriority();
  final boolean isWrite = isWriteRequest(rpcCall.getHeader(), rpcCall.getParam());
  boolean isInternalRead = false;
  if (getNumScanQueues() > 0) {
    // Only the internal read QOS level is routed to the scan queues.
    isInternalRead = priority == RSAnnotationReadingPriorityFunction.INTERNAL_READ_QOS;
  }
  return dispatchTo(isWrite, isInternalRead, callTask);
}

/**
 * Reads the handler factor for the meta call queues from
 * {@link #META_CALL_QUEUE_HANDLER_FACTOR_CONF_KEY}, using 0.5 as the default value.
 * @param conf configuration to read from
 * @return the configured factor, or the default
 */
@Override
protected float getCallQueueHandlerFactor(Configuration conf) {
  final float defaultFactor = 0.5f;
  return conf.getFloat(META_CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, defaultFactor);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -295,4 +295,8 @@ private void propagateBalancerConfigChange(QueueBalancer balancer, Configuration
((ConfigurationObserver) balancer).onConfigurationChange(conf);
}
}

/** Returns the value of {@code numScanQueues}, exposed so subclasses can read it. */
protected int getNumScanQueues() {
  return this.numScanQueues;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ public RpcExecutor(final String name, final int handlerCount, final String callQ
this.conf = conf;
this.abortable = abortable;

float callQueuesHandlersFactor = this.conf.getFloat(CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 0.1f);
float callQueuesHandlersFactor = getCallQueueHandlerFactor(conf);
if (
Float.compare(callQueuesHandlersFactor, 1.0f) > 0
|| Float.compare(0.0f, callQueuesHandlersFactor) > 0
Expand Down Expand Up @@ -468,4 +468,8 @@ public void onConfigurationChange(Configuration conf) {
}
}
}

/**
 * Reads the call queue handler factor from {@link #CALL_QUEUE_HANDLER_FACTOR_CONF_KEY}, using
 * 0.1 as the default value.
 * <p>
 * The constructor invokes this method while the instance is still being initialized, so an
 * override should rely only on the passed-in {@code conf} and not on subclass fields.
 * @param conf configuration to read from
 * @return the configured factor, or the default
 */
protected float getCallQueueHandlerFactor(Configuration conf) {
  final float defaultFactor = 0.1f;
  return conf.getFloat(CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, defaultFactor);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,8 @@ && cleanParent(e.getKey(), e.getValue())
protected CatalogJanitorReport scanForReport() throws IOException {
ReportMakingVisitor visitor = new ReportMakingVisitor(this.services);
// Null tablename means scan all of meta.
MetaTableAccessor.scanMetaForTableRegions(this.services.getConnection(), visitor, null);
MetaTableAccessor.scanMetaForTableRegions(this.services.getConnection(), visitor, null,
HConstants.SYSTEMTABLE_QOS);
return visitor.getReport();
}

Expand Down Expand Up @@ -491,7 +492,8 @@ public static void main(String[] args) throws IOException {
r.getValue(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER));
t.put(p);
}
MetaTableAccessor.scanMetaForTableRegions(connection, visitor, null);
MetaTableAccessor.scanMetaForTableRegions(connection, visitor, null,
HConstants.SYSTEMTABLE_QOS);
CatalogJanitorReport report = visitor.getReport();
LOG.info(report != null ? report.toString() : "empty");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,14 +46,17 @@
* Priority function specifically for the region server.
*/
@InterfaceAudience.Private
class RSAnnotationReadingPriorityFunction extends AnnotationReadingPriorityFunction<RSRpcServices> {
public class RSAnnotationReadingPriorityFunction
extends AnnotationReadingPriorityFunction<RSRpcServices> {

private static final Logger LOG =
LoggerFactory.getLogger(RSAnnotationReadingPriorityFunction.class);

/** Used to control the scan delay, currently sqrt(numNextCall * weight) */
public static final String SCAN_VTIME_WEIGHT_CONF_KEY = "hbase.ipc.server.scan.vtime.weight";

public static final int INTERNAL_READ_QOS = 250;

@SuppressWarnings("unchecked")
private final Class<? extends Message>[] knownArgumentClasses =
new Class[] { GetRegionInfoRequest.class, GetStoreFileRequest.class, CloseRegionRequest.class,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandlerImpl;
import org.apache.hadoop.hbase.regionserver.RSAnnotationReadingPriorityFunction;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RPCTests;
import org.apache.hadoop.hbase.util.Bytes;
Expand Down Expand Up @@ -108,7 +109,7 @@ public void testBasic() throws IOException, InterruptedException {
RpcScheduler scheduler = new SimpleRpcScheduler(conf, 10, 0, 0, qosFunction, 0);
scheduler.init(CONTEXT);
scheduler.start();
CallRunner task = createMockTask();
CallRunner task = createMockTask(HConstants.NORMAL_QOS);
task.setStatus(new MonitoredRPCHandlerImpl());
scheduler.dispatch(task);
verify(task, timeout(10000)).run();
Expand Down Expand Up @@ -163,7 +164,7 @@ public void testCallQueueInfo() throws IOException, InterruptedException {

int totalCallMethods = 10;
for (int i = totalCallMethods; i > 0; i--) {
CallRunner task = createMockTask();
CallRunner task = createMockTask(HConstants.NORMAL_QOS);
task.setStatus(new MonitoredRPCHandlerImpl());
scheduler.dispatch(task);
}
Expand All @@ -185,9 +186,9 @@ public void testCallQueueInfo() throws IOException, InterruptedException {

@Test
public void testHandlerIsolation() throws IOException, InterruptedException {
CallRunner generalTask = createMockTask();
CallRunner priorityTask = createMockTask();
CallRunner replicationTask = createMockTask();
CallRunner generalTask = createMockTask(HConstants.NORMAL_QOS);
CallRunner priorityTask = createMockTask(HConstants.HIGH_QOS + 1);
CallRunner replicationTask = createMockTask(HConstants.REPLICATION_QOS);
List<CallRunner> tasks = ImmutableList.of(generalTask, priorityTask, replicationTask);
Map<CallRunner, Integer> qos = ImmutableMap.of(generalTask, 0, priorityTask,
HConstants.HIGH_QOS + 1, replicationTask, HConstants.REPLICATION_QOS);
Expand Down Expand Up @@ -227,10 +228,12 @@ public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
assertEquals(3, ImmutableSet.copyOf(handlerThreads.values()).size());
}

private CallRunner createMockTask() {
private CallRunner createMockTask(int priority) {
ServerCall call = mock(ServerCall.class);
CallRunner task = mock(CallRunner.class);
RequestHeader header = RequestHeader.newBuilder().setPriority(priority).build();
when(task.getRpcCall()).thenReturn(call);
when(call.getHeader()).thenReturn(header);
return task;
}

Expand Down Expand Up @@ -707,7 +710,7 @@ public void testFastPathBalancedQueueRpcExecutorWithQueueLength0() throws Except
@Test
public void testMetaRWScanQueues() throws Exception {
Configuration schedConf = HBaseConfiguration.create();
schedConf.setFloat(RpcExecutor.CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 1.0f);
schedConf.setFloat(MetaRWQueueRpcExecutor.META_CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 1.0f);
schedConf.setFloat(MetaRWQueueRpcExecutor.META_CALL_QUEUE_READ_SHARE_CONF_KEY, 0.7f);
schedConf.setFloat(MetaRWQueueRpcExecutor.META_CALL_QUEUE_SCAN_SHARE_CONF_KEY, 0.5f);

Expand All @@ -728,36 +731,37 @@ public void testMetaRWScanQueues() throws Exception {
when(putCall.getHeader()).thenReturn(putHead);
when(putCall.getParam()).thenReturn(putCall.param);

CallRunner getCallTask = mock(CallRunner.class);
ServerCall getCall = mock(ServerCall.class);
RequestHeader getHead = RequestHeader.newBuilder().setMethodName("get").build();
when(getCallTask.getRpcCall()).thenReturn(getCall);
when(getCall.getHeader()).thenReturn(getHead);

CallRunner scanCallTask = mock(CallRunner.class);
ServerCall scanCall = mock(ServerCall.class);
scanCall.param = ScanRequest.newBuilder().build();
RequestHeader scanHead = RequestHeader.newBuilder().setMethodName("scan").build();
when(scanCallTask.getRpcCall()).thenReturn(scanCall);
when(scanCall.getHeader()).thenReturn(scanHead);
when(scanCall.getParam()).thenReturn(scanCall.param);
CallRunner clientReadCallTask = mock(CallRunner.class);
ServerCall clientReadCall = mock(ServerCall.class);
RequestHeader clientReadHead = RequestHeader.newBuilder().setMethodName("get").build();
when(clientReadCallTask.getRpcCall()).thenReturn(clientReadCall);
when(clientReadCall.getHeader()).thenReturn(clientReadHead);

CallRunner internalReadCallTask = mock(CallRunner.class);
ServerCall internalReadCall = mock(ServerCall.class);
internalReadCall.param = ScanRequest.newBuilder().build();
RequestHeader masterReadHead = RequestHeader.newBuilder().setMethodName("get")
.setPriority(RSAnnotationReadingPriorityFunction.INTERNAL_READ_QOS).build();
when(internalReadCallTask.getRpcCall()).thenReturn(internalReadCall);
when(internalReadCall.getHeader()).thenReturn(masterReadHead);
when(internalReadCall.getParam()).thenReturn(internalReadCall.param);

ArrayList<Integer> work = new ArrayList<>();
doAnswerTaskExecution(putCallTask, work, 1, 1000);
doAnswerTaskExecution(getCallTask, work, 2, 1000);
doAnswerTaskExecution(scanCallTask, work, 3, 1000);
doAnswerTaskExecution(clientReadCallTask, work, 2, 1000);
doAnswerTaskExecution(internalReadCallTask, work, 3, 1000);

// There are 3 queues: [puts], [client reads], [internal reads]
// so the calls will be interleaved
scheduler.dispatch(putCallTask);
scheduler.dispatch(putCallTask);
scheduler.dispatch(putCallTask);
scheduler.dispatch(getCallTask);
scheduler.dispatch(getCallTask);
scheduler.dispatch(getCallTask);
scheduler.dispatch(scanCallTask);
scheduler.dispatch(scanCallTask);
scheduler.dispatch(scanCallTask);
scheduler.dispatch(clientReadCallTask);
scheduler.dispatch(clientReadCallTask);
scheduler.dispatch(clientReadCallTask);
scheduler.dispatch(internalReadCallTask);
scheduler.dispatch(internalReadCallTask);
scheduler.dispatch(internalReadCallTask);

while (work.size() < 6) {
Thread.sleep(100);
Expand Down