Skip to content

Commit

Permalink
HBASE-17320 Add inclusive/exclusive support for startRow and endRow o…
Browse files Browse the repository at this point in the history
…f scan
  • Loading branch information
Apache9 committed Dec 29, 2016
1 parent a3e0e0d commit 05b1d91
Show file tree
Hide file tree
Showing 30 changed files with 1,102 additions and 502 deletions.
Expand Up @@ -19,7 +19,7 @@


import static org.apache.hadoop.hbase.HConstants.EMPTY_END_ROW; import static org.apache.hadoop.hbase.HConstants.EMPTY_END_ROW;
import static org.apache.hadoop.hbase.HConstants.EMPTY_START_ROW; import static org.apache.hadoop.hbase.HConstants.EMPTY_START_ROW;
import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStartRow; import static org.apache.hadoop.hbase.client.ConnectionUtils.getLocateType;


import java.io.IOException; import java.io.IOException;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -64,10 +64,10 @@ class AsyncClientScanner {
public AsyncClientScanner(Scan scan, RawScanResultConsumer consumer, TableName tableName, public AsyncClientScanner(Scan scan, RawScanResultConsumer consumer, TableName tableName,
AsyncConnectionImpl conn, long scanTimeoutNs, long rpcTimeoutNs) { AsyncConnectionImpl conn, long scanTimeoutNs, long rpcTimeoutNs) {
if (scan.getStartRow() == null) { if (scan.getStartRow() == null) {
scan.setStartRow(EMPTY_START_ROW); scan.withStartRow(EMPTY_START_ROW, scan.includeStartRow());
} }
if (scan.getStopRow() == null) { if (scan.getStopRow() == null) {
scan.setStopRow(EMPTY_END_ROW); scan.withStopRow(EMPTY_END_ROW, scan.includeStopRow());
} }
this.scan = scan; this.scan = scan;
this.consumer = consumer; this.consumer = consumer;
Expand Down Expand Up @@ -117,23 +117,22 @@ private void startScan(OpenScannerResponse resp) {
conn.callerFactory.scanSingleRegion().id(resp.scannerId).location(resp.loc).stub(resp.stub) conn.callerFactory.scanSingleRegion().id(resp.scannerId).location(resp.loc).stub(resp.stub)
.setScan(scan).consumer(consumer).resultCache(resultCache) .setScan(scan).consumer(consumer).resultCache(resultCache)
.rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS) .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
.scanTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).start() .scanTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).start().whenComplete((hasMore, error) -> {
.whenComplete((locateType, error) -> {
if (error != null) { if (error != null) {
consumer.onError(error); consumer.onError(error);
return; return;
} }
if (locateType == null) { if (hasMore) {
consumer.onComplete(); openScanner();
} else { } else {
openScanner(locateType); consumer.onComplete();
} }
}); });
} }


private void openScanner(RegionLocateType locateType) { private void openScanner() {
conn.callerFactory.<OpenScannerResponse> single().table(tableName).row(scan.getStartRow()) conn.callerFactory.<OpenScannerResponse> single().table(tableName).row(scan.getStartRow())
.locateType(locateType).rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS) .locateType(getLocateType(scan)).rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
.operationTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).action(this::callOpenScanner).call() .operationTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).action(this::callOpenScanner).call()
.whenComplete((resp, error) -> { .whenComplete((resp, error) -> {
if (error != null) { if (error != null) {
Expand All @@ -145,7 +144,6 @@ conn.callerFactory.<OpenScannerResponse> single().table(tableName).row(scan.getS
} }


public void start() { public void start() {
openScanner(scan.isReversed() && isEmptyStartRow(scan.getStartRow()) ? RegionLocateType.BEFORE openScanner();
: RegionLocateType.CURRENT);
} }
} }
Expand Up @@ -246,7 +246,7 @@ public AsyncScanSingleRegionRpcRetryingCaller build() {
/** /**
* Short cut for {@code build().start()}. * Short cut for {@code build().start()}.
*/ */
public CompletableFuture<RegionLocateType> start() { public CompletableFuture<Boolean> start() {
return build().start(); return build().start();
} }
} }
Expand Down
Expand Up @@ -17,11 +17,9 @@
*/ */
package org.apache.hadoop.hbase.client; package org.apache.hadoop.hbase.client;


import static org.apache.hadoop.hbase.client.ConnectionUtils.createClosestRowAfter;
import static org.apache.hadoop.hbase.client.ConnectionUtils.createClosestRowBefore;
import static org.apache.hadoop.hbase.client.ConnectionUtils.getPauseTime; import static org.apache.hadoop.hbase.client.ConnectionUtils.getPauseTime;
import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStartRow; import static org.apache.hadoop.hbase.client.ConnectionUtils.noMoreResultsForReverseScan;
import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStopRow; import static org.apache.hadoop.hbase.client.ConnectionUtils.noMoreResultsForScan;
import static org.apache.hadoop.hbase.client.ConnectionUtils.resetController; import static org.apache.hadoop.hbase.client.ConnectionUtils.resetController;
import static org.apache.hadoop.hbase.client.ConnectionUtils.retries2Attempts; import static org.apache.hadoop.hbase.client.ConnectionUtils.retries2Attempts;
import static org.apache.hadoop.hbase.client.ConnectionUtils.translateException; import static org.apache.hadoop.hbase.client.ConnectionUtils.translateException;
Expand All @@ -34,7 +32,6 @@
import java.util.Optional; import java.util.Optional;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;


import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
Expand All @@ -53,7 +50,6 @@
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService.Interface; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService.Interface;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;


/** /**
Expand Down Expand Up @@ -91,11 +87,9 @@ class AsyncScanSingleRegionRpcRetryingCaller {


private final int startLogErrorsCnt; private final int startLogErrorsCnt;


private final Supplier<byte[]> createNextStartRowWhenError;

private final Runnable completeWhenNoMoreResultsInRegion; private final Runnable completeWhenNoMoreResultsInRegion;


private final CompletableFuture<RegionLocateType> future; private final CompletableFuture<Boolean> future;


private final HBaseRpcController controller; private final HBaseRpcController controller;


Expand Down Expand Up @@ -128,10 +122,8 @@ public AsyncScanSingleRegionRpcRetryingCaller(HashedWheelTimer retryTimer,
this.rpcTimeoutNs = rpcTimeoutNs; this.rpcTimeoutNs = rpcTimeoutNs;
this.startLogErrorsCnt = startLogErrorsCnt; this.startLogErrorsCnt = startLogErrorsCnt;
if (scan.isReversed()) { if (scan.isReversed()) {
createNextStartRowWhenError = this::createReversedNextStartRowWhenError;
completeWhenNoMoreResultsInRegion = this::completeReversedWhenNoMoreResultsInRegion; completeWhenNoMoreResultsInRegion = this::completeReversedWhenNoMoreResultsInRegion;
} else { } else {
createNextStartRowWhenError = this::createNextStartRowWhenError;
completeWhenNoMoreResultsInRegion = this::completeWhenNoMoreResultsInRegion; completeWhenNoMoreResultsInRegion = this::completeWhenNoMoreResultsInRegion;
} }
this.future = new CompletableFuture<>(); this.future = new CompletableFuture<>();
Expand Down Expand Up @@ -164,23 +156,13 @@ private void completeExceptionally(boolean closeScanner) {
future.completeExceptionally(new RetriesExhaustedException(tries - 1, exceptions)); future.completeExceptionally(new RetriesExhaustedException(tries - 1, exceptions));
} }


@edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "NP_NONNULL_PARAM_VIOLATION",
justification = "https://github.com/findbugsproject/findbugs/issues/79")
private void completeNoMoreResults() { private void completeNoMoreResults() {
future.complete(null); future.complete(false);
}

private void completeWithNextStartRow(byte[] nextStartRow) {
scan.setStartRow(nextStartRow);
future.complete(scan.isReversed() ? RegionLocateType.BEFORE : RegionLocateType.CURRENT);
} }


private byte[] createNextStartRowWhenError() { private void completeWithNextStartRow(byte[] row, boolean inclusive) {
return createClosestRowAfter(nextStartRowWhenError); scan.withStartRow(row, inclusive);
} future.complete(true);

private byte[] createReversedNextStartRowWhenError() {
return createClosestRowBefore(nextStartRowWhenError);
} }


private void completeWhenError(boolean closeScanner) { private void completeWhenError(boolean closeScanner) {
Expand All @@ -189,12 +171,9 @@ private void completeWhenError(boolean closeScanner) {
closeScanner(); closeScanner();
} }
if (nextStartRowWhenError != null) { if (nextStartRowWhenError != null) {
scan.setStartRow( scan.withStartRow(nextStartRowWhenError, includeNextStartRowWhenError);
includeNextStartRowWhenError ? nextStartRowWhenError : createNextStartRowWhenError.get());
} }
future.complete( future.complete(true);
scan.isReversed() && Bytes.equals(scan.getStartRow(), loc.getRegionInfo().getEndKey())
? RegionLocateType.BEFORE : RegionLocateType.CURRENT);
} }


private void onError(Throwable error) { private void onError(Throwable error) {
Expand Down Expand Up @@ -251,29 +230,19 @@ private void updateNextStartRowWhenError(Result result) {
} }


private void completeWhenNoMoreResultsInRegion() { private void completeWhenNoMoreResultsInRegion() {
if (isEmptyStopRow(scan.getStopRow())) { if (noMoreResultsForScan(scan, loc.getRegionInfo())) {
if (isEmptyStopRow(loc.getRegionInfo().getEndKey())) { completeNoMoreResults();
completeNoMoreResults();
}
} else { } else {
if (Bytes.compareTo(loc.getRegionInfo().getEndKey(), scan.getStopRow()) >= 0) { completeWithNextStartRow(loc.getRegionInfo().getEndKey(), true);
completeNoMoreResults();
}
} }
completeWithNextStartRow(loc.getRegionInfo().getEndKey());
} }


private void completeReversedWhenNoMoreResultsInRegion() { private void completeReversedWhenNoMoreResultsInRegion() {
if (isEmptyStopRow(scan.getStopRow())) { if (noMoreResultsForReverseScan(scan, loc.getRegionInfo())) {
if (isEmptyStartRow(loc.getRegionInfo().getStartKey())) { completeNoMoreResults();
completeNoMoreResults();
}
} else { } else {
if (Bytes.compareTo(loc.getRegionInfo().getStartKey(), scan.getStopRow()) <= 0) { completeWithNextStartRow(loc.getRegionInfo().getStartKey(), false);
completeNoMoreResults();
}
} }
completeWithNextStartRow(loc.getRegionInfo().getStartKey());
} }


private void onComplete(ScanResponse resp) { private void onComplete(ScanResponse resp) {
Expand Down Expand Up @@ -343,9 +312,9 @@ private void next() {
} }


/** /**
* @return return locate direction for next open scanner call, or null if we should stop. * @return {@code true} if we should continue, otherwise {@code false}.
*/ */
public CompletableFuture<RegionLocateType> start() { public CompletableFuture<Boolean> start() {
next(); next();
return future; return future;
} }
Expand Down
Expand Up @@ -17,8 +17,9 @@
*/ */
package org.apache.hadoop.hbase.client; package org.apache.hadoop.hbase.client;


import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStartRow; import static org.apache.hadoop.hbase.client.ConnectionUtils.getLocateType;
import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStopRow; import static org.apache.hadoop.hbase.client.ConnectionUtils.noMoreResultsForReverseScan;
import static org.apache.hadoop.hbase.client.ConnectionUtils.noMoreResultsForScan;


import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
Expand All @@ -37,7 +38,6 @@
import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter; import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
import org.apache.hadoop.hbase.util.Bytes;


/** /**
* Retry caller for smaller scan. * Retry caller for smaller scan.
Expand All @@ -57,10 +57,6 @@ class AsyncSmallScanRpcRetryingCaller {


private final long rpcTimeoutNs; private final long rpcTimeoutNs;


private final Function<byte[], byte[]> createClosestNextRow;

private final Runnable firstScan;

private final Function<HRegionInfo, Boolean> nextScan; private final Function<HRegionInfo, Boolean> nextScan;


private final List<Result> resultList; private final List<Result> resultList;
Expand All @@ -76,12 +72,8 @@ public AsyncSmallScanRpcRetryingCaller(AsyncConnectionImpl conn, TableName table
this.scanTimeoutNs = scanTimeoutNs; this.scanTimeoutNs = scanTimeoutNs;
this.rpcTimeoutNs = rpcTimeoutNs; this.rpcTimeoutNs = rpcTimeoutNs;
if (scan.isReversed()) { if (scan.isReversed()) {
this.createClosestNextRow = ConnectionUtils::createClosestRowBefore;
this.firstScan = this::reversedFirstScan;
this.nextScan = this::reversedNextScan; this.nextScan = this::reversedNextScan;
} else { } else {
this.createClosestNextRow = ConnectionUtils::createClosestRowAfter;
this.firstScan = this::firstScan;
this.nextScan = this::nextScan; this.nextScan = this::nextScan;
} }
this.resultList = new ArrayList<>(); this.resultList = new ArrayList<>();
Expand Down Expand Up @@ -141,22 +133,21 @@ private void onComplete(SmallScanResponse resp) {
} }
if (resp.hasMoreResultsInRegion) { if (resp.hasMoreResultsInRegion) {
if (resp.results.length > 0) { if (resp.results.length > 0) {
scan.setStartRow( scan.withStartRow(resp.results[resp.results.length - 1].getRow(), false);
createClosestNextRow.apply(resp.results[resp.results.length - 1].getRow()));
} }
scan(RegionLocateType.CURRENT); scan();
return; return;
} }
if (!nextScan.apply(resp.currentRegion)) { if (!nextScan.apply(resp.currentRegion)) {
future.complete(resultList); future.complete(resultList);
} }
} }


private void scan(RegionLocateType locateType) { private void scan() {
conn.callerFactory.<SmallScanResponse> single().table(tableName).row(scan.getStartRow()) conn.callerFactory.<SmallScanResponse> single().table(tableName).row(scan.getStartRow())
.rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS) .locateType(getLocateType(scan)).rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
.operationTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).locateType(locateType) .operationTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).action(this::scan).call()
.action(this::scan).call().whenComplete((resp, error) -> { .whenComplete((resp, error) -> {
if (error != null) { if (error != null) {
future.completeExceptionally(error); future.completeExceptionally(error);
} else { } else {
Expand All @@ -166,45 +157,27 @@ conn.callerFactory.<SmallScanResponse> single().table(tableName).row(scan.getSta
} }


public CompletableFuture<List<Result>> call() { public CompletableFuture<List<Result>> call() {
firstScan.run(); scan();
return future; return future;
} }


private void firstScan() { private boolean nextScan(HRegionInfo info) {
scan(RegionLocateType.CURRENT); if (noMoreResultsForScan(scan, info)) {
} return false;

private void reversedFirstScan() {
scan(isEmptyStartRow(scan.getStartRow()) ? RegionLocateType.BEFORE : RegionLocateType.CURRENT);
}

private boolean nextScan(HRegionInfo region) {
if (isEmptyStopRow(scan.getStopRow())) {
if (isEmptyStopRow(region.getEndKey())) {
return false;
}
} else { } else {
if (Bytes.compareTo(region.getEndKey(), scan.getStopRow()) >= 0) { scan.withStartRow(info.getEndKey());
return false; scan();
} return true;
} }
scan.setStartRow(region.getEndKey());
scan(RegionLocateType.CURRENT);
return true;
} }


private boolean reversedNextScan(HRegionInfo region) { private boolean reversedNextScan(HRegionInfo info) {
if (isEmptyStopRow(scan.getStopRow())) { if (noMoreResultsForReverseScan(scan, info)) {
if (isEmptyStartRow(region.getStartKey())) { return false;
return false;
}
} else { } else {
if (Bytes.compareTo(region.getStartKey(), scan.getStopRow()) <= 0) { scan.withStartRow(info.getStartKey(), false);
return false; scan();
} return true;
} }
scan.setStartRow(region.getStartKey());
scan(RegionLocateType.BEFORE);
return true;
} }
} }

0 comments on commit 05b1d91

Please sign in to comment.