From 3afee27d4f3d2c8de6f34cccd627fdd8a279de8d Mon Sep 17 00:00:00 2001
From: Bryan Beaudreault
Date: Wed, 10 Jan 2024 09:33:06 -0500
Subject: [PATCH 1/3] HBASE-27402 Clone Scan in ClientScanner to avoid errors
 with Scan re-used

---
 .../java/org/apache/hadoop/hbase/client/HTable.java | 10 ++++++++--
 1 file changed, 8 insertions(+), 2 deletions(-)

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
index 386a7db3526e..9b82c7750b96 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
@@ -300,6 +300,12 @@ private Pair<List<byte[]>, List<HRegionLocation>> getKeysAndRegionsInRange(final
    */
   @Override
   public ResultScanner getScanner(Scan scan) throws IOException {
+    // Clone to avoid modifying the user's object from scan internals.
+    // See https://issues.apache.org/jira/browse/HBASE-27402.
+    return getScannerInternal(new Scan(scan));
+  }
+
+  private ResultScanner getScannerInternal(Scan scan) throws IOException {
     final Span span =
       new TableOperationSpanBuilder(connection).setTableName(tableName).setOperation(scan).build();
     try (Scope ignored = span.makeCurrent()) {
@@ -344,7 +350,7 @@ public ResultScanner getScanner(Scan scan) throws IOException {
   public ResultScanner getScanner(byte[] family) throws IOException {
     Scan scan = new Scan();
     scan.addFamily(family);
-    return getScanner(scan);
+    return getScannerInternal(scan);
   }
 
   /**
@@ -355,7 +361,7 @@ public ResultScanner getScanner(byte[] family) throws IOException {
   public ResultScanner getScanner(byte[] family, byte[] qualifier) throws IOException {
     Scan scan = new Scan();
     scan.addColumn(family, qualifier);
-    return getScanner(scan);
+    return getScannerInternal(scan);
  }
 
   @Override
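
Why the clone in patch 1 matters from the caller's side: ClientScanner works directly on the Scan it is handed and updates fields on it while scanning (for example the caching value and the mvcc read point, and a reversed scan may rewrite the start row), so a Scan object that is re-used for a second getScanner call can silently describe a different scan. The snippet below is only a rough illustration of that hazard, not code from the patch; the table and family names are made up and an open Connection named conn is assumed.

Scan template = new Scan().addFamily(Bytes.toBytes("cf")).setCaching(100);
try (Table t1 = conn.getTable(TableName.valueOf("table_one"));
  ResultScanner rs = t1.getScanner(template)) {
  for (Result r : rs) {
    // consume results
  }
}
// Without the defensive copy, scanner internals may have rewritten fields on
// `template` during the scan above, so this second scan could behave differently
// from the first. With patch 1, getScanner(Scan) hands the internals a copy and
// `template` stays exactly as the caller built it.
try (Table t2 = conn.getTable(TableName.valueOf("table_two"));
  ResultScanner rs = t2.getScanner(template)) {
  for (Result r : rs) {
    // consume results
  }
}
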
From ecee6fc0c6d45d22cf480f12257d55b52e5a2c43 Mon Sep 17 00:00:00 2001
From: Bryan Beaudreault
Date: Thu, 11 Jan 2024 14:17:52 -0500
Subject: [PATCH 2/3] Retain the user's scan so we can return ScanMetrics if
 needed

---
 .../client/ClientAsyncPrefetchScanner.java  | 10 ++++----
 .../hadoop/hbase/client/ClientScanner.java  | 13 +++++++---
 .../hbase/client/ClientSimpleScanner.java   | 10 ++++----
 .../client/ConnectionImplementation.java    |  9 +++----
 .../apache/hadoop/hbase/client/HTable.java  | 24 +++++++++----------
 .../hbase/client/ReversedClientScanner.java |  6 ++---
 .../hbase/client/TestClientScanner.java     |  2 +-
 .../client/TestScannersFromClientSide.java  | 19 +++++++++++++++
 8 files changed, 60 insertions(+), 33 deletions(-)

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientAsyncPrefetchScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientAsyncPrefetchScanner.java
index abd1267ffc4b..0b892349b80d 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientAsyncPrefetchScanner.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientAsyncPrefetchScanner.java
@@ -62,15 +62,15 @@ public class ClientAsyncPrefetchScanner extends ClientSimpleScanner {
   private final Condition notEmpty = lock.newCondition();
   private final Condition notFull = lock.newCondition();
 
-  public ClientAsyncPrefetchScanner(Configuration configuration, Scan scan, TableName name,
-    ClusterConnection connection, RpcRetryingCallerFactory rpcCallerFactory,
+  public ClientAsyncPrefetchScanner(Configuration configuration, Scan scan, Scan scanForMetrics,
+    TableName name, ClusterConnection connection, RpcRetryingCallerFactory rpcCallerFactory,
     RpcControllerFactory rpcControllerFactory, ExecutorService pool, int scanReadRpcTimeout,
     int scannerTimeout, int replicaCallTimeoutMicroSecondScan,
     ConnectionConfiguration connectionConfiguration, Map<String, byte[]> requestAttributes)
     throws IOException {
-    super(configuration, scan, name, connection, rpcCallerFactory, rpcControllerFactory, pool,
-      scanReadRpcTimeout, scannerTimeout, replicaCallTimeoutMicroSecondScan,
-      connectionConfiguration, requestAttributes);
+    super(configuration, scan, scanForMetrics, name, connection, rpcCallerFactory,
+      rpcControllerFactory, pool, scanReadRpcTimeout, scannerTimeout,
+      replicaCallTimeoutMicroSecondScan, connectionConfiguration, requestAttributes);
     exceptionsQueue = new ConcurrentLinkedQueue<>();
     final Context context = Context.current();
     final Runnable runnable = context.wrap(new PrefetchRunnable());
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
index ef8e4b0404f6..df7f900830e2 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
@@ -61,6 +61,12 @@ public abstract class ClientScanner extends AbstractClientScanner {
   private static final Logger LOG = LoggerFactory.getLogger(ClientScanner.class);
 
   protected final Scan scan;
+  // We clone the original client Scan to avoid modifying the user's object from scan internals.
+  // The scanForMetrics below is the client's original object, which we mutate only to return
+  // ScanMetrics.
+  // See https://issues.apache.org/jira/browse/HBASE-27402.
+  private final Scan scanForMetrics;
+
   protected boolean closed = false;
   // Current region scanner is against. Gets cleared if current region goes
   // wonky: e.g. if it splits on us.
@@ -101,12 +107,13 @@ public abstract class ClientScanner extends AbstractClientScanner {
    * @param tableName The table that we wish to scan
    * @param connection Connection identifying the cluster
    */
-  public ClientScanner(final Configuration conf, final Scan scan, final TableName tableName,
-    ClusterConnection connection, RpcRetryingCallerFactory rpcFactory,
+  public ClientScanner(final Configuration conf, final Scan scan, final Scan scanForMetrics,
+    final TableName tableName, ClusterConnection connection, RpcRetryingCallerFactory rpcFactory,
     RpcControllerFactory controllerFactory, ExecutorService pool, int scanReadRpcTimeout,
     int scannerTimeout, int primaryOperationTimeout,
     ConnectionConfiguration connectionConfiguration, Map<String, byte[]> requestAttributes)
     throws IOException {
+    this.scanForMetrics = scanForMetrics;
     if (LOG.isTraceEnabled()) {
       LOG.trace(
         "Scan table=" + tableName + ", startRow=" + Bytes.toStringBinary(scan.getStartRow()));
@@ -294,7 +301,7 @@ protected void writeScanMetrics() {
     // As we have claimed in the comment of Scan.getScanMetrics, this relies on that user will not
     // call ResultScanner.getScanMetrics and reset the ScanMetrics. Otherwise the metrics published
     // to Scan will be messed up.
-    scan.setAttribute(Scan.SCAN_ATTRIBUTES_METRICS_DATA,
+    scanForMetrics.setAttribute(Scan.SCAN_ATTRIBUTES_METRICS_DATA,
       ProtobufUtil.toScanMetrics(scanMetrics, false).toByteArray());
     scanMetricsPublished = true;
   }
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSimpleScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSimpleScanner.java
index bde036f88806..b5b7b1926ac2 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSimpleScanner.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSimpleScanner.java
@@ -35,15 +35,15 @@
  */
 @InterfaceAudience.Private
 public class ClientSimpleScanner extends ClientScanner {
-  public ClientSimpleScanner(Configuration configuration, Scan scan, TableName name,
-    ClusterConnection connection, RpcRetryingCallerFactory rpcCallerFactory,
+  public ClientSimpleScanner(Configuration configuration, Scan scan, Scan scanForMetrics,
+    TableName name, ClusterConnection connection, RpcRetryingCallerFactory rpcCallerFactory,
     RpcControllerFactory rpcControllerFactory, ExecutorService pool, int scanReadRpcTimeout,
     int scannerTimeout, int replicaCallTimeoutMicroSecondScan,
     ConnectionConfiguration connectionConfiguration, Map<String, byte[]> requestAttributes)
     throws IOException {
-    super(configuration, scan, name, connection, rpcCallerFactory, rpcControllerFactory, pool,
-      scanReadRpcTimeout, scannerTimeout, replicaCallTimeoutMicroSecondScan,
-      connectionConfiguration, requestAttributes);
+    super(configuration, scan, scanForMetrics, name, connection, rpcCallerFactory,
+      rpcControllerFactory, pool, scanReadRpcTimeout, scannerTimeout,
+      replicaCallTimeoutMicroSecondScan, connectionConfiguration, requestAttributes);
   }
 
   @Override
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
index 268a2495d69e..08e24d4d6bff 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
@@ -1049,10 +1049,11 @@ private RegionLocations locateRegionInMeta(TableName tableName, byte[] row, bool
       final Span span = new TableOperationSpanBuilder(this)
         .setTableName(TableName.META_TABLE_NAME).setOperation(s).build();
       try (Scope ignored = span.makeCurrent();
-        ReversedClientScanner rcs = new ReversedClientScanner(conf, s, TableName.META_TABLE_NAME,
-          this, rpcCallerFactory, rpcControllerFactory, getMetaLookupPool(),
-          connectionConfig.getMetaReadRpcTimeout(), connectionConfig.getMetaScanTimeout(),
-          metaReplicaCallTimeoutScanInMicroSecond, connectionConfig, Collections.emptyMap())) {
+        ReversedClientScanner rcs =
+          new ReversedClientScanner(conf, s, s, TableName.META_TABLE_NAME, this, rpcCallerFactory,
+            rpcControllerFactory, getMetaLookupPool(), connectionConfig.getMetaReadRpcTimeout(),
+            connectionConfig.getMetaScanTimeout(), metaReplicaCallTimeoutScanInMicroSecond,
+            connectionConfig, Collections.emptyMap())) {
         boolean tableNotFound = true;
         for (;;) {
           Result regionInfoRow = rcs.next();
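
A note on why the metrics must be written to scanForMetrics rather than to the internal clone: scan metrics travel back to the caller through the Scan object itself. writeScanMetrics() above serializes the collected ScanMetrics into the Scan.SCAN_ATTRIBUTES_METRICS_DATA attribute, and Scan.getScanMetrics() on the caller's side reads that attribute back. If the client published to the cloned working Scan, the caller's Scan would never see the data. A minimal usage sketch, not from the patch, assuming an existing Table named table:

Scan scan = new Scan().setScanMetricsEnabled(true);
try (ResultScanner rs = table.getScanner(scan)) {
  for (Result r : rs) {
    // consume results
  }
}
// Read back from the caller's own Scan instance; this only works because the
// client publishes the metrics to scanForMetrics (the user's object) and not
// to the internal clone.
ScanMetrics metrics = scan.getScanMetrics();
long regionsScanned = metrics.countOfRegions.get();
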
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
index 9b82c7750b96..0dc19e6a6181 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
@@ -302,10 +302,10 @@ private Pair<List<byte[]>, List<HRegionLocation>> getKeysAndRegionsInRange(final
   public ResultScanner getScanner(Scan scan) throws IOException {
     // Clone to avoid modifying the user's object from scan internals.
     // See https://issues.apache.org/jira/browse/HBASE-27402.
-    return getScannerInternal(new Scan(scan));
+    return getScannerInternal(scan, scan);
   }
 
-  private ResultScanner getScannerInternal(Scan scan) throws IOException {
+  private ResultScanner getScannerInternal(Scan scan, Scan scanForMetrics) throws IOException {
     final Span span =
       new TableOperationSpanBuilder(connection).setTableName(tableName).setOperation(scan).build();
     try (Scope ignored = span.makeCurrent()) {
@@ -325,18 +325,18 @@ private ResultScanner getScannerInternal(Scan scan) throws IOException {
       final int replicaTimeout = connConfiguration.getReplicaCallTimeoutMicroSecondScan();
 
       if (scan.isReversed()) {
-        return new ReversedClientScanner(getConfiguration(), scan, getName(), connection,
-          rpcCallerFactory, rpcControllerFactory, pool, scanReadRpcTimeout, scanTimeout,
+        return new ReversedClientScanner(getConfiguration(), scan, scanForMetrics, getName(),
+          connection, rpcCallerFactory, rpcControllerFactory, pool, scanReadRpcTimeout, scanTimeout,
           replicaTimeout, connConfiguration, requestAttributes);
       } else {
         if (async) {
-          return new ClientAsyncPrefetchScanner(getConfiguration(), scan, getName(), connection,
-            rpcCallerFactory, rpcControllerFactory, pool, scanReadRpcTimeout, scanTimeout,
-            replicaTimeout, connConfiguration, requestAttributes);
+          return new ClientAsyncPrefetchScanner(getConfiguration(), scan, scanForMetrics, getName(),
+            connection, rpcCallerFactory, rpcControllerFactory, pool, scanReadRpcTimeout,
+            scanTimeout, replicaTimeout, connConfiguration, requestAttributes);
         } else {
-          return new ClientSimpleScanner(getConfiguration(), scan, getName(), connection,
-            rpcCallerFactory, rpcControllerFactory, pool, scanReadRpcTimeout, scanTimeout,
-            replicaTimeout, connConfiguration, requestAttributes);
+          return new ClientSimpleScanner(getConfiguration(), scan, scanForMetrics, getName(),
+            connection, rpcCallerFactory, rpcControllerFactory, pool, scanReadRpcTimeout,
+            scanTimeout, replicaTimeout, connConfiguration, requestAttributes);
         }
       }
     }
@@ -350,7 +350,7 @@ private ResultScanner getScannerInternal(Scan scan) throws IOException {
   public ResultScanner getScanner(byte[] family) throws IOException {
     Scan scan = new Scan();
     scan.addFamily(family);
-    return getScannerInternal(scan);
+    return getScannerInternal(scan, scan);
   }
 
   /**
@@ -361,7 +361,7 @@ public ResultScanner getScanner(byte[] family) throws IOException {
   public ResultScanner getScanner(byte[] family, byte[] qualifier) throws IOException {
     Scan scan = new Scan();
     scan.addColumn(family, qualifier);
-    return getScannerInternal(scan);
+    return getScannerInternal(scan, scan);
   }
 
   @Override
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java
index 36bbdb5b60e4..2f363002b14e 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java
@@ -37,13 +37,13 @@ public class ReversedClientScanner extends ClientScanner {
    * Create a new ReversibleClientScanner for the specified table Note that the passed
   * {@link Scan}'s start row maybe changed.
    */
-  public ReversedClientScanner(Configuration conf, Scan scan, TableName tableName,
-    ClusterConnection connection, RpcRetryingCallerFactory rpcFactory,
+  public ReversedClientScanner(Configuration conf, Scan scan, Scan scanForMetrics,
+    TableName tableName, ClusterConnection connection, RpcRetryingCallerFactory rpcFactory,
     RpcControllerFactory controllerFactory, ExecutorService pool, int scanReadRpcTimeout,
     int scannerTimeout, int primaryOperationTimeout,
     ConnectionConfiguration connectionConfiguration, Map<String, byte[]> requestAttributes)
     throws IOException {
-    super(conf, scan, tableName, connection, rpcFactory, controllerFactory, pool,
+    super(conf, scan, scanForMetrics, tableName, connection, rpcFactory, controllerFactory, pool,
       scanReadRpcTimeout, scannerTimeout, primaryOperationTimeout, connectionConfiguration,
       requestAttributes);
   }
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientScanner.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientScanner.java
index 9b5eb91bbd5c..e039be8baa83 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientScanner.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientScanner.java
@@ -110,7 +110,7 @@ public MockClientScanner(final Configuration conf, final Scan scan, final TableN
       ClusterConnection connection, RpcRetryingCallerFactory rpcFactory,
       RpcControllerFactory controllerFactory, ExecutorService pool, int primaryOperationTimeout,
       ConnectionConfiguration connectionConfig) throws IOException {
-      super(conf, scan, tableName, connection, rpcFactory, controllerFactory, pool,
+      super(conf, scan, scan, tableName, connection, rpcFactory, controllerFactory, pool,
         HConstants.DEFAULT_HBASE_RPC_TIMEOUT,
         HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, primaryOperationTimeout,
         connectionConfig, Collections.emptyMap());
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java
index 36ecad5276d7..3cc454a8642c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java
@@ -167,6 +167,25 @@ public TestScannersFromClientSide(Class registryImpl, int numHedgedReqs) thro
     TEST_UTIL.startMiniCluster(builder.build());
   }
 
+  @Test
+  public void testScanImmutable() throws IOException {
+    TableName tableName = name.getTableName();
+    Table table = TEST_UTIL.createTable(tableName, FAMILY);
+    TEST_UTIL.loadRandomRows(table, FAMILY, 100, 100);
+
+    Scan scan = new Scan().setCaching(-1).setMvccReadPoint(-1).setScanMetricsEnabled(true);
+
+    try (ResultScanner scanner = table.getScanner(scan)) {
+      scanner.next(1000);
+    }
+    // these 2 should be unchanged
+    assertEquals(-1, scan.getCaching());
+    assertEquals(-1, scan.getMvccReadPoint());
+    // scan metrics should be populated
+    assertNotNull(scan.getScanMetrics());
+    assertEquals(1, scan.getScanMetrics().countOfRegions.get());
+  }
+
   /**
    * Test from client side for batch of scan
    */
From 986daf08298fe6caa7bc12c209430e601f2d87c6 Mon Sep 17 00:00:00 2001
From: Bryan Beaudreault
Date: Thu, 11 Jan 2024 18:32:59 -0500
Subject: [PATCH 3/3] fix accidental commit

---
 .../src/main/java/org/apache/hadoop/hbase/client/HTable.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
index 0dc19e6a6181..fd3de615cb2f 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
@@ -302,7 +302,7 @@ private Pair<List<byte[]>, List<HRegionLocation>> getKeysAndRegionsInRange(final
   public ResultScanner getScanner(Scan scan) throws IOException {
     // Clone to avoid modifying the user's object from scan internals.
     // See https://issues.apache.org/jira/browse/HBASE-27402.
-    return getScannerInternal(scan, scan);
+    return getScannerInternal(new Scan(scan), scan);
   }
 
   private ResultScanner getScannerInternal(Scan scan, Scan scanForMetrics) throws IOException {
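
Net effect of the series for an application: the Scan passed to Table.getScanner is no longer mutated by the client internals (patches 1 and 3 give the internals a defensive copy), yet scan metrics are still published back to that same object (patch 2 retains it as scanForMetrics). A small end-to-end sketch mirroring the new testScanImmutable, assuming an existing Table named table:

Scan scan = new Scan().setCaching(-1).setScanMetricsEnabled(true);
try (ResultScanner scanner = table.getScanner(scan)) {
  scanner.next(1000);
}
// The caller's Scan is left exactly as it was built...
assert scan.getCaching() == -1;
// ...but the metrics gathered by the internal, cloned Scan still surface on it.
assert scan.getScanMetrics() != null;
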