Skip to content

Commit

Permalink
HBASE-13335 Use serverHasMoreResults context in SmallScanner and Smal…
Browse files Browse the repository at this point in the history
…lReversedScanner.

Use the context passed back via ScanResponse that a RegionServer
fills in to denote whether or not more results existing in the
current Region. Add a simple factory to remove a static method
used across both SmallScanner and SmallReverseScanner. Add new
unit tests for both scanner classes to test scans with and
without the new context (as a quick backward-compatibility test).
  • Loading branch information
joshelser authored and enis committed Apr 1, 2015
1 parent 35ddea7 commit ffdcc00
Show file tree
Hide file tree
Showing 4 changed files with 943 additions and 128 deletions.
Expand Up @@ -31,9 +31,13 @@
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.ClientSmallScanner.SmallScannerCallable;
import org.apache.hadoop.hbase.client.ClientSmallScanner.SmallScannerCallableFactory;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.util.Bytes;

import com.google.common.annotations.VisibleForTesting;

/**
* Client scanner for small reversed scan. Generally, only one RPC is called to fetch the
* scan results, unless the results cross multiple regions or the row count of
Expand All @@ -46,33 +50,83 @@ public class ClientSmallReversedScanner extends ReversedClientScanner {
private static final Log LOG = LogFactory.getLog(ClientSmallReversedScanner.class);
private ScannerCallableWithReplicas smallScanCallable = null;
private byte[] skipRowOfFirstResult = null;
private SmallScannerCallableFactory callableFactory;

/**
* Create a new ReversibleClientScanner for the specified table Note that the
* passed {@link Scan}'s start row maybe changed changed.
* Create a new ReversibleClientScanner for the specified table. Take note that the passed
* {@link Scan} 's start row maybe changed changed.
*
* @param conf The {@link Configuration} to use.
* @param scan {@link Scan} to use in this scanner
* @param tableName The table that we wish to rangeGet
* @param connection Connection identifying the cluster
* @param conf
* The {@link Configuration} to use.
* @param scan
* {@link Scan} to use in this scanner
* @param tableName
* The table that we wish to rangeGet
* @param connection
* Connection identifying the cluster
* @param rpcFactory
* Factory used to create the {@link RpcRetryingCaller}
* @param controllerFactory
* Factory used to access RPC payloads
* @param pool
* Threadpool for RPC threads
* @param primaryOperationTimeout
* Call timeout
* @throws IOException
* If the remote call fails
*/
public ClientSmallReversedScanner(final Configuration conf, final Scan scan,
final TableName tableName, ClusterConnection connection,
RpcRetryingCallerFactory rpcFactory, RpcControllerFactory controllerFactory,
ExecutorService pool, int primaryOperationTimeout) throws IOException {
final TableName tableName, ClusterConnection connection, RpcRetryingCallerFactory rpcFactory,
RpcControllerFactory controllerFactory, ExecutorService pool, int primaryOperationTimeout)
throws IOException {
this(conf, scan, tableName, connection, rpcFactory, controllerFactory, pool,
primaryOperationTimeout, new SmallScannerCallableFactory());
}

/**
* Create a new ReversibleClientScanner for the specified table. Take note that the passed
* {@link Scan}'s start row may be changed.
*
* @param conf
* The {@link Configuration} to use.
* @param scan
* {@link Scan} to use in this scanner
* @param tableName
* The table that we wish to rangeGet
* @param connection
* Connection identifying the cluster
* @param rpcFactory
* Factory used to create the {@link RpcRetryingCaller}
* @param controllerFactory
* Factory used to access RPC payloads
* @param pool
* Threadpool for RPC threads
* @param primaryOperationTimeout
* Call timeout
* @param callableFactory
* Factory used to create the {@link SmallScannerCallable}
* @throws IOException
* If the remote call fails
*/
@VisibleForTesting
ClientSmallReversedScanner(final Configuration conf, final Scan scan, final TableName tableName,
ClusterConnection connection, RpcRetryingCallerFactory rpcFactory,
RpcControllerFactory controllerFactory, ExecutorService pool, int primaryOperationTimeout,
SmallScannerCallableFactory callableFactory) throws IOException {
super(conf, scan, tableName, connection, rpcFactory, controllerFactory, pool,
primaryOperationTimeout);
this.callableFactory = callableFactory;
}

/**
* Gets a scanner for following scan. Move to next region or continue from the
* last result or start from the start row.
* Gets a scanner for following scan. Move to next region or continue from the last result or
* start from the start row.
*
* @param nbRows
* @param done true if Server-side says we're done scanning.
* @param currentRegionDone true if scan is over on current region
* @param done
* true if Server-side says we're done scanning.
* @param currentRegionDone
* true if scan is over on current region
* @return true if has next scanner
* @throws IOException
*/
Expand Down Expand Up @@ -112,10 +166,9 @@ private boolean nextScanner(int nbRows, final boolean done,
+ Bytes.toStringBinary(localStartKey) + "'");
}

smallScanCallable = ClientSmallScanner.getSmallScanCallable(
getConnection(), getTable(), scan, getScanMetrics(), localStartKey, cacheNum,
rpcControllerFactory, getPool(), getPrimaryOperationTimeout(),
getRetries(), getScannerTimeout(), getConf(), caller);
smallScanCallable = callableFactory.getCallable(getConnection(), getTable(), scan,
getScanMetrics(), localStartKey, cacheNum, rpcControllerFactory, getPool(),
getPrimaryOperationTimeout(), getRetries(), getScannerTimeout(), getConf(), caller);

if (this.scanMetrics != null && skipRowOfFirstResult == null) {
this.scanMetrics.countOfRegions.incrementAndGet();
Expand All @@ -131,46 +184,7 @@ public Result next() throws IOException {
return null;
}
if (cache.size() == 0) {
Result[] values = null;
long remainingResultSize = maxScannerResultSize;
int countdown = this.caching;
boolean currentRegionDone = false;
// Values == null means server-side filter has determined we must STOP
while (remainingResultSize > 0 && countdown > 0
&& nextScanner(countdown, values == null, currentRegionDone)) {
// Server returns a null values if scanning is to stop. Else,
// returns an empty array if scanning is to go on and we've just
// exhausted current region.
// callWithoutRetries is at this layer. Within the ScannerCallableWithReplicas,
// we do a callWithRetries
// TODO use context from server
values = this.caller.callWithoutRetries(smallScanCallable, scannerTimeout);
this.currentRegion = smallScanCallable.getHRegionInfo();
long currentTime = System.currentTimeMillis();
if (this.scanMetrics != null) {
this.scanMetrics.sumOfMillisSecBetweenNexts.addAndGet(currentTime
- lastNext);
}
lastNext = currentTime;
if (values != null && values.length > 0) {
for (int i = 0; i < values.length; i++) {
Result rs = values[i];
if (i == 0 && this.skipRowOfFirstResult != null
&& Bytes.equals(skipRowOfFirstResult, rs.getRow())) {
// Skip the first result
continue;
}
cache.add(rs);
// We don't make Iterator here
for (Cell cell : rs.rawCells()) {
remainingResultSize -= CellUtil.estimatedHeapSizeOf(cell);
}
countdown--;
this.lastResult = rs;
}
}
currentRegionDone = countdown > 0;
}
loadCache();
}

if (cache.size() > 0) {
Expand All @@ -182,6 +196,52 @@ && nextScanner(countdown, values == null, currentRegionDone)) {
return null;
}

@Override
protected void loadCache() throws IOException {
Result[] values = null;
long remainingResultSize = maxScannerResultSize;
int countdown = this.caching;
boolean currentRegionDone = false;
// Values == null means server-side filter has determined we must STOP
while (remainingResultSize > 0 && countdown > 0
&& nextScanner(countdown, values == null, currentRegionDone)) {
// Server returns a null values if scanning is to stop. Else,
// returns an empty array if scanning is to go on and we've just
// exhausted current region.
// callWithoutRetries is at this layer. Within the ScannerCallableWithReplicas,
// we do a callWithRetries
values = this.caller.callWithoutRetries(smallScanCallable, scannerTimeout);
this.currentRegion = smallScanCallable.getHRegionInfo();
long currentTime = System.currentTimeMillis();
if (this.scanMetrics != null) {
this.scanMetrics.sumOfMillisSecBetweenNexts.addAndGet(currentTime
- lastNext);
}
lastNext = currentTime;
if (values != null && values.length > 0) {
for (int i = 0; i < values.length; i++) {
Result rs = values[i];
if (i == 0 && this.skipRowOfFirstResult != null
&& Bytes.equals(skipRowOfFirstResult, rs.getRow())) {
// Skip the first result
continue;
}
cache.add(rs);
// We don't make Iterator here
for (Cell cell : rs.rawCells()) {
remainingResultSize -= CellUtil.estimatedHeapSizeOf(cell);
}
countdown--;
this.lastResult = rs;
}
}
if (smallScanCallable.hasMoreResultsContext()) {
currentRegionDone = !smallScanCallable.getServerHasMoreResults();
} else {
currentRegionDone = countdown > 0;
}
}
}

@Override
protected void initializeScannerInConstruction() throws IOException {
Expand All @@ -195,4 +255,8 @@ public void close() {
closed = true;
}

@VisibleForTesting
protected void setScannerCallableFactory(SmallScannerCallableFactory callableFactory) {
this.callableFactory = callableFactory;
}
}

0 comments on commit ffdcc00

Please sign in to comment.