diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallReversedScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallReversedScanner.java index dcf16774fa82..ac0837ed6f46 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallReversedScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallReversedScanner.java @@ -31,9 +31,13 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.ClientSmallScanner.SmallScannerCallable; +import org.apache.hadoop.hbase.client.ClientSmallScanner.SmallScannerCallableFactory; import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.util.Bytes; +import com.google.common.annotations.VisibleForTesting; + /** * Client scanner for small reversed scan. Generally, only one RPC is called to fetch the * scan results, unless the results cross multiple regions or the row count of @@ -46,33 +50,83 @@ public class ClientSmallReversedScanner extends ReversedClientScanner { private static final Log LOG = LogFactory.getLog(ClientSmallReversedScanner.class); private ScannerCallableWithReplicas smallScanCallable = null; private byte[] skipRowOfFirstResult = null; + private SmallScannerCallableFactory callableFactory; /** - * Create a new ReversibleClientScanner for the specified table Note that the - * passed {@link Scan}'s start row maybe changed changed. + * Create a new ReversibleClientScanner for the specified table. Take note that the passed + * {@link Scan} 's start row maybe changed changed. * - * @param conf The {@link Configuration} to use. - * @param scan {@link Scan} to use in this scanner - * @param tableName The table that we wish to rangeGet - * @param connection Connection identifying the cluster + * @param conf + * The {@link Configuration} to use. + * @param scan + * {@link Scan} to use in this scanner + * @param tableName + * The table that we wish to rangeGet + * @param connection + * Connection identifying the cluster * @param rpcFactory + * Factory used to create the {@link RpcRetryingCaller} + * @param controllerFactory + * Factory used to access RPC payloads + * @param pool + * Threadpool for RPC threads + * @param primaryOperationTimeout + * Call timeout * @throws IOException + * If the remote call fails */ public ClientSmallReversedScanner(final Configuration conf, final Scan scan, - final TableName tableName, ClusterConnection connection, - RpcRetryingCallerFactory rpcFactory, RpcControllerFactory controllerFactory, - ExecutorService pool, int primaryOperationTimeout) throws IOException { + final TableName tableName, ClusterConnection connection, RpcRetryingCallerFactory rpcFactory, + RpcControllerFactory controllerFactory, ExecutorService pool, int primaryOperationTimeout) + throws IOException { + this(conf, scan, tableName, connection, rpcFactory, controllerFactory, pool, + primaryOperationTimeout, new SmallScannerCallableFactory()); + } + + /** + * Create a new ReversibleClientScanner for the specified table. Take note that the passed + * {@link Scan}'s start row may be changed. + * + * @param conf + * The {@link Configuration} to use. + * @param scan + * {@link Scan} to use in this scanner + * @param tableName + * The table that we wish to rangeGet + * @param connection + * Connection identifying the cluster + * @param rpcFactory + * Factory used to create the {@link RpcRetryingCaller} + * @param controllerFactory + * Factory used to access RPC payloads + * @param pool + * Threadpool for RPC threads + * @param primaryOperationTimeout + * Call timeout + * @param callableFactory + * Factory used to create the {@link SmallScannerCallable} + * @throws IOException + * If the remote call fails + */ + @VisibleForTesting + ClientSmallReversedScanner(final Configuration conf, final Scan scan, final TableName tableName, + ClusterConnection connection, RpcRetryingCallerFactory rpcFactory, + RpcControllerFactory controllerFactory, ExecutorService pool, int primaryOperationTimeout, + SmallScannerCallableFactory callableFactory) throws IOException { super(conf, scan, tableName, connection, rpcFactory, controllerFactory, pool, primaryOperationTimeout); + this.callableFactory = callableFactory; } /** - * Gets a scanner for following scan. Move to next region or continue from the - * last result or start from the start row. + * Gets a scanner for following scan. Move to next region or continue from the last result or + * start from the start row. * * @param nbRows - * @param done true if Server-side says we're done scanning. - * @param currentRegionDone true if scan is over on current region + * @param done + * true if Server-side says we're done scanning. + * @param currentRegionDone + * true if scan is over on current region * @return true if has next scanner * @throws IOException */ @@ -112,10 +166,9 @@ private boolean nextScanner(int nbRows, final boolean done, + Bytes.toStringBinary(localStartKey) + "'"); } - smallScanCallable = ClientSmallScanner.getSmallScanCallable( - getConnection(), getTable(), scan, getScanMetrics(), localStartKey, cacheNum, - rpcControllerFactory, getPool(), getPrimaryOperationTimeout(), - getRetries(), getScannerTimeout(), getConf(), caller); + smallScanCallable = callableFactory.getCallable(getConnection(), getTable(), scan, + getScanMetrics(), localStartKey, cacheNum, rpcControllerFactory, getPool(), + getPrimaryOperationTimeout(), getRetries(), getScannerTimeout(), getConf(), caller); if (this.scanMetrics != null && skipRowOfFirstResult == null) { this.scanMetrics.countOfRegions.incrementAndGet(); @@ -131,46 +184,7 @@ public Result next() throws IOException { return null; } if (cache.size() == 0) { - Result[] values = null; - long remainingResultSize = maxScannerResultSize; - int countdown = this.caching; - boolean currentRegionDone = false; - // Values == null means server-side filter has determined we must STOP - while (remainingResultSize > 0 && countdown > 0 - && nextScanner(countdown, values == null, currentRegionDone)) { - // Server returns a null values if scanning is to stop. Else, - // returns an empty array if scanning is to go on and we've just - // exhausted current region. - // callWithoutRetries is at this layer. Within the ScannerCallableWithReplicas, - // we do a callWithRetries - // TODO use context from server - values = this.caller.callWithoutRetries(smallScanCallable, scannerTimeout); - this.currentRegion = smallScanCallable.getHRegionInfo(); - long currentTime = System.currentTimeMillis(); - if (this.scanMetrics != null) { - this.scanMetrics.sumOfMillisSecBetweenNexts.addAndGet(currentTime - - lastNext); - } - lastNext = currentTime; - if (values != null && values.length > 0) { - for (int i = 0; i < values.length; i++) { - Result rs = values[i]; - if (i == 0 && this.skipRowOfFirstResult != null - && Bytes.equals(skipRowOfFirstResult, rs.getRow())) { - // Skip the first result - continue; - } - cache.add(rs); - // We don't make Iterator here - for (Cell cell : rs.rawCells()) { - remainingResultSize -= CellUtil.estimatedHeapSizeOf(cell); - } - countdown--; - this.lastResult = rs; - } - } - currentRegionDone = countdown > 0; - } + loadCache(); } if (cache.size() > 0) { @@ -182,6 +196,52 @@ && nextScanner(countdown, values == null, currentRegionDone)) { return null; } + @Override + protected void loadCache() throws IOException { + Result[] values = null; + long remainingResultSize = maxScannerResultSize; + int countdown = this.caching; + boolean currentRegionDone = false; + // Values == null means server-side filter has determined we must STOP + while (remainingResultSize > 0 && countdown > 0 + && nextScanner(countdown, values == null, currentRegionDone)) { + // Server returns a null values if scanning is to stop. Else, + // returns an empty array if scanning is to go on and we've just + // exhausted current region. + // callWithoutRetries is at this layer. Within the ScannerCallableWithReplicas, + // we do a callWithRetries + values = this.caller.callWithoutRetries(smallScanCallable, scannerTimeout); + this.currentRegion = smallScanCallable.getHRegionInfo(); + long currentTime = System.currentTimeMillis(); + if (this.scanMetrics != null) { + this.scanMetrics.sumOfMillisSecBetweenNexts.addAndGet(currentTime + - lastNext); + } + lastNext = currentTime; + if (values != null && values.length > 0) { + for (int i = 0; i < values.length; i++) { + Result rs = values[i]; + if (i == 0 && this.skipRowOfFirstResult != null + && Bytes.equals(skipRowOfFirstResult, rs.getRow())) { + // Skip the first result + continue; + } + cache.add(rs); + // We don't make Iterator here + for (Cell cell : rs.rawCells()) { + remainingResultSize -= CellUtil.estimatedHeapSizeOf(cell); + } + countdown--; + this.lastResult = rs; + } + } + if (smallScanCallable.hasMoreResultsContext()) { + currentRegionDone = !smallScanCallable.getServerHasMoreResults(); + } else { + currentRegionDone = countdown > 0; + } + } + } @Override protected void initializeScannerInConstruction() throws IOException { @@ -195,4 +255,8 @@ public void close() { closed = true; } + @VisibleForTesting + protected void setScannerCallableFactory(SmallScannerCallableFactory callableFactory) { + this.callableFactory = callableFactory; + } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java index 84762839561f..1c6be16c5fad 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java @@ -39,6 +39,7 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse; import org.apache.hadoop.hbase.util.Bytes; +import com.google.common.annotations.VisibleForTesting; import com.google.protobuf.ServiceException; /** @@ -55,26 +56,72 @@ public class ClientSmallScanner extends ClientScanner { // When fetching results from server, skip the first result if it has the same // row with this one private byte[] skipRowOfFirstResult = null; + private SmallScannerCallableFactory callableFactory; /** - * Create a new ShortClientScanner for the specified table Note that the - * passed {@link Scan}'s start row maybe changed changed. + * Create a new ShortClientScanner for the specified table. Take note that the passed {@link Scan} + * 's start row maybe changed changed. * - * @param conf The {@link Configuration} to use. - * @param scan {@link Scan} to use in this scanner - * @param tableName The table that we wish to rangeGet - * @param connection Connection identifying the cluster + * @param conf + * The {@link Configuration} to use. + * @param scan + * {@link Scan} to use in this scanner + * @param tableName + * The table that we wish to rangeGet + * @param connection + * Connection identifying the cluster * @param rpcFactory + * Factory used to create the {@link RpcRetryingCaller} + * @param controllerFactory + * Factory used to access RPC payloads * @param pool + * Threadpool for RPC threads * @param primaryOperationTimeout + * Call timeout * @throws IOException + * If the remote call fails */ - public ClientSmallScanner(final Configuration conf, final Scan scan, - final TableName tableName, ClusterConnection connection, - RpcRetryingCallerFactory rpcFactory, RpcControllerFactory controllerFactory, - ExecutorService pool, int primaryOperationTimeout) throws IOException { + public ClientSmallScanner(final Configuration conf, final Scan scan, final TableName tableName, + ClusterConnection connection, RpcRetryingCallerFactory rpcFactory, + RpcControllerFactory controllerFactory, ExecutorService pool, int primaryOperationTimeout) + throws IOException { + this(conf, scan, tableName, connection, rpcFactory, controllerFactory, pool, + primaryOperationTimeout, new SmallScannerCallableFactory()); + } + + /** + * Create a new ShortClientScanner for the specified table. Take note that the passed {@link Scan} + * 's start row maybe changed changed. Intended for unit tests to provide their own + * {@link SmallScannerCallableFactory} implementation/mock. + * + * @param conf + * The {@link Configuration} to use. + * @param scan + * {@link Scan} to use in this scanner + * @param tableName + * The table that we wish to rangeGet + * @param connection + * Connection identifying the cluster + * @param rpcFactory + * Factory used to create the {@link RpcRetryingCaller} + * @param controllerFactory + * Factory used to access RPC payloads + * @param pool + * Threadpool for RPC threads + * @param primaryOperationTimeout + * Call timeout + * @param callableFactory + * Factory used to create the {@link SmallScannerCallable} + * @throws IOException + */ + @VisibleForTesting + ClientSmallScanner(final Configuration conf, final Scan scan, final TableName tableName, + ClusterConnection connection, RpcRetryingCallerFactory rpcFactory, + RpcControllerFactory controllerFactory, ExecutorService pool, int primaryOperationTimeout, + SmallScannerCallableFactory callableFactory) throws IOException { super(conf, scan, tableName, connection, rpcFactory, controllerFactory, pool, - primaryOperationTimeout); + primaryOperationTimeout); + this.callableFactory = callableFactory; } @Override @@ -125,32 +172,15 @@ private boolean nextScanner(int nbRows, final boolean done, LOG.trace("Advancing internal small scanner to startKey at '" + Bytes.toStringBinary(localStartKey) + "'"); } - smallScanCallable = getSmallScanCallable( - getConnection(), getTable(), scan, getScanMetrics(), localStartKey, cacheNum, - rpcControllerFactory, getPool(), getPrimaryOperationTimeout(), - getRetries(), getScannerTimeout(), getConf(), caller); + smallScanCallable = callableFactory.getCallable(getConnection(), getTable(), scan, + getScanMetrics(), localStartKey, cacheNum, rpcControllerFactory, getPool(), + getPrimaryOperationTimeout(), getRetries(), getScannerTimeout(), getConf(), caller); if (this.scanMetrics != null && skipRowOfFirstResult == null) { this.scanMetrics.countOfRegions.incrementAndGet(); } return true; } - - static ScannerCallableWithReplicas getSmallScanCallable(ClusterConnection connection, - TableName table, Scan scan, ScanMetrics scanMetrics, byte[] localStartKey, - final int cacheNum, RpcControllerFactory controllerFactory, ExecutorService pool, - int primaryOperationTimeout, int retries, int scannerTimeout, Configuration conf, - RpcRetryingCaller caller) { - scan.setStartRow(localStartKey); - SmallScannerCallable s = new SmallScannerCallable( - connection, table, scan, scanMetrics, controllerFactory, cacheNum, 0); - ScannerCallableWithReplicas scannerCallableWithReplicas = - new ScannerCallableWithReplicas(table, connection, - s, pool, primaryOperationTimeout, scan, retries, - scannerTimeout, cacheNum, conf, caller); - return scannerCallableWithReplicas; - } - static class SmallScannerCallable extends ScannerCallable { public SmallScannerCallable( ClusterConnection connection, TableName table, Scan scan, @@ -202,46 +232,7 @@ public Result next() throws IOException { return null; } if (cache.size() == 0) { - Result[] values = null; - long remainingResultSize = maxScannerResultSize; - int countdown = this.caching; - boolean currentRegionDone = false; - // Values == null means server-side filter has determined we must STOP - while (remainingResultSize > 0 && countdown > 0 - && nextScanner(countdown, values == null, currentRegionDone)) { - // Server returns a null values if scanning is to stop. Else, - // returns an empty array if scanning is to go on and we've just - // exhausted current region. - // callWithoutRetries is at this layer. Within the ScannerCallableWithReplicas, - // we do a callWithRetries - // TODO Use the server's response about more results - values = this.caller.callWithoutRetries(smallScanCallable, scannerTimeout); - this.currentRegion = smallScanCallable.getHRegionInfo(); - long currentTime = System.currentTimeMillis(); - if (this.scanMetrics != null) { - this.scanMetrics.sumOfMillisSecBetweenNexts.addAndGet(currentTime - - lastNext); - } - lastNext = currentTime; - if (values != null && values.length > 0) { - for (int i = 0; i < values.length; i++) { - Result rs = values[i]; - if (i == 0 && this.skipRowOfFirstResult != null - && Bytes.equals(skipRowOfFirstResult, rs.getRow())) { - // Skip the first result - continue; - } - cache.add(rs); - // We don't make Iterator here - for (Cell cell : rs.rawCells()) { - remainingResultSize -= CellUtil.estimatedHeapSizeOf(cell); - } - countdown--; - this.lastResult = rs; - } - } - currentRegionDone = countdown > 0; - } + loadCache(); } if (cache.size() > 0) { @@ -254,8 +245,80 @@ && nextScanner(countdown, values == null, currentRegionDone)) { } @Override + protected void loadCache() throws IOException { + Result[] values = null; + long remainingResultSize = maxScannerResultSize; + int countdown = this.caching; + boolean currentRegionDone = false; + // Values == null means server-side filter has determined we must STOP + while (remainingResultSize > 0 && countdown > 0 + && nextScanner(countdown, values == null, currentRegionDone)) { + // Server returns a null values if scanning is to stop. Else, + // returns an empty array if scanning is to go on and we've just + // exhausted current region. + // callWithoutRetries is at this layer. Within the ScannerCallableWithReplicas, + // we do a callWithRetries + values = this.caller.callWithoutRetries(smallScanCallable, scannerTimeout); + this.currentRegion = smallScanCallable.getHRegionInfo(); + long currentTime = System.currentTimeMillis(); + if (this.scanMetrics != null) { + this.scanMetrics.sumOfMillisSecBetweenNexts.addAndGet(currentTime + - lastNext); + } + lastNext = currentTime; + if (values != null && values.length > 0) { + for (int i = 0; i < values.length; i++) { + Result rs = values[i]; + if (i == 0 && this.skipRowOfFirstResult != null + && Bytes.equals(skipRowOfFirstResult, rs.getRow())) { + // Skip the first result + continue; + } + cache.add(rs); + // We don't make Iterator here + for (Cell cell : rs.rawCells()) { + remainingResultSize -= CellUtil.estimatedHeapSizeOf(cell); + } + countdown--; + this.lastResult = rs; + } + } + if (smallScanCallable.hasMoreResultsContext()) { + // If the server has more results, the current region is not done + currentRegionDone = !smallScanCallable.getServerHasMoreResults(); + } else { + // not guaranteed to get the context in older versions, fall back to checking countdown + currentRegionDone = countdown > 0; + } + } + } + public void close() { if (!scanMetricsPublished) writeScanMetrics(); closed = true; } + + @VisibleForTesting + protected void setScannerCallableFactory(SmallScannerCallableFactory callableFactory) { + this.callableFactory = callableFactory; + } + + @InterfaceAudience.Private + protected static class SmallScannerCallableFactory { + + public ScannerCallableWithReplicas getCallable(ClusterConnection connection, TableName table, + Scan scan, ScanMetrics scanMetrics, byte[] localStartKey, int cacheNum, + RpcControllerFactory controllerFactory, ExecutorService pool, int primaryOperationTimeout, + int retries, int scannerTimeout, Configuration conf, RpcRetryingCaller caller) { + scan.setStartRow(localStartKey); + SmallScannerCallable s = new SmallScannerCallable( + connection, table, scan, scanMetrics, controllerFactory, cacheNum, 0); + ScannerCallableWithReplicas scannerCallableWithReplicas = + new ScannerCallableWithReplicas(table, connection, + s, pool, primaryOperationTimeout, scan, retries, + scannerTimeout, cacheNum, conf, caller); + return scannerCallableWithReplicas; + } + + } } diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientSmallReversedScanner.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientSmallReversedScanner.java new file mode 100644 index 000000000000..4611d08dfe14 --- /dev/null +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientSmallReversedScanner.java @@ -0,0 +1,349 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.KeyValue.Type; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.ClientSmallScanner.SmallScannerCallableFactory; +import org.apache.hadoop.hbase.client.metrics.ScanMetrics; +import org.apache.hadoop.hbase.ipc.RpcControllerFactory; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +/** + * Test the ClientSmallReversedScanner. + */ +@Category(SmallTests.class) +public class TestClientSmallReversedScanner { + + Scan scan; + ExecutorService pool; + Configuration conf; + + ClusterConnection clusterConn; + RpcRetryingCallerFactory rpcFactory; + RpcControllerFactory controllerFactory; + RpcRetryingCaller caller; + + @Before + @SuppressWarnings({"deprecation", "unchecked"}) + public void setup() throws IOException { + clusterConn = Mockito.mock(ClusterConnection.class); + rpcFactory = Mockito.mock(RpcRetryingCallerFactory.class); + controllerFactory = Mockito.mock(RpcControllerFactory.class); + pool = Executors.newSingleThreadExecutor(); + scan = new Scan(); + conf = new Configuration(); + Mockito.when(clusterConn.getConfiguration()).thenReturn(conf); + // Mock out the RpcCaller + caller = Mockito.mock(RpcRetryingCaller.class); + // Return the mock from the factory + Mockito.when(rpcFactory. newCaller()).thenReturn(caller); + } + + @After + public void teardown() { + if (null != pool) { + pool.shutdownNow(); + } + } + + /** + * Create a simple Answer which returns true the first time, and false every time after. + */ + private Answer createTrueThenFalseAnswer() { + return new Answer() { + boolean first = true; + + @Override + public Boolean answer(InvocationOnMock invocation) { + if (first) { + first = false; + return true; + } + return false; + } + }; + } + + private SmallScannerCallableFactory getFactory( + final ScannerCallableWithReplicas callableWithReplicas) { + return new SmallScannerCallableFactory() { + @Override + public ScannerCallableWithReplicas getCallable(ClusterConnection connection, TableName table, + Scan scan, ScanMetrics scanMetrics, byte[] localStartKey, int cacheNum, + RpcControllerFactory controllerFactory, ExecutorService pool, + int primaryOperationTimeout, int retries, int scannerTimeout, Configuration conf, + RpcRetryingCaller caller) { + return callableWithReplicas; + } + }; + } + + @Test + public void testContextPresent() throws Exception { + final KeyValue kv1 = new KeyValue("row1".getBytes(), "cf".getBytes(), "cq".getBytes(), 1, + Type.Maximum), kv2 = new KeyValue("row2".getBytes(), "cf".getBytes(), "cq".getBytes(), 1, + Type.Maximum), kv3 = new KeyValue("row3".getBytes(), "cf".getBytes(), "cq".getBytes(), 1, + Type.Maximum); + + ScannerCallableWithReplicas callableWithReplicas = Mockito + .mock(ScannerCallableWithReplicas.class); + + // Mock out the RpcCaller + @SuppressWarnings("unchecked") + RpcRetryingCaller caller = Mockito.mock(RpcRetryingCaller.class); + // Return the mock from the factory + Mockito.when(rpcFactory. newCaller()).thenReturn(caller); + + // Intentionally leave a "default" caching size in the Scan. No matter the value, we + // should continue based on the server context + + SmallScannerCallableFactory factory = getFactory(callableWithReplicas); + + try (ClientSmallReversedScanner csrs = new ClientSmallReversedScanner(conf, scan, + TableName.valueOf("table"), clusterConn, rpcFactory, controllerFactory, pool, + Integer.MAX_VALUE)) { + + csrs.setScannerCallableFactory(factory); + + // Return some data the first time, less the second, and none after that + Mockito.when(caller.callWithoutRetries(callableWithReplicas, csrs.getScannerTimeout())) + .thenAnswer(new Answer() { + int count = 0; + + @Override + public Result[] answer(InvocationOnMock invocation) { + Result[] results; + if (0 == count) { + results = new Result[] {Result.create(new Cell[] {kv3}), + Result.create(new Cell[] {kv2})}; + } else if (1 == count) { + results = new Result[] {Result.create(new Cell[] {kv1})}; + } else { + results = new Result[0]; + } + count++; + return results; + } + }); + + // Pass back the context always + Mockito.when(callableWithReplicas.hasMoreResultsContext()).thenReturn(true); + // Only have more results the first time + Mockito.when(callableWithReplicas.getServerHasMoreResults()).thenAnswer( + createTrueThenFalseAnswer()); + + // A mocked HRegionInfo so ClientSmallScanner#nextScanner(...) works right + HRegionInfo regionInfo = Mockito.mock(HRegionInfo.class); + Mockito.when(callableWithReplicas.getHRegionInfo()).thenReturn(regionInfo); + // Trigger the "no more data" branch for #nextScanner(...) + Mockito.when(regionInfo.getEndKey()).thenReturn(HConstants.EMPTY_BYTE_ARRAY); + + csrs.loadCache(); + + List results = csrs.cache; + Iterator iter = results.iterator(); + assertEquals(3, results.size()); + for (int i = 3; i >= 1 && iter.hasNext(); i--) { + Result result = iter.next(); + byte[] row = result.getRow(); + assertEquals("row" + i, new String(row, StandardCharsets.UTF_8)); + assertEquals(1, result.getMap().size()); + } + assertTrue(csrs.closed); + } + } + + @Test + public void testNoContextFewerRecords() throws Exception { + final KeyValue kv1 = new KeyValue("row1".getBytes(), "cf".getBytes(), "cq".getBytes(), 1, + Type.Maximum), kv2 = new KeyValue("row2".getBytes(), "cf".getBytes(), "cq".getBytes(), 1, + Type.Maximum), kv3 = new KeyValue("row3".getBytes(), "cf".getBytes(), "cq".getBytes(), 1, + Type.Maximum); + + ScannerCallableWithReplicas callableWithReplicas = Mockito + .mock(ScannerCallableWithReplicas.class); + + // While the server returns 2 records per batch, we expect more records. + scan.setCaching(2); + + SmallScannerCallableFactory factory = getFactory(callableWithReplicas); + + try (ClientSmallReversedScanner csrs = new ClientSmallReversedScanner(conf, scan, + TableName.valueOf("table"), clusterConn, rpcFactory, controllerFactory, pool, + Integer.MAX_VALUE)) { + + csrs.setScannerCallableFactory(factory); + + // Return some data the first time, less the second, and none after that + Mockito.when(caller.callWithoutRetries(callableWithReplicas, csrs.getScannerTimeout())) + .thenAnswer(new Answer() { + int count = 0; + + @Override + public Result[] answer(InvocationOnMock invocation) { + Result[] results; + if (0 == count) { + results = new Result[] {Result.create(new Cell[] {kv3}), + Result.create(new Cell[] {kv2})}; + } else if (1 == count) { + // Return fewer records than expected (2) + results = new Result[] {Result.create(new Cell[] {kv1})}; + } else { + throw new RuntimeException("Should not fetch a third batch from the server"); + } + count++; + return results; + } + }); + + // Server doesn't return the context + Mockito.when(callableWithReplicas.hasMoreResultsContext()).thenReturn(false); + // getServerHasMoreResults shouldn't be called when hasMoreResultsContext returns false + Mockito.when(callableWithReplicas.getServerHasMoreResults()) + .thenThrow(new RuntimeException("Should not be called")); + + // A mocked HRegionInfo so ClientSmallScanner#nextScanner(...) works right + HRegionInfo regionInfo = Mockito.mock(HRegionInfo.class); + Mockito.when(callableWithReplicas.getHRegionInfo()).thenReturn(regionInfo); + // Trigger the "no more data" branch for #nextScanner(...) + Mockito.when(regionInfo.getEndKey()).thenReturn(HConstants.EMPTY_BYTE_ARRAY); + + csrs.loadCache(); + + List results = csrs.cache; + Iterator iter = results.iterator(); + assertEquals(2, results.size()); + for (int i = 3; i >= 2 && iter.hasNext(); i--) { + Result result = iter.next(); + byte[] row = result.getRow(); + assertEquals("row" + i, new String(row, StandardCharsets.UTF_8)); + assertEquals(1, result.getMap().size()); + } + + // "consume" the Results + results.clear(); + + csrs.loadCache(); + + assertEquals(1, results.size()); + Result result = results.get(0); + assertEquals("row1", new String(result.getRow(), StandardCharsets.UTF_8)); + assertEquals(1, result.getMap().size()); + + assertTrue(csrs.closed); + } + } + + @Test + public void testNoContextNoRecords() throws Exception { + ScannerCallableWithReplicas callableWithReplicas = Mockito + .mock(ScannerCallableWithReplicas.class); + + // While the server return 2 records per RPC, we expect there to be more records. + scan.setCaching(2); + + SmallScannerCallableFactory factory = getFactory(callableWithReplicas); + + try (ClientSmallReversedScanner csrs = new ClientSmallReversedScanner(conf, scan, + TableName.valueOf("table"), clusterConn, rpcFactory, controllerFactory, pool, + Integer.MAX_VALUE)) { + + csrs.setScannerCallableFactory(factory); + + // Return some data the first time, less the second, and none after that + Mockito.when(caller.callWithoutRetries(callableWithReplicas, csrs.getScannerTimeout())) + .thenReturn(new Result[0]); + + // Server doesn't return the context + Mockito.when(callableWithReplicas.hasMoreResultsContext()).thenReturn(false); + // Only have more results the first time + Mockito.when(callableWithReplicas.getServerHasMoreResults()) + .thenThrow(new RuntimeException("Should not be called")); + + // A mocked HRegionInfo so ClientSmallScanner#nextScanner(...) works right + HRegionInfo regionInfo = Mockito.mock(HRegionInfo.class); + Mockito.when(callableWithReplicas.getHRegionInfo()).thenReturn(regionInfo); + // Trigger the "no more data" branch for #nextScanner(...) + Mockito.when(regionInfo.getEndKey()).thenReturn(HConstants.EMPTY_BYTE_ARRAY); + + csrs.loadCache(); + + assertEquals(0, csrs.cache.size()); + assertTrue(csrs.closed); + } + } + + @Test + public void testContextNoRecords() throws Exception { + ScannerCallableWithReplicas callableWithReplicas = Mockito + .mock(ScannerCallableWithReplicas.class); + + SmallScannerCallableFactory factory = getFactory(callableWithReplicas); + + try (ClientSmallReversedScanner csrs = new ClientSmallReversedScanner(conf, scan, + TableName.valueOf("table"), clusterConn, rpcFactory, controllerFactory, pool, + Integer.MAX_VALUE)) { + + csrs.setScannerCallableFactory(factory); + + // Return some data the first time, less the second, and none after that + Mockito.when(caller.callWithoutRetries(callableWithReplicas, csrs.getScannerTimeout())) + .thenReturn(new Result[0]); + + // Server doesn't return the context + Mockito.when(callableWithReplicas.hasMoreResultsContext()).thenReturn(true); + // Only have more results the first time + Mockito.when(callableWithReplicas.getServerHasMoreResults()) + .thenReturn(false); + + // A mocked HRegionInfo so ClientSmallScanner#nextScanner(...) works right + HRegionInfo regionInfo = Mockito.mock(HRegionInfo.class); + Mockito.when(callableWithReplicas.getHRegionInfo()).thenReturn(regionInfo); + // Trigger the "no more data" branch for #nextScanner(...) + Mockito.when(regionInfo.getEndKey()).thenReturn(HConstants.EMPTY_BYTE_ARRAY); + + csrs.loadCache(); + + assertEquals(0, csrs.cache.size()); + assertTrue(csrs.closed); + } + } +} diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientSmallScanner.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientSmallScanner.java new file mode 100644 index 000000000000..90bf4bbd3abe --- /dev/null +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientSmallScanner.java @@ -0,0 +1,339 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.KeyValue.Type; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.ClientSmallScanner.SmallScannerCallableFactory; +import org.apache.hadoop.hbase.client.metrics.ScanMetrics; +import org.apache.hadoop.hbase.ipc.RpcControllerFactory; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +/** + * Test the ClientSmallScanner. + */ +@Category(SmallTests.class) +public class TestClientSmallScanner { + + Scan scan; + ExecutorService pool; + Configuration conf; + + ClusterConnection clusterConn; + RpcRetryingCallerFactory rpcFactory; + RpcControllerFactory controllerFactory; + RpcRetryingCaller caller; + + @Before + @SuppressWarnings({"deprecation", "unchecked"}) + public void setup() throws IOException { + clusterConn = Mockito.mock(ClusterConnection.class); + rpcFactory = Mockito.mock(RpcRetryingCallerFactory.class); + controllerFactory = Mockito.mock(RpcControllerFactory.class); + pool = Executors.newSingleThreadExecutor(); + scan = new Scan(); + conf = new Configuration(); + Mockito.when(clusterConn.getConfiguration()).thenReturn(conf); + // Mock out the RpcCaller + caller = Mockito.mock(RpcRetryingCaller.class); + // Return the mock from the factory + Mockito.when(rpcFactory. newCaller()).thenReturn(caller); + } + + @After + public void teardown() { + if (null != pool) { + pool.shutdownNow(); + } + } + + /** + * Create a simple Answer which returns true the first time, and false every time after. + */ + private Answer createTrueThenFalseAnswer() { + return new Answer() { + boolean first = true; + + @Override + public Boolean answer(InvocationOnMock invocation) { + if (first) { + first = false; + return true; + } + return false; + } + }; + } + + private SmallScannerCallableFactory getFactory( + final ScannerCallableWithReplicas callableWithReplicas) { + return new SmallScannerCallableFactory() { + @Override + public ScannerCallableWithReplicas getCallable(ClusterConnection connection, TableName table, + Scan scan, ScanMetrics scanMetrics, byte[] localStartKey, int cacheNum, + RpcControllerFactory controllerFactory, ExecutorService pool, + int primaryOperationTimeout, int retries, int scannerTimeout, Configuration conf, + RpcRetryingCaller caller) { + return callableWithReplicas; + } + }; + } + + @Test + public void testContextPresent() throws Exception { + final KeyValue kv1 = new KeyValue("row1".getBytes(), "cf".getBytes(), "cq".getBytes(), 1, + Type.Maximum), kv2 = new KeyValue("row2".getBytes(), "cf".getBytes(), "cq".getBytes(), 1, + Type.Maximum), kv3 = new KeyValue("row3".getBytes(), "cf".getBytes(), "cq".getBytes(), 1, + Type.Maximum); + + ScannerCallableWithReplicas callableWithReplicas = Mockito + .mock(ScannerCallableWithReplicas.class); + + // Mock out the RpcCaller + @SuppressWarnings("unchecked") + RpcRetryingCaller caller = Mockito.mock(RpcRetryingCaller.class); + // Return the mock from the factory + Mockito.when(rpcFactory. newCaller()).thenReturn(caller); + + SmallScannerCallableFactory factory = getFactory(callableWithReplicas); + + // Intentionally leave a "default" caching size in the Scan. No matter the value, we + // should continue based on the server context + + try (ClientSmallScanner css = new ClientSmallScanner(conf, scan, TableName.valueOf("table"), + clusterConn, rpcFactory, controllerFactory, pool, Integer.MAX_VALUE)) { + + css.setScannerCallableFactory(factory); + + // Return some data the first time, less the second, and none after that + Mockito.when(caller.callWithoutRetries(callableWithReplicas, css.getScannerTimeout())) + .thenAnswer(new Answer() { + int count = 0; + + @Override + public Result[] answer(InvocationOnMock invocation) { + Result[] results; + if (0 == count) { + results = new Result[] {Result.create(new Cell[] {kv1}), + Result.create(new Cell[] {kv2})}; + } else if (1 == count) { + results = new Result[] {Result.create(new Cell[] {kv3})}; + } else { + results = new Result[0]; + } + count++; + return results; + } + }); + + // Pass back the context always + Mockito.when(callableWithReplicas.hasMoreResultsContext()).thenReturn(true); + // Only have more results the first time + Mockito.when(callableWithReplicas.getServerHasMoreResults()).thenAnswer( + createTrueThenFalseAnswer()); + + // A mocked HRegionInfo so ClientSmallScanner#nextScanner(...) works right + HRegionInfo regionInfo = Mockito.mock(HRegionInfo.class); + Mockito.when(callableWithReplicas.getHRegionInfo()).thenReturn(regionInfo); + // Trigger the "no more data" branch for #nextScanner(...) + Mockito.when(regionInfo.getEndKey()).thenReturn(HConstants.EMPTY_BYTE_ARRAY); + + css.loadCache(); + + List results = css.cache; + assertEquals(3, results.size()); + for (int i = 1; i <= 3; i++) { + Result result = results.get(i - 1); + byte[] row = result.getRow(); + assertEquals("row" + i, new String(row, StandardCharsets.UTF_8)); + assertEquals(1, result.getMap().size()); + } + + assertTrue(css.closed); + } + } + + @Test + public void testNoContextFewerRecords() throws Exception { + final KeyValue kv1 = new KeyValue("row1".getBytes(), "cf".getBytes(), "cq".getBytes(), 1, + Type.Maximum), kv2 = new KeyValue("row2".getBytes(), "cf".getBytes(), "cq".getBytes(), 1, + Type.Maximum), kv3 = new KeyValue("row3".getBytes(), "cf".getBytes(), "cq".getBytes(), 1, + Type.Maximum); + + ScannerCallableWithReplicas callableWithReplicas = Mockito + .mock(ScannerCallableWithReplicas.class); + + // While the server returns 2 records per batch, we expect more records. + scan.setCaching(2); + SmallScannerCallableFactory factory = getFactory(callableWithReplicas); + + try (ClientSmallScanner css = new ClientSmallScanner(conf, scan, TableName.valueOf("table"), + clusterConn, rpcFactory, controllerFactory, pool, Integer.MAX_VALUE)) { + + css.setScannerCallableFactory(factory); + // Return some data the first time, less the second, and none after that + Mockito.when(caller.callWithoutRetries(callableWithReplicas, css.getScannerTimeout())) + .thenAnswer(new Answer() { + int count = 0; + + @Override + public Result[] answer(InvocationOnMock invocation) { + Result[] results; + if (0 == count) { + results = new Result[] {Result.create(new Cell[] {kv1}), + Result.create(new Cell[] {kv2})}; + } else if (1 == count) { + // Return fewer records than expected (2) + results = new Result[] {Result.create(new Cell[] {kv3})}; + } else { + throw new RuntimeException("Should not fetch a third batch from the server"); + } + count++; + return results; + } + }); + + // Server doesn't return the context + Mockito.when(callableWithReplicas.hasMoreResultsContext()).thenReturn(false); + // Only have more results the first time + Mockito.when(callableWithReplicas.getServerHasMoreResults()).thenThrow( + new RuntimeException("Should not be called")); + + // A mocked HRegionInfo so ClientSmallScanner#nextScanner(...) works right + HRegionInfo regionInfo = Mockito.mock(HRegionInfo.class); + Mockito.when(callableWithReplicas.getHRegionInfo()).thenReturn(regionInfo); + // Trigger the "no more data" branch for #nextScanner(...) + Mockito.when(regionInfo.getEndKey()).thenReturn(HConstants.EMPTY_BYTE_ARRAY); + + css.loadCache(); + + List results = css.cache; + assertEquals(2, results.size()); + for (int i = 1; i <= 2; i++) { + Result result = results.get(i - 1); + byte[] row = result.getRow(); + assertEquals("row" + i, new String(row, StandardCharsets.UTF_8)); + assertEquals(1, result.getMap().size()); + } + + // "consume" the results we verified + results.clear(); + + css.loadCache(); + + assertEquals(1, results.size()); + Result result = results.get(0); + assertEquals("row3", new String(result.getRow(), StandardCharsets.UTF_8)); + assertEquals(1, result.getMap().size()); + assertTrue(css.closed); + } + } + + @Test + public void testNoContextNoRecords() throws Exception { + ScannerCallableWithReplicas callableWithReplicas = Mockito + .mock(ScannerCallableWithReplicas.class); + + // While the server return 2 records per RPC, we expect there to be more records. + scan.setCaching(2); + + SmallScannerCallableFactory factory = getFactory(callableWithReplicas); + + try (ClientSmallScanner css = new ClientSmallScanner(conf, scan, TableName.valueOf("table"), + clusterConn, rpcFactory, controllerFactory, pool, Integer.MAX_VALUE)) { + + css.setScannerCallableFactory(factory); + + // Return some data the first time, less the second, and none after that + Mockito.when(caller.callWithoutRetries(callableWithReplicas, css.getScannerTimeout())) + .thenReturn(new Result[0]); + + // Server doesn't return the context + Mockito.when(callableWithReplicas.hasMoreResultsContext()).thenReturn(false); + // Only have more results the first time + Mockito.when(callableWithReplicas.getServerHasMoreResults()).thenThrow( + new RuntimeException("Should not be called")); + + // A mocked HRegionInfo so ClientSmallScanner#nextScanner(...) works right + HRegionInfo regionInfo = Mockito.mock(HRegionInfo.class); + Mockito.when(callableWithReplicas.getHRegionInfo()).thenReturn(regionInfo); + // Trigger the "no more data" branch for #nextScanner(...) + Mockito.when(regionInfo.getEndKey()).thenReturn(HConstants.EMPTY_BYTE_ARRAY); + + css.loadCache(); + + assertEquals(0, css.cache.size()); + assertTrue(css.closed); + } + } + + @Test + public void testContextNoRecords() throws Exception { + ScannerCallableWithReplicas callableWithReplicas = Mockito + .mock(ScannerCallableWithReplicas.class); + + SmallScannerCallableFactory factory = getFactory(callableWithReplicas); + + try (ClientSmallScanner css = new ClientSmallScanner(conf, scan, TableName.valueOf("table"), + clusterConn, rpcFactory, controllerFactory, pool, Integer.MAX_VALUE)) { + + css.setScannerCallableFactory(factory); + + // Return some data the first time, less the second, and none after that + Mockito.when(caller.callWithoutRetries(callableWithReplicas, css.getScannerTimeout())) + .thenReturn(new Result[0]); + + // Server doesn't return the context + Mockito.when(callableWithReplicas.hasMoreResultsContext()).thenReturn(true); + // Only have more results the first time + Mockito.when(callableWithReplicas.getServerHasMoreResults()).thenReturn(false); + + // A mocked HRegionInfo so ClientSmallScanner#nextScanner(...) works right + HRegionInfo regionInfo = Mockito.mock(HRegionInfo.class); + Mockito.when(callableWithReplicas.getHRegionInfo()).thenReturn(regionInfo); + // Trigger the "no more data" branch for #nextScanner(...) + Mockito.when(regionInfo.getEndKey()).thenReturn(HConstants.EMPTY_BYTE_ARRAY); + + css.loadCache(); + + assertEquals(0, css.cache.size()); + assertTrue(css.closed); + } + } +}