Skip to content

Commit

Permalink
HBASE-11544: [Ergonomics] hbase.client.scanner.caching is dogged and …
Browse files Browse the repository at this point in the history
…will try to return batch even if it means OOME

Signed-off-by: stack <stack@apache.org>
  • Loading branch information
Jonathan Lawlor authored and saintstack committed Mar 5, 2015
1 parent 39425a8 commit de9791e
Show file tree
Hide file tree
Showing 65 changed files with 2,725 additions and 539 deletions.

Large diffs are not rendered by default.

100 changes: 92 additions & 8 deletions hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java
Expand Up @@ -19,6 +19,7 @@

package org.apache.hadoop.hbase.client;

import java.io.IOException;
import java.nio.BufferOverflowException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
Expand All @@ -33,6 +34,7 @@
import org.apache.hadoop.hbase.CellScannable;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
Expand Down Expand Up @@ -80,6 +82,17 @@ public class Result implements CellScannable, CellScanner {
private Cell[] cells;
private Boolean exists; // if the query was just to check existence.
private boolean stale = false;

/**
* Partial results do not contain the full row's worth of cells. The result had to be returned in
* parts because the size of the cells in the row exceeded the RPC result size on the server.
* Partial results must be combined client side with results representing the remainder of the
* row's cells to form the complete result. Partial results and RPC result size allow us to avoid
* OOME on the server when servicing requests for large rows. The Scan configuration used to
* control the result size on the server is {@link Scan#setMaxResultSize(long)} and the default
* value can be seen here: {@link HConstants#DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE}
*/
private boolean partial = false;
// We're not using java serialization. Transient here is just a marker to say
// that this is where we cache row if we're ever asked for it.
private transient byte [] row = null;
Expand Down Expand Up @@ -115,18 +128,22 @@ public Result() {
* @param cells List of cells
*/
public static Result create(List<Cell> cells) {
  // Delegate to the two-arg factory: no existence flag, not stale, not partial.
  return create(cells, null);
}

/**
 * Instantiate a Result backed by the supplied cells, optionally carrying an existence flag.
 * @param cells List of cells
 * @param exists non-null when the query was an existence check only; null otherwise
 */
public static Result create(List<Cell> cells, Boolean exists) {
  // A freshly-created Result is never stale by default.
  boolean stale = false;
  return create(cells, exists, stale);
}

/**
 * Instantiate a Result backed by the supplied cells, with existence and staleness flags.
 * @param cells List of cells
 * @param exists non-null when the query was an existence check only; null otherwise
 * @param stale whether the result comes from a possibly-stale (replica) read
 */
public static Result create(List<Cell> cells, Boolean exists, boolean stale) {
  // A freshly-created Result is not a partial result by default.
  boolean partial = false;
  return create(cells, exists, stale, partial);
}

/**
 * Instantiate a Result backed by the supplied cells.
 * @param cells List of cells
 * @param exists non-null when the query was a pure existence check; when set, cells are dropped
 * @param stale whether the result comes from a possibly-stale (replica) read
 * @param partial whether this Result holds only part of a row's cells (see {@link #isPartial()})
 */
public static Result create(List<Cell> cells, Boolean exists, boolean stale, boolean partial) {
  if (exists != null) {
    // Existence-only result: the cell payload is irrelevant and intentionally omitted.
    return new Result(null, exists, stale, partial);
  }
  return new Result(cells.toArray(new Cell[cells.size()]), null, stale, partial);
}

/**
Expand All @@ -135,21 +152,26 @@ public static Result create(List<Cell> cells, Boolean exists, boolean stale) {
* @param cells array of cells
*/
public static Result create(Cell[] cells) {
  // Delegate to the three-arg factory: no existence flag, not stale (partial defaults to false).
  return create(cells, null, false);
}

/**
 * Instantiate a Result backed by the supplied cell array, with existence and staleness flags.
 * @param cells array of cells
 * @param exists non-null when the query was an existence check only; null otherwise
 * @param stale whether the result comes from a possibly-stale (replica) read
 */
public static Result create(Cell[] cells, Boolean exists, boolean stale) {
  // A freshly-created Result is not a partial result by default.
  boolean partial = false;
  return create(cells, exists, stale, partial);
}

/**
 * Instantiate a Result backed by the supplied cell array.
 * @param cells array of cells
 * @param exists non-null when the query was a pure existence check; when set, cells are dropped
 * @param stale whether the result comes from a possibly-stale (replica) read
 * @param partial whether this Result holds only part of a row's cells (see {@link #isPartial()})
 */
public static Result create(Cell[] cells, Boolean exists, boolean stale, boolean partial) {
  if (exists != null) {
    // Existence-only result: the cell payload is irrelevant and intentionally omitted.
    return new Result(null, exists, stale, partial);
  }
  return new Result(cells, null, stale, partial);
}

/** Private ctor. Use {@link #create(Cell[])}. */
private Result(Cell[] cells, Boolean exists, boolean stale, boolean partial) {
  this.cells = cells;
  this.exists = exists;
  this.stale = stale;
  this.partial = partial;
}

/**
Expand Down Expand Up @@ -746,7 +768,59 @@ public static void compareResults(Result res1, Result res2)
}

/**
 * Forms a single result from the partial results in the partialResults list. This method is
 * useful for reconstructing partial results on the client side.
 * @param partialResults list of partial results
 * @return The complete result that is formed by combining all of the partial results together
 * @throws IOException A complete result cannot be formed because the results in the partial list
 *           come from different rows
 */
public static Result createCompleteResult(List<Result> partialResults)
    throws IOException {
  List<Cell> cells = new ArrayList<Cell>();
  boolean stale = false;
  byte[] prevRow = null;
  byte[] currentRow = null;

  if (partialResults != null && !partialResults.isEmpty()) {
    for (int i = 0; i < partialResults.size(); i++) {
      Result r = partialResults.get(i);
      currentRow = r.getRow();
      // Every partial must belong to the same row; a mismatch means the caller mixed rows.
      if (prevRow != null && !Bytes.equals(prevRow, currentRow)) {
        throw new IOException(
            "Cannot form complete result. Rows of partial results do not match." +
            " Partial Results: " + partialResults);
      }

      // Ensure that all Results except the last one are marked as partials. The last result
      // may not be marked as a partial because Results are only marked as partials when
      // the scan on the server side must be stopped due to reaching the maxResultSize.
      // Visualizing it makes it easier to understand:
      // maxResultSize: 2 cells
      // (-x-) represents cell number x in a row
      // Example: row1: -1- -2- -3- -4- -5- (5 cells total)
      // How row1 will be returned by the server as partial Results:
      // Result1: -1- -2- (2 cells, size limit reached, mark as partial)
      // Result2: -3- -4- (2 cells, size limit reached, mark as partial)
      // Result3: -5- (1 cell, size limit NOT reached, NOT marked as partial)
      if (i != (partialResults.size() - 1) && !r.isPartial()) {
        throw new IOException(
            "Cannot form complete result. Result is missing partial flag. " +
            "Partial Results: " + partialResults);
      }
      prevRow = currentRow;
      // The combined result is stale if any contributing partial was stale.
      stale = stale || r.isStale();
      for (Cell c : r.rawCells()) {
        cells.add(c);
      }
    }
  }

  // An empty/null input yields an empty, non-stale, non-partial Result.
  return Result.create(cells, null, stale);
}

/**
* Get total size of raw cells
* @param result
* @return Total size.
*/
Expand Down Expand Up @@ -804,6 +878,16 @@ public boolean isStale() {
return stale;
}

/**
 * Returns whether this Result is a partial result. A partial result holds only a subset of a
 * row's cells; callers must merge it with the Result(s) carrying the remainder of that row's
 * cells to obtain the complete (non-partial) result.
 * @return true if this result covers only part of a row's cells
 */
public boolean isPartial() {
  return this.partial;
}

/**
* Add load information about the region to the information about the result
* @param loadStats statistics about the current region from which this was returned
Expand Down
Expand Up @@ -19,7 +19,6 @@
package org.apache.hadoop.hbase.client;

import java.io.IOException;
import java.util.Arrays;
import java.util.concurrent.ExecutorService;

import org.apache.commons.logging.Log;
Expand All @@ -38,9 +37,6 @@
@InterfaceAudience.Private
public class ReversedClientScanner extends ClientScanner {
private static final Log LOG = LogFactory.getLog(ReversedClientScanner.class);
// A byte array in which all elements are the max byte, and it is used to
// construct closest front row
static byte[] MAX_BYTE_ARRAY = Bytes.createMaxByteArray(9);

/**
* Create a new ReversibleClientScanner for the specified table Note that the
Expand Down Expand Up @@ -139,9 +135,10 @@ protected ScannerCallableWithReplicas getScannerCallable(byte[] localStartKey,
new ReversedScannerCallable(getConnection(), getTable(), scan, this.scanMetrics,
locateStartRow, this.rpcControllerFactory);
s.setCaching(nbRows);
ScannerCallableWithReplicas sr = new ScannerCallableWithReplicas(getTable(), getConnection(),
s, pool, primaryOperationTimeout, scan,
getRetries(), getScannerTimeout(), caching, getConf(), caller);
ScannerCallableWithReplicas sr =
new ScannerCallableWithReplicas(getTable(), getConnection(), s, pool,
primaryOperationTimeout, scan, getRetries(), getScannerTimeout(), caching, getConf(),
caller);
return sr;
}

Expand All @@ -161,26 +158,4 @@ protected boolean checkScanStopRow(final byte[] startKey) {
}
return false; // unlikely.
}

/**
 * Create the closest row before the specified row
 * @param row the row to compute the closest preceding row for; must not be null
 * @return a new byte array which is the closest front row of the specified one
 */
protected static byte[] createClosestRowBefore(byte[] row) {
  if (row == null) {
    throw new IllegalArgumentException("The passed row is empty");
  }
  if (Bytes.equals(row, HConstants.EMPTY_BYTE_ARRAY)) {
    // Nothing sorts before the empty (first possible) row; return the all-max-byte sentinel.
    return MAX_BYTE_ARRAY;
  }
  if (row[row.length - 1] == 0) {
    // Row ends in 0x00: dropping the trailing zero yields the immediately preceding row.
    return Arrays.copyOf(row, row.length - 1);
  } else {
    // Otherwise decrement the last byte and append max bytes so the result sorts
    // just before the original row in lexicographic byte order.
    byte[] closestFrontRow = Arrays.copyOf(row, row.length);
    closestFrontRow[row.length - 1] = (byte) ((closestFrontRow[row.length - 1] & 0xff) - 1);
    closestFrontRow = Bytes.add(closestFrontRow, MAX_BYTE_ARRAY);
    return closestFrontRow;
  }
}
}
Expand Up @@ -98,6 +98,19 @@ public class Scan extends Query {
private int maxVersions = 1;
private int batch = -1;

/**
 * Partial {@link Result}s are {@link Result}s that must be combined to form a complete {@link Result}.
* The {@link Result}s had to be returned in fragments (i.e. as partials) because the size of the
* cells in the row exceeded max result size on the server. Typically partial results will be
* combined client side into complete results before being delivered to the caller. However, if
* this flag is set, the caller is indicating that they do not mind seeing partial results (i.e.
* they understand that the results returned from the Scanner may only represent part of a
* particular row). In such a case, any attempt to combine the partials into a complete result on
* the client side will be skipped, and the caller will be able to see the exact results returned
* from the server.
*/
private boolean allowPartialResults = false;

private int storeLimit = -1;
private int storeOffset = 0;
private boolean getScan;
Expand Down Expand Up @@ -674,6 +687,27 @@ public boolean isReversed() {
return reversed;
}

/**
 * Setting whether the caller wants to see the partial results that may be returned from the
 * server. By default this value is false and the complete results will be assembled client side
 * before being delivered to the caller.
 * @param allowPartialResults true if the caller is willing to receive partial Results, i.e.
 *          Results that contain only a subset of a row's cells; false to have complete rows
 *          assembled on the client before delivery
 * @return this
 */
public Scan setAllowPartialResults(final boolean allowPartialResults) {
  this.allowPartialResults = allowPartialResults;
  return this;
}

/**
 * @return true when the caller of this scan accepts that the Results it sees may represent only
 *         a partial portion of a row; the remainder of the row is retrieved via subsequent
 *         calls to {@link ResultScanner#next()}
 */
public boolean getAllowPartialResults() {
  return allowPartialResults;
}

/**
* Set the value indicating whether loading CFs on demand should be allowed (cluster
* default is false). On-demand CF loading doesn't load column families until necessary, e.g.
Expand Down
Expand Up @@ -18,6 +18,8 @@

package org.apache.hadoop.hbase.client;

import static org.apache.hadoop.hbase.client.ClientScanner.createClosestRowBefore;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
Expand All @@ -44,8 +46,6 @@

import com.google.common.annotations.VisibleForTesting;

import static org.apache.hadoop.hbase.client.ReversedClientScanner.createClosestRowBefore;

/**
* This class has the logic for handling scanners for regions with and without replicas.
* 1. A scan is attempted on the default (primary) region
Expand Down
Expand Up @@ -37,7 +37,6 @@
import java.util.NavigableSet;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
Expand All @@ -53,6 +52,7 @@
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.Consistency;
import org.apache.hadoop.hbase.client.Delete;
Expand All @@ -68,7 +68,6 @@
import org.apache.hadoop.hbase.filter.ByteArrayComparable;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.protobuf.ProtobufMagic;
import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos;
import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos.AccessControlService;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
Expand Down Expand Up @@ -1285,6 +1284,7 @@ public static ClientProtos.Result toResult(final Result result) {
}

builder.setStale(result.isStale());
builder.setPartial(result.isPartial());

return builder.build();
}
Expand Down Expand Up @@ -1343,7 +1343,7 @@ public static Result toResult(final ClientProtos.Result proto) {
for (CellProtos.Cell c : values) {
cells.add(toCell(c));
}
return Result.create(cells, null, proto.getStale());
return Result.create(cells, null, proto.getStale(), proto.getPartial());
}

/**
Expand Down
Expand Up @@ -486,6 +486,7 @@ public static ScanRequest buildScanRequest(final byte[] regionName,
builder.setCloseScanner(closeScanner);
builder.setRegion(region);
builder.setScan(ProtobufUtil.toScan(scan));
builder.setClientHandlesPartials(true);
return builder.build();
}

Expand All @@ -503,6 +504,7 @@ public static ScanRequest buildScanRequest(final long scannerId,
builder.setNumberOfRows(numberOfRows);
builder.setCloseScanner(closeScanner);
builder.setScannerId(scannerId);
builder.setClientHandlesPartials(true);
return builder.build();
}

Expand All @@ -522,6 +524,7 @@ public static ScanRequest buildScanRequest(final long scannerId, final int numbe
builder.setCloseScanner(closeScanner);
builder.setScannerId(scannerId);
builder.setNextCallSeq(nextCallSeq);
builder.setClientHandlesPartials(true);
return builder.build();
}

Expand Down
Expand Up @@ -339,6 +339,9 @@ public static Result[] getResults(CellScanner cellScanner, ScanResponse response
// Cells are out in cellblocks. Group them up again as Results. How many to read at a
// time will be found in getCellsLength -- length here is how many Cells in the i'th Result
int noOfCells = response.getCellsPerResult(i);
boolean isPartial =
response.getPartialFlagPerResultCount() > i ?
response.getPartialFlagPerResult(i) : false;
List<Cell> cells = new ArrayList<Cell>(noOfCells);
for (int j = 0; j < noOfCells; j++) {
try {
Expand All @@ -361,7 +364,7 @@ public static Result[] getResults(CellScanner cellScanner, ScanResponse response
}
cells.add(cellScanner.current());
}
results[i] = Result.create(cells, null, response.getStale());
results[i] = Result.create(cells, null, response.getStale(), isPartial);
} else {
// Result is pure pb.
results[i] = ProtobufUtil.toResult(response.getResults(i));
Expand Down
Expand Up @@ -698,7 +698,7 @@ public static enum Modify {
/**
* Default value for {@link #HBASE_CLIENT_SCANNER_CACHING}
*/
public static final int DEFAULT_HBASE_CLIENT_SCANNER_CACHING = 100;
public static final int DEFAULT_HBASE_CLIENT_SCANNER_CACHING = Integer.MAX_VALUE;

/**
* Parameter name for number of rows that will be fetched when calling next on
Expand Down

0 comments on commit de9791e

Please sign in to comment.