Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HBASE-27402 Clone Scan in ClientScanner to avoid errors with Scan re-used #5614

Merged
merged 3 commits into from
Jan 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -62,15 +62,15 @@ public class ClientAsyncPrefetchScanner extends ClientSimpleScanner {
private final Condition notEmpty = lock.newCondition();
private final Condition notFull = lock.newCondition();

public ClientAsyncPrefetchScanner(Configuration configuration, Scan scan, TableName name,
ClusterConnection connection, RpcRetryingCallerFactory rpcCallerFactory,
public ClientAsyncPrefetchScanner(Configuration configuration, Scan scan, Scan scanForMetrics,
TableName name, ClusterConnection connection, RpcRetryingCallerFactory rpcCallerFactory,
RpcControllerFactory rpcControllerFactory, ExecutorService pool, int scanReadRpcTimeout,
int scannerTimeout, int replicaCallTimeoutMicroSecondScan,
ConnectionConfiguration connectionConfiguration, Map<String, byte[]> requestAttributes)
throws IOException {
super(configuration, scan, name, connection, rpcCallerFactory, rpcControllerFactory, pool,
scanReadRpcTimeout, scannerTimeout, replicaCallTimeoutMicroSecondScan,
connectionConfiguration, requestAttributes);
super(configuration, scan, scanForMetrics, name, connection, rpcCallerFactory,
rpcControllerFactory, pool, scanReadRpcTimeout, scannerTimeout,
replicaCallTimeoutMicroSecondScan, connectionConfiguration, requestAttributes);
exceptionsQueue = new ConcurrentLinkedQueue<>();
final Context context = Context.current();
final Runnable runnable = context.wrap(new PrefetchRunnable());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,12 @@ public abstract class ClientScanner extends AbstractClientScanner {
private static final Logger LOG = LoggerFactory.getLogger(ClientScanner.class);

protected final Scan scan;
// We clone the original client Scan to avoid modifying user object from scan internals.
// The below scanForMetrics is the client's object, which we mutate only for returning
// ScanMetrics.
// See https://issues.apache.org/jira/browse/HBASE-27402.
private final Scan scanForMetrics;

protected boolean closed = false;
// Current region scanner is against. Gets cleared if current region goes
// wonky: e.g. if it splits on us.
Expand Down Expand Up @@ -101,12 +107,13 @@ public abstract class ClientScanner extends AbstractClientScanner {
* @param tableName The table that we wish to scan
* @param connection Connection identifying the cluster
*/
public ClientScanner(final Configuration conf, final Scan scan, final TableName tableName,
ClusterConnection connection, RpcRetryingCallerFactory rpcFactory,
public ClientScanner(final Configuration conf, final Scan scan, final Scan scanForMetrics,
final TableName tableName, ClusterConnection connection, RpcRetryingCallerFactory rpcFactory,
RpcControllerFactory controllerFactory, ExecutorService pool, int scanReadRpcTimeout,
int scannerTimeout, int primaryOperationTimeout,
ConnectionConfiguration connectionConfiguration, Map<String, byte[]> requestAttributes)
throws IOException {
this.scanForMetrics = scanForMetrics;
if (LOG.isTraceEnabled()) {
LOG.trace(
"Scan table=" + tableName + ", startRow=" + Bytes.toStringBinary(scan.getStartRow()));
Expand Down Expand Up @@ -294,7 +301,7 @@ protected void writeScanMetrics() {
// As we have claimed in the comment of Scan.getScanMetrics, this relies on that user will not
// call ResultScanner.getScanMetrics and reset the ScanMetrics. Otherwise the metrics published
// to Scan will be messed up.
scan.setAttribute(Scan.SCAN_ATTRIBUTES_METRICS_DATA,
scanForMetrics.setAttribute(Scan.SCAN_ATTRIBUTES_METRICS_DATA,
ProtobufUtil.toScanMetrics(scanMetrics, false).toByteArray());
scanMetricsPublished = true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,15 @@
*/
@InterfaceAudience.Private
public class ClientSimpleScanner extends ClientScanner {
public ClientSimpleScanner(Configuration configuration, Scan scan, TableName name,
ClusterConnection connection, RpcRetryingCallerFactory rpcCallerFactory,
public ClientSimpleScanner(Configuration configuration, Scan scan, Scan scanForMetrics,
TableName name, ClusterConnection connection, RpcRetryingCallerFactory rpcCallerFactory,
RpcControllerFactory rpcControllerFactory, ExecutorService pool, int scanReadRpcTimeout,
int scannerTimeout, int replicaCallTimeoutMicroSecondScan,
ConnectionConfiguration connectionConfiguration, Map<String, byte[]> requestAttributes)
throws IOException {
super(configuration, scan, name, connection, rpcCallerFactory, rpcControllerFactory, pool,
scanReadRpcTimeout, scannerTimeout, replicaCallTimeoutMicroSecondScan,
connectionConfiguration, requestAttributes);
super(configuration, scan, scanForMetrics, name, connection, rpcCallerFactory,
rpcControllerFactory, pool, scanReadRpcTimeout, scannerTimeout,
replicaCallTimeoutMicroSecondScan, connectionConfiguration, requestAttributes);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1049,10 +1049,11 @@ private RegionLocations locateRegionInMeta(TableName tableName, byte[] row, bool
final Span span = new TableOperationSpanBuilder(this)
.setTableName(TableName.META_TABLE_NAME).setOperation(s).build();
try (Scope ignored = span.makeCurrent();
ReversedClientScanner rcs = new ReversedClientScanner(conf, s, TableName.META_TABLE_NAME,
this, rpcCallerFactory, rpcControllerFactory, getMetaLookupPool(),
connectionConfig.getMetaReadRpcTimeout(), connectionConfig.getMetaScanTimeout(),
metaReplicaCallTimeoutScanInMicroSecond, connectionConfig, Collections.emptyMap())) {
ReversedClientScanner rcs =
new ReversedClientScanner(conf, s, s, TableName.META_TABLE_NAME, this, rpcCallerFactory,
rpcControllerFactory, getMetaLookupPool(), connectionConfig.getMetaReadRpcTimeout(),
connectionConfig.getMetaScanTimeout(), metaReplicaCallTimeoutScanInMicroSecond,
connectionConfig, Collections.emptyMap())) {
boolean tableNotFound = true;
for (;;) {
Result regionInfoRow = rcs.next();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,12 @@ private Pair<List<byte[]>, List<HRegionLocation>> getKeysAndRegionsInRange(final
*/
@Override
public ResultScanner getScanner(Scan scan) throws IOException {
// Clone to avoid modifying user object from scan internals.
// See https://issues.apache.org/jira/browse/HBASE-27402.
return getScannerInternal(new Scan(scan), scan);
}

private ResultScanner getScannerInternal(Scan scan, Scan scanForMetrics) throws IOException {
final Span span =
new TableOperationSpanBuilder(connection).setTableName(tableName).setOperation(scan).build();
try (Scope ignored = span.makeCurrent()) {
Expand All @@ -319,18 +325,18 @@ public ResultScanner getScanner(Scan scan) throws IOException {
final int replicaTimeout = connConfiguration.getReplicaCallTimeoutMicroSecondScan();

if (scan.isReversed()) {
return new ReversedClientScanner(getConfiguration(), scan, getName(), connection,
rpcCallerFactory, rpcControllerFactory, pool, scanReadRpcTimeout, scanTimeout,
return new ReversedClientScanner(getConfiguration(), scan, scanForMetrics, getName(),
connection, rpcCallerFactory, rpcControllerFactory, pool, scanReadRpcTimeout, scanTimeout,
replicaTimeout, connConfiguration, requestAttributes);
} else {
if (async) {
return new ClientAsyncPrefetchScanner(getConfiguration(), scan, getName(), connection,
rpcCallerFactory, rpcControllerFactory, pool, scanReadRpcTimeout, scanTimeout,
replicaTimeout, connConfiguration, requestAttributes);
return new ClientAsyncPrefetchScanner(getConfiguration(), scan, scanForMetrics, getName(),
connection, rpcCallerFactory, rpcControllerFactory, pool, scanReadRpcTimeout,
scanTimeout, replicaTimeout, connConfiguration, requestAttributes);
} else {
return new ClientSimpleScanner(getConfiguration(), scan, getName(), connection,
rpcCallerFactory, rpcControllerFactory, pool, scanReadRpcTimeout, scanTimeout,
replicaTimeout, connConfiguration, requestAttributes);
return new ClientSimpleScanner(getConfiguration(), scan, scanForMetrics, getName(),
connection, rpcCallerFactory, rpcControllerFactory, pool, scanReadRpcTimeout,
scanTimeout, replicaTimeout, connConfiguration, requestAttributes);
}
}
}
Expand All @@ -344,7 +350,7 @@ public ResultScanner getScanner(Scan scan) throws IOException {
public ResultScanner getScanner(byte[] family) throws IOException {
Scan scan = new Scan();
scan.addFamily(family);
return getScanner(scan);
return getScannerInternal(scan, scan);
}

/**
Expand All @@ -355,7 +361,7 @@ public ResultScanner getScanner(byte[] family) throws IOException {
public ResultScanner getScanner(byte[] family, byte[] qualifier) throws IOException {
Scan scan = new Scan();
scan.addColumn(family, qualifier);
return getScanner(scan);
return getScannerInternal(scan, scan);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,13 @@ public class ReversedClientScanner extends ClientScanner {
* Create a new ReversibleClientScanner for the specified table Note that the passed
* {@link Scan}'s start row maybe changed.
*/
public ReversedClientScanner(Configuration conf, Scan scan, TableName tableName,
ClusterConnection connection, RpcRetryingCallerFactory rpcFactory,
public ReversedClientScanner(Configuration conf, Scan scan, Scan scanForMetrics,
TableName tableName, ClusterConnection connection, RpcRetryingCallerFactory rpcFactory,
RpcControllerFactory controllerFactory, ExecutorService pool, int scanReadRpcTimeout,
int scannerTimeout, int primaryOperationTimeout,
ConnectionConfiguration connectionConfiguration, Map<String, byte[]> requestAttributes)
throws IOException {
super(conf, scan, tableName, connection, rpcFactory, controllerFactory, pool,
super(conf, scan, scanForMetrics, tableName, connection, rpcFactory, controllerFactory, pool,
scanReadRpcTimeout, scannerTimeout, primaryOperationTimeout, connectionConfiguration,
requestAttributes);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ public MockClientScanner(final Configuration conf, final Scan scan, final TableN
ClusterConnection connection, RpcRetryingCallerFactory rpcFactory,
RpcControllerFactory controllerFactory, ExecutorService pool, int primaryOperationTimeout,
ConnectionConfiguration connectionConfig) throws IOException {
super(conf, scan, tableName, connection, rpcFactory, controllerFactory, pool,
super(conf, scan, scan, tableName, connection, rpcFactory, controllerFactory, pool,
HConstants.DEFAULT_HBASE_RPC_TIMEOUT,
HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, primaryOperationTimeout,
connectionConfig, Collections.emptyMap());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,25 @@ public TestScannersFromClientSide(Class<?> registryImpl, int numHedgedReqs) thro
TEST_UTIL.startMiniCluster(builder.build());
}

@Test
public void testScanImmutable() throws IOException {
TableName tableName = name.getTableName();
Table table = TEST_UTIL.createTable(tableName, FAMILY);
TEST_UTIL.loadRandomRows(table, FAMILY, 100, 100);

Scan scan = new Scan().setCaching(-1).setMvccReadPoint(-1).setScanMetricsEnabled(true);

try (ResultScanner scanner = table.getScanner(scan)) {
scanner.next(1000);
}
// these 2 should be unchanged
assertEquals(-1, scan.getCaching());
assertEquals(-1, scan.getMvccReadPoint());
// scan metrics should be populated
assertNotNull(scan.getScanMetrics());
assertEquals(scan.getScanMetrics().countOfRegions.get(), 1);
}

/**
* Test from client side for batch of scan
*/
Expand Down