Skip to content

Commit

Permalink
HBASE-13421 Reduce the number of object creations introduced by HBASE-11544 in scan RPC hot code paths
Browse files Browse the repository at this point in the history

Signed-off-by: stack <stack@apache.org>
  • Loading branch information
Jonathan Lawlor authored and saintstack committed Apr 8, 2015
1 parent cbc53a0 commit 62d47e1
Show file tree
Hide file tree
Showing 56 changed files with 1,167 additions and 778 deletions.
Expand Up @@ -401,6 +401,9 @@ protected void loadCache() throws IOException {
// happens for the cases where we see exceptions. Since only openScanner // happens for the cases where we see exceptions. Since only openScanner
// would have happened, values would be null // would have happened, values would be null
if (values == null && callable.switchedToADifferentReplica()) { if (values == null && callable.switchedToADifferentReplica()) {
// Any accumulated partial results are no longer valid since the callable will
// openScanner with the correct startkey and we must pick up from there
clearPartialResults();
this.currentRegion = callable.getHRegionInfo(); this.currentRegion = callable.getHRegionInfo();
continue; continue;
} }
Expand Down
Expand Up @@ -292,21 +292,39 @@ private int addCallsForOtherReplicas(
continue; //this was already scheduled earlier continue; //this was already scheduled earlier
} }
ScannerCallable s = currentScannerCallable.getScannerCallableForReplica(id); ScannerCallable s = currentScannerCallable.getScannerCallableForReplica(id);

setStartRowForReplicaCallable(s);
if (this.lastResult != null) {
if(s.getScan().isReversed()){
s.getScan().setStartRow(createClosestRowBefore(this.lastResult.getRow()));
}else {
s.getScan().setStartRow(Bytes.add(this.lastResult.getRow(), new byte[1]));
}
}
outstandingCallables.add(s); outstandingCallables.add(s);
RetryingRPC retryingOnReplica = new RetryingRPC(s); RetryingRPC retryingOnReplica = new RetryingRPC(s);
cs.submit(retryingOnReplica, scannerTimeout, id); cs.submit(retryingOnReplica, scannerTimeout, id);
} }
return max - min + 1; return max - min + 1;
} }


/**
 * Adjusts the start row of the given replica callable's scan so that the replica resumes
 * scanning from the correct position, derived from the last result this scanner has seen.
 * Does nothing when there is no previous result or no callable to configure.
 * @param callable The callable to set the start row on
 */
private void setStartRowForReplicaCallable(ScannerCallable callable) {
  if (callable == null || this.lastResult == null) {
    return;
  }

  byte[] lastRow = this.lastResult.getRow();
  if (!this.lastResult.isPartial()) {
    // The previous result carried every cell of its row, so nothing more is needed from
    // that row. Advance the start row to the nearest row that could follow it in the
    // scan's direction of travel.
    if (callable.getScan().isReversed()) {
      callable.getScan().setStartRow(createClosestRowBefore(lastRow));
    } else {
      callable.getScan().setStartRow(Bytes.add(lastRow, new byte[1]));
    }
  } else {
    // The previous result was partial: not all cells of that row have arrived yet, so the
    // replica must re-read the same row. On a replica switch the scanner clears any
    // accumulated partial results, so restarting at this row is safe.
    callable.getScan().setStartRow(lastRow);
  }
}

@VisibleForTesting @VisibleForTesting
boolean isAnyRPCcancelled() { boolean isAnyRPCcancelled() {
return someRPCcancelled; return someRPCcancelled;
Expand Down
Expand Up @@ -46,7 +46,6 @@
import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter; import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.ResponseConverter; import org.apache.hadoop.hbase.protobuf.ResponseConverter;
import org.apache.hadoop.hbase.regionserver.InternalScanner.NextState;
import org.apache.hadoop.hbase.regionserver.OperationStatus; import org.apache.hadoop.hbase.regionserver.OperationStatus;
import org.apache.hadoop.hbase.regionserver.Region; import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionScanner; import org.apache.hadoop.hbase.regionserver.RegionScanner;
Expand Down Expand Up @@ -137,7 +136,7 @@ public void delete(RpcController controller, BulkDeleteRequest request,
List<List<Cell>> deleteRows = new ArrayList<List<Cell>>(rowBatchSize); List<List<Cell>> deleteRows = new ArrayList<List<Cell>>(rowBatchSize);
for (int i = 0; i < rowBatchSize; i++) { for (int i = 0; i < rowBatchSize; i++) {
List<Cell> results = new ArrayList<Cell>(); List<Cell> results = new ArrayList<Cell>();
hasMore = NextState.hasMoreValues(scanner.next(results)); hasMore = scanner.next(results);
if (results.size() > 0) { if (results.size() > 0) {
deleteRows.add(results); deleteRows.add(results);
} }
Expand Down
Expand Up @@ -34,7 +34,6 @@
import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter; import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
import org.apache.hadoop.hbase.protobuf.ResponseConverter; import org.apache.hadoop.hbase.protobuf.ResponseConverter;
import org.apache.hadoop.hbase.regionserver.InternalScanner; import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.InternalScanner.NextState;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;


import com.google.protobuf.RpcCallback; import com.google.protobuf.RpcCallback;
Expand Down Expand Up @@ -81,7 +80,7 @@ public void getRowCount(RpcController controller, ExampleProtos.CountRequest req
byte[] lastRow = null; byte[] lastRow = null;
long count = 0; long count = 0;
do { do {
hasMore = NextState.hasMoreValues(scanner.next(results)); hasMore = scanner.next(results);
for (Cell kv : results) { for (Cell kv : results) {
byte[] currentRow = CellUtil.cloneRow(kv); byte[] currentRow = CellUtil.cloneRow(kv);
if (lastRow == null || !Bytes.equals(lastRow, currentRow)) { if (lastRow == null || !Bytes.equals(lastRow, currentRow)) {
Expand Down Expand Up @@ -120,7 +119,7 @@ public void getKeyValueCount(RpcController controller, ExampleProtos.CountReques
boolean hasMore = false; boolean hasMore = false;
long count = 0; long count = 0;
do { do {
hasMore = NextState.hasMoreValues(scanner.next(results)); hasMore = scanner.next(results);
for (Cell kv : results) { for (Cell kv : results) {
count++; count++;
} }
Expand Down
Expand Up @@ -22,16 +22,17 @@
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;


import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics; import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.NoLimitScannerContext;
import org.apache.hadoop.hbase.regionserver.RegionScanner; import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.mortbay.log.Log; import org.mortbay.log.Log;


Expand Down Expand Up @@ -72,10 +73,7 @@ public ClientSideRegionScanner(Configuration conf, FileSystem fs,
public Result next() throws IOException { public Result next() throws IOException {
values.clear(); values.clear();


// negative values indicate no limits scanner.nextRaw(values, NoLimitScannerContext.getInstance());
final long remainingResultSize = -1;
final int batchLimit = -1;
scanner.nextRaw(values, batchLimit, remainingResultSize);
if (values.isEmpty()) { if (values.isEmpty()) {
//we are done //we are done
return null; return null;
Expand Down
Expand Up @@ -38,7 +38,6 @@
import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateResponse; import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateResponse;
import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateService; import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateService;
import org.apache.hadoop.hbase.regionserver.InternalScanner; import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.InternalScanner.NextState;


import com.google.protobuf.ByteString; import com.google.protobuf.ByteString;
import com.google.protobuf.Message; import com.google.protobuf.Message;
Expand Down Expand Up @@ -92,7 +91,7 @@ public void getMax(RpcController controller, AggregateRequest request,
// qualifier can be null. // qualifier can be null.
boolean hasMoreRows = false; boolean hasMoreRows = false;
do { do {
hasMoreRows = NextState.hasMoreValues(scanner.next(results)); hasMoreRows = scanner.next(results);
int listSize = results.size(); int listSize = results.size();
for (int i = 0; i < listSize; i++) { for (int i = 0; i < listSize; i++) {
temp = ci.getValue(colFamily, qualifier, results.get(i)); temp = ci.getValue(colFamily, qualifier, results.get(i));
Expand Down Expand Up @@ -146,7 +145,7 @@ public void getMin(RpcController controller, AggregateRequest request,
} }
boolean hasMoreRows = false; boolean hasMoreRows = false;
do { do {
hasMoreRows = NextState.hasMoreValues(scanner.next(results)); hasMoreRows = scanner.next(results);
int listSize = results.size(); int listSize = results.size();
for (int i = 0; i < listSize; i++) { for (int i = 0; i < listSize; i++) {
temp = ci.getValue(colFamily, qualifier, results.get(i)); temp = ci.getValue(colFamily, qualifier, results.get(i));
Expand Down Expand Up @@ -200,7 +199,7 @@ public void getSum(RpcController controller, AggregateRequest request,
List<Cell> results = new ArrayList<Cell>(); List<Cell> results = new ArrayList<Cell>();
boolean hasMoreRows = false; boolean hasMoreRows = false;
do { do {
hasMoreRows = NextState.hasMoreValues(scanner.next(results)); hasMoreRows = scanner.next(results);
int listSize = results.size(); int listSize = results.size();
for (int i = 0; i < listSize; i++) { for (int i = 0; i < listSize; i++) {
temp = ci.getValue(colFamily, qualifier, results.get(i)); temp = ci.getValue(colFamily, qualifier, results.get(i));
Expand Down Expand Up @@ -254,7 +253,7 @@ public void getRowNum(RpcController controller, AggregateRequest request,
scanner = env.getRegion().getScanner(scan); scanner = env.getRegion().getScanner(scan);
boolean hasMoreRows = false; boolean hasMoreRows = false;
do { do {
hasMoreRows = NextState.hasMoreValues(scanner.next(results)); hasMoreRows = scanner.next(results);
if (results.size() > 0) { if (results.size() > 0) {
counter++; counter++;
} }
Expand Down Expand Up @@ -313,7 +312,7 @@ public void getAvg(RpcController controller, AggregateRequest request,


do { do {
results.clear(); results.clear();
hasMoreRows = NextState.hasMoreValues(scanner.next(results)); hasMoreRows = scanner.next(results);
int listSize = results.size(); int listSize = results.size();
for (int i = 0; i < listSize; i++) { for (int i = 0; i < listSize; i++) {
sumVal = ci.add(sumVal, ci.castToReturnType(ci.getValue(colFamily, sumVal = ci.add(sumVal, ci.castToReturnType(ci.getValue(colFamily,
Expand Down Expand Up @@ -374,7 +373,7 @@ public void getStd(RpcController controller, AggregateRequest request,


do { do {
tempVal = null; tempVal = null;
hasMoreRows = NextState.hasMoreValues(scanner.next(results)); hasMoreRows = scanner.next(results);
int listSize = results.size(); int listSize = results.size();
for (int i = 0; i < listSize; i++) { for (int i = 0; i < listSize; i++) {
tempVal = ci.add(tempVal, ci.castToReturnType(ci.getValue(colFamily, tempVal = ci.add(tempVal, ci.castToReturnType(ci.getValue(colFamily,
Expand Down Expand Up @@ -441,7 +440,7 @@ public void getMedian(RpcController controller, AggregateRequest request,
do { do {
tempVal = null; tempVal = null;
tempWeight = null; tempWeight = null;
hasMoreRows = NextState.hasMoreValues(scanner.next(results)); hasMoreRows = scanner.next(results);
int listSize = results.size(); int listSize = results.size();
for (int i = 0; i < listSize; i++) { for (int i = 0; i < listSize; i++) {
Cell kv = results.get(i); Cell kv = results.get(i);
Expand Down

0 comments on commit 62d47e1

Please sign in to comment.