Skip to content

Commit

Permalink
HBASE-17334 Add locate row before/after support for AsyncRegionLocator
Browse files Browse the repository at this point in the history
  • Loading branch information
Apache9 committed Dec 22, 2016
1 parent 6678186 commit 09bb428
Show file tree
Hide file tree
Showing 14 changed files with 236 additions and 236 deletions.
Expand Up @@ -118,23 +118,22 @@ private void startScan(OpenScannerResponse resp) {
.setScan(scan).consumer(consumer).resultCache(resultCache) .setScan(scan).consumer(consumer).resultCache(resultCache)
.rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS) .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
.scanTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).start() .scanTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).start()
.whenComplete((locateToPreviousRegion, error) -> { .whenComplete((locateType, error) -> {
if (error != null) { if (error != null) {
consumer.onError(error); consumer.onError(error);
return; return;
} }
if (locateToPreviousRegion == null) { if (locateType == null) {
consumer.onComplete(); consumer.onComplete();
} else { } else {
openScanner(locateToPreviousRegion.booleanValue()); openScanner(locateType);
} }
}); });
} }


private void openScanner(boolean locateToPreviousRegion) { private void openScanner(RegionLocateType locateType) {
conn.callerFactory.<OpenScannerResponse> single().table(tableName).row(scan.getStartRow()) conn.callerFactory.<OpenScannerResponse> single().table(tableName).row(scan.getStartRow())
.locateToPreviousRegion(locateToPreviousRegion) .locateType(locateType).rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
.rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
.operationTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).action(this::callOpenScanner).call() .operationTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).action(this::callOpenScanner).call()
.whenComplete((resp, error) -> { .whenComplete((resp, error) -> {
if (error != null) { if (error != null) {
Expand All @@ -146,6 +145,7 @@ conn.callerFactory.<OpenScannerResponse> single().table(tableName).row(scan.getS
} }


public void start() { public void start() {
openScanner(scan.isReversed() && isEmptyStartRow(scan.getStartRow())); openScanner(scan.isReversed() && isEmptyStartRow(scan.getStartRow()) ? RegionLocateType.BEFORE
: RegionLocateType.CURRENT);
} }
} }
Expand Up @@ -374,7 +374,8 @@ private void groupAndSend(Stream<Get> gets, int tries) {
new ConcurrentHashMap<>(); new ConcurrentHashMap<>();
ConcurrentLinkedQueue<Get> locateFailed = new ConcurrentLinkedQueue<>(); ConcurrentLinkedQueue<Get> locateFailed = new ConcurrentLinkedQueue<>();
CompletableFuture.allOf(gets.map(get -> conn.getLocator() CompletableFuture.allOf(gets.map(get -> conn.getLocator()
.getRegionLocation(tableName, get.getRow(), locateTimeoutNs).whenComplete((loc, error) -> { .getRegionLocation(tableName, get.getRow(), RegionLocateType.CURRENT, locateTimeoutNs)
.whenComplete((loc, error) -> {
if (error != null) { if (error != null) {
error = translateException(error); error = translateException(error);
if (error instanceof DoNotRetryIOException) { if (error instanceof DoNotRetryIOException) {
Expand Down
Expand Up @@ -23,6 +23,7 @@
import static org.apache.hadoop.hbase.HRegionInfo.createRegionName; import static org.apache.hadoop.hbase.HRegionInfo.createRegionName;
import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME; import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME;
import static org.apache.hadoop.hbase.client.AsyncRegionLocator.updateCachedLoation; import static org.apache.hadoop.hbase.client.AsyncRegionLocator.updateCachedLoation;
import static org.apache.hadoop.hbase.client.ConnectionUtils.createClosestRowAfter;
import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStopRow; import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStopRow;
import static org.apache.hadoop.hbase.util.Bytes.BYTES_COMPARATOR; import static org.apache.hadoop.hbase.util.Bytes.BYTES_COMPARATOR;
import static org.apache.hadoop.hbase.util.CollectionUtils.computeIfAbsent; import static org.apache.hadoop.hbase.util.CollectionUtils.computeIfAbsent;
Expand Down Expand Up @@ -77,16 +78,16 @@ private static final class LocateRequest {


public final byte[] row; public final byte[] row;


public final boolean locateToPrevious; public final RegionLocateType locateType;


public LocateRequest(byte[] row, boolean locateToPrevious) { public LocateRequest(byte[] row, RegionLocateType locateType) {
this.row = row; this.row = row;
this.locateToPrevious = locateToPrevious; this.locateType = locateType;
} }


@Override @Override
public int hashCode() { public int hashCode() {
return Bytes.hashCode(row) ^ Boolean.hashCode(locateToPrevious); return Bytes.hashCode(row) ^ locateType.hashCode();
} }


@Override @Override
Expand All @@ -95,7 +96,7 @@ public boolean equals(Object obj) {
return false; return false;
} }
LocateRequest that = (LocateRequest) obj; LocateRequest that = (LocateRequest) obj;
return locateToPrevious == that.locateToPrevious && Bytes.equals(row, that.row); return locateType.equals(that.locateType) && Bytes.equals(row, that.row);
} }
} }


Expand Down Expand Up @@ -192,8 +193,14 @@ private boolean tryComplete(LocateRequest req, CompletableFuture<HRegionLocation
return true; return true;
} }
boolean completed; boolean completed;
if (req.locateToPrevious) { if (req.locateType.equals(RegionLocateType.BEFORE)) {
completed = Bytes.equals(loc.getRegionInfo().getEndKey(), req.row); // for locating the row before current row, the common case is to find the previous region in
// reverse scan, so we check the endKey first. In general, the condition should be startKey <
// req.row and endKey >= req.row. Here we split it to endKey == req.row || (endKey > req.row
// && startKey < req.row). The two conditions are equal since startKey < endKey.
int c = Bytes.compareTo(loc.getRegionInfo().getEndKey(), req.row);
completed =
c == 0 || (c > 0 && Bytes.compareTo(loc.getRegionInfo().getStartKey(), req.row) < 0);
} else { } else {
completed = loc.getRegionInfo().containsRow(req.row); completed = loc.getRegionInfo().containsRow(req.row);
} }
Expand All @@ -206,11 +213,11 @@ private boolean tryComplete(LocateRequest req, CompletableFuture<HRegionLocation
} }


private void complete(TableName tableName, LocateRequest req, HRegionLocation loc, private void complete(TableName tableName, LocateRequest req, HRegionLocation loc,
Throwable error, String rowNameInErrorMsg) { Throwable error) {
if (error != null) { if (error != null) {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Failed to locate region in '" + tableName + "', " + rowNameInErrorMsg + "='" LOG.debug("Failed to locate region in '" + tableName + "', row='"
+ Bytes.toStringBinary(req.row) + "'", + Bytes.toStringBinary(req.row) + "', locateType=" + req.locateType,
error); error);
} }
} }
Expand Down Expand Up @@ -254,87 +261,67 @@ private void complete(TableName tableName, LocateRequest req, HRegionLocation lo
} }
} }
if (toSend != null) { if (toSend != null) {
if (toSend.locateToPrevious) { locateInMeta(tableName, toSend);
locatePreviousInMeta(tableName, toSend);
} else {
locateInMeta(tableName, toSend);
}
} }
} }


private void onScanComplete(TableName tableName, LocateRequest req, List<Result> results, private void onScanComplete(TableName tableName, LocateRequest req, List<Result> results,
Throwable error, String rowNameInErrorMsg) { Throwable error) {
if (error != null) { if (error != null) {
complete(tableName, req, null, error, rowNameInErrorMsg); complete(tableName, req, null, error);
return; return;
} }
if (results.isEmpty()) { if (results.isEmpty()) {
complete(tableName, req, null, new TableNotFoundException(tableName), rowNameInErrorMsg); complete(tableName, req, null, new TableNotFoundException(tableName));
return; return;
} }
RegionLocations locs = MetaTableAccessor.getRegionLocations(results.get(0)); RegionLocations locs = MetaTableAccessor.getRegionLocations(results.get(0));
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("The fetched location of '" + tableName + "', " + rowNameInErrorMsg + "='" LOG.debug("The fetched location of '" + tableName + "', row='" + Bytes.toStringBinary(req.row)
+ Bytes.toStringBinary(req.row) + "' is " + locs); + "', locateType=" + req.locateType + " is " + locs);
} }
if (locs == null || locs.getDefaultRegionLocation() == null) { if (locs == null || locs.getDefaultRegionLocation() == null) {
complete(tableName, req, null, complete(tableName, req, null,
new IOException(String.format("No location found for '%s', %s='%s'", tableName, new IOException(String.format("No location found for '%s', row='%s', locateType=%s",
rowNameInErrorMsg, Bytes.toStringBinary(req.row))), tableName, Bytes.toStringBinary(req.row), req.locateType)));
rowNameInErrorMsg);
return; return;
} }
HRegionLocation loc = locs.getDefaultRegionLocation(); HRegionLocation loc = locs.getDefaultRegionLocation();
HRegionInfo info = loc.getRegionInfo(); HRegionInfo info = loc.getRegionInfo();
if (info == null) { if (info == null) {
complete(tableName, req, null, complete(tableName, req, null,
new IOException(String.format("HRegionInfo is null for '%s', %s='%s'", tableName, new IOException(String.format("HRegionInfo is null for '%s', row='%s', locateType=%s",
rowNameInErrorMsg, Bytes.toStringBinary(req.row))), tableName, Bytes.toStringBinary(req.row), req.locateType)));
rowNameInErrorMsg);
return; return;
} }
if (!info.getTable().equals(tableName)) { if (!info.getTable().equals(tableName)) {
complete(tableName, req, null, complete(tableName, req, null, new TableNotFoundException(
new TableNotFoundException( "Table '" + tableName + "' was not found, got: '" + info.getTable() + "'"));
"Table '" + tableName + "' was not found, got: '" + info.getTable() + "'"),
rowNameInErrorMsg);
return; return;
} }
if (info.isSplit()) { if (info.isSplit()) {
complete(tableName, req, null, complete(tableName, req, null,
new RegionOfflineException( new RegionOfflineException(
"the only available region for the required row is a split parent," "the only available region for the required row is a split parent,"
+ " the daughters should be online soon: '" + info.getRegionNameAsString() + "'"), + " the daughters should be online soon: '" + info.getRegionNameAsString() + "'"));
rowNameInErrorMsg);
return; return;
} }
if (info.isOffline()) { if (info.isOffline()) {
complete(tableName, req, null, complete(tableName, req, null, new RegionOfflineException("the region is offline, could"
new RegionOfflineException("the region is offline, could" + " be caused by a disable table call: '" + info.getRegionNameAsString() + "'"));
+ " be caused by a disable table call: '" + info.getRegionNameAsString() + "'"),
rowNameInErrorMsg);
return; return;
} }
if (loc.getServerName() == null) { if (loc.getServerName() == null) {
complete(tableName, req, null, complete(tableName, req, null,
new NoServerForRegionException( new NoServerForRegionException(
String.format("No server address listed for region '%s', %s='%s'", String.format("No server address listed for region '%s', row='%s', locateType=%s",
info.getRegionNameAsString(), rowNameInErrorMsg, Bytes.toStringBinary(req.row))), info.getRegionNameAsString(), Bytes.toStringBinary(req.row), req.locateType)));
rowNameInErrorMsg);
return;
}
if (req.locateToPrevious && !Bytes.equals(info.getEndKey(), req.row)) {
complete(tableName, req, null,
new DoNotRetryIOException("The end key of '" + info.getRegionNameAsString() + "' is '"
+ Bytes.toStringBinary(info.getEndKey()) + "', expected '"
+ Bytes.toStringBinary(req.row) + "'"),
rowNameInErrorMsg);
return; return;
} }
complete(tableName, req, loc, null, rowNameInErrorMsg); complete(tableName, req, loc, null);
} }


private HRegionLocation locateInCache(TableCache tableCache, TableName tableName, byte[] row) { private HRegionLocation locateRowInCache(TableCache tableCache, TableName tableName, byte[] row) {
Map.Entry<byte[], HRegionLocation> entry = tableCache.cache.floorEntry(row); Map.Entry<byte[], HRegionLocation> entry = tableCache.cache.floorEntry(row);
if (entry == null) { if (entry == null) {
return null; return null;
Expand All @@ -344,30 +331,27 @@ private HRegionLocation locateInCache(TableCache tableCache, TableName tableName
if (isEmptyStopRow(endKey) || Bytes.compareTo(row, endKey) < 0) { if (isEmptyStopRow(endKey) || Bytes.compareTo(row, endKey) < 0) {
if (LOG.isTraceEnabled()) { if (LOG.isTraceEnabled()) {
LOG.trace("Found " + loc + " in cache for '" + tableName + "', row='" LOG.trace("Found " + loc + " in cache for '" + tableName + "', row='"
+ Bytes.toStringBinary(row) + "'"); + Bytes.toStringBinary(row) + "', locateType=" + RegionLocateType.CURRENT);
} }
return loc; return loc;
} else { } else {
return null; return null;
} }
} }


private HRegionLocation locatePreviousInCache(TableCache tableCache, TableName tableName, private HRegionLocation locateRowBeforeInCache(TableCache tableCache, TableName tableName,
byte[] startRowOfCurrentRegion) { byte[] row) {
Map.Entry<byte[], HRegionLocation> entry; Map.Entry<byte[], HRegionLocation> entry =
if (isEmptyStopRow(startRowOfCurrentRegion)) { isEmptyStopRow(row) ? tableCache.cache.lastEntry() : tableCache.cache.lowerEntry(row);
entry = tableCache.cache.lastEntry();
} else {
entry = tableCache.cache.lowerEntry(startRowOfCurrentRegion);
}
if (entry == null) { if (entry == null) {
return null; return null;
} }
HRegionLocation loc = entry.getValue(); HRegionLocation loc = entry.getValue();
if (Bytes.equals(loc.getRegionInfo().getEndKey(), startRowOfCurrentRegion)) { if (isEmptyStopRow(loc.getRegionInfo().getEndKey())
|| Bytes.compareTo(loc.getRegionInfo().getEndKey(), row) >= 0) {
if (LOG.isTraceEnabled()) { if (LOG.isTraceEnabled()) {
LOG.trace("Found " + loc + " in cache for '" + tableName + "', startRowOfCurrentRegion='" LOG.trace("Found " + loc + " in cache for '" + tableName + "', row='"
+ Bytes.toStringBinary(startRowOfCurrentRegion) + "'"); + Bytes.toStringBinary(row) + "', locateType=" + RegionLocateType.BEFORE);
} }
return loc; return loc;
} else { } else {
Expand All @@ -377,46 +361,41 @@ private HRegionLocation locatePreviousInCache(TableCache tableCache, TableName t


private void locateInMeta(TableName tableName, LocateRequest req) { private void locateInMeta(TableName tableName, LocateRequest req) {
if (LOG.isTraceEnabled()) { if (LOG.isTraceEnabled()) {
LOG.trace( LOG.trace("Try locate '" + tableName + "', row='" + Bytes.toStringBinary(req.row)
"Try locate '" + tableName + "', row='" + Bytes.toStringBinary(req.row) + "' in meta"); + "', locateType=" + req.locateType + " in meta");
}
byte[] metaKey = createRegionName(tableName, req.row, NINES, false);
conn.getRawTable(META_TABLE_NAME)
.smallScan(new Scan(metaKey).setReversed(true).setSmall(true).addFamily(CATALOG_FAMILY), 1)
.whenComplete((results, error) -> onScanComplete(tableName, req, results, error, "row"));
}

private void locatePreviousInMeta(TableName tableName, LocateRequest req) {
if (LOG.isTraceEnabled()) {
LOG.trace("Try locate '" + tableName + "', startRowOfCurrentRegion='"
+ Bytes.toStringBinary(req.row) + "' in meta");
} }
byte[] metaKey; byte[] metaKey;
if (isEmptyStopRow(req.row)) { if (req.locateType.equals(RegionLocateType.BEFORE)) {
byte[] binaryTableName = tableName.getName(); if (isEmptyStopRow(req.row)) {
metaKey = Arrays.copyOf(binaryTableName, binaryTableName.length + 1); byte[] binaryTableName = tableName.getName();
metaKey = Arrays.copyOf(binaryTableName, binaryTableName.length + 1);
} else {
metaKey = createRegionName(tableName, req.row, ZEROES, false);
}
} else { } else {
metaKey = createRegionName(tableName, req.row, ZEROES, false); metaKey = createRegionName(tableName, req.row, NINES, false);
} }
conn.getRawTable(META_TABLE_NAME) conn.getRawTable(META_TABLE_NAME)
.smallScan(new Scan(metaKey).setReversed(true).setSmall(true).addFamily(CATALOG_FAMILY), 1) .smallScan(new Scan(metaKey).setReversed(true).setSmall(true).addFamily(CATALOG_FAMILY), 1)
.whenComplete((results, error) -> onScanComplete(tableName, req, results, error, .whenComplete((results, error) -> onScanComplete(tableName, req, results, error));
"startRowOfCurrentRegion"));
} }


private HRegionLocation locateInCache(TableCache tableCache, TableName tableName, byte[] row, private HRegionLocation locateInCache(TableCache tableCache, TableName tableName, byte[] row,
boolean locateToPrevious) { RegionLocateType locateType) {
return locateToPrevious ? locatePreviousInCache(tableCache, tableName, row) return locateType.equals(RegionLocateType.BEFORE)
: locateInCache(tableCache, tableName, row); ? locateRowBeforeInCache(tableCache, tableName, row)
: locateRowInCache(tableCache, tableName, row);
} }


// locateToPrevious is true means we will use the start key of a region to locate the region // locateToPrevious is true means we will use the start key of a region to locate the region
// placed before it. Used for reverse scan. See the comment of // placed before it. Used for reverse scan. See the comment of
// AsyncRegionLocator.getPreviousRegionLocation. // AsyncRegionLocator.getPreviousRegionLocation.
private CompletableFuture<HRegionLocation> getRegionLocation(TableName tableName, byte[] row, private CompletableFuture<HRegionLocation> getRegionLocationInternal(TableName tableName,
boolean locateToPrevious) { byte[] row, RegionLocateType locateType) {
// AFTER should be convert to CURRENT before calling this method
assert !locateType.equals(RegionLocateType.AFTER);
TableCache tableCache = getTableCache(tableName); TableCache tableCache = getTableCache(tableName);
HRegionLocation loc = locateInCache(tableCache, tableName, row, locateToPrevious); HRegionLocation loc = locateInCache(tableCache, tableName, row, locateType);
if (loc != null) { if (loc != null) {
return CompletableFuture.completedFuture(loc); return CompletableFuture.completedFuture(loc);
} }
Expand All @@ -425,11 +404,11 @@ private CompletableFuture<HRegionLocation> getRegionLocation(TableName tableName
boolean sendRequest = false; boolean sendRequest = false;
synchronized (tableCache) { synchronized (tableCache) {
// check again // check again
loc = locateInCache(tableCache, tableName, row, locateToPrevious); loc = locateInCache(tableCache, tableName, row, locateType);
if (loc != null) { if (loc != null) {
return CompletableFuture.completedFuture(loc); return CompletableFuture.completedFuture(loc);
} }
req = new LocateRequest(row, locateToPrevious); req = new LocateRequest(row, locateType);
future = tableCache.allRequests.get(req); future = tableCache.allRequests.get(req);
if (future == null) { if (future == null) {
future = new CompletableFuture<>(); future = new CompletableFuture<>();
Expand All @@ -441,25 +420,23 @@ private CompletableFuture<HRegionLocation> getRegionLocation(TableName tableName
} }
} }
if (sendRequest) { if (sendRequest) {
if (locateToPrevious) { locateInMeta(tableName, req);
locatePreviousInMeta(tableName, req);
} else {
locateInMeta(tableName, req);
}
} }
return future; return future;
} }


CompletableFuture<HRegionLocation> getRegionLocation(TableName tableName, byte[] row) { CompletableFuture<HRegionLocation> getRegionLocation(TableName tableName, byte[] row,
return getRegionLocation(tableName, row, false); RegionLocateType locateType) {
} if (locateType.equals(RegionLocateType.BEFORE)) {

return getRegionLocationInternal(tableName, row, locateType);
// Used for reverse scan. See the comment of AsyncRegionLocator.getPreviousRegionLocation. } else {
// TODO: need to deal with region merge where the startRowOfCurrentRegion will not be the endRow // as we know the exact row after us, so we can just create the new row, and use the same
// of a region. // algorithm to locate it.
CompletableFuture<HRegionLocation> getPreviousRegionLocation(TableName tableName, if (locateType.equals(RegionLocateType.AFTER)) {
byte[] startRowOfCurrentRegion) { row = createClosestRowAfter(row);
return getRegionLocation(tableName, startRowOfCurrentRegion, true); }
return getRegionLocationInternal(tableName, row, RegionLocateType.CURRENT);
}
} }


void updateCachedLocation(HRegionLocation loc, Throwable exception) { void updateCachedLocation(HRegionLocation loc, Throwable exception) {
Expand Down

0 comments on commit 09bb428

Please sign in to comment.