Skip to content

Commit

Permalink
HBASE-27652 Client-side lock contention around Configuration when usi…
Browse files Browse the repository at this point in the history
…ng read replica regions
  • Loading branch information
ndimiduk committed Mar 3, 2023
1 parent 6bc84de commit 3f219b2
Show file tree
Hide file tree
Showing 23 changed files with 115 additions and 60 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -344,8 +344,8 @@ public class ConnectionImplementation implements ClusterConnection, Closeable {

this.rpcClient = RpcClientFactory.createClient(this.conf, this.clusterId, this.metrics);
this.rpcControllerFactory = RpcControllerFactory.instantiate(conf);
this.rpcCallerFactory =
RpcRetryingCallerFactory.instantiate(conf, interceptor, this.stats, this.metrics);
this.rpcCallerFactory = RpcRetryingCallerFactory.instantiate(conf, connectionConfig,
interceptor, this.stats, this.metrics);
this.asyncProcess = new AsyncProcess(this, conf, rpcCallerFactory, rpcControllerFactory);

// Do we publish the status?
Expand Down Expand Up @@ -2250,8 +2250,8 @@ public TableState getTableState(TableName tableName) throws IOException {

@Override
public RpcRetryingCallerFactory getNewRpcRetryingCallerFactory(Configuration conf) {
return RpcRetryingCallerFactory.instantiate(conf, this.interceptor, this.getStatisticsTracker(),
metrics);
return RpcRetryingCallerFactory.instantiate(conf, connectionConfig, this.interceptor,
this.getStatisticsTracker(), metrics);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1305,8 +1305,8 @@ public <R extends Message> void batchCoprocessorService(
final List<String> callbackErrorServers = new ArrayList<>();
Object[] results = new Object[execs.size()];

AsyncProcess asyncProcess = new AsyncProcess(
connection, configuration, RpcRetryingCallerFactory.instantiate(configuration,
AsyncProcess asyncProcess = new AsyncProcess(connection, configuration,
RpcRetryingCallerFactory.instantiate(configuration, connConfiguration,
connection.getStatisticsTracker(), connection.getConnectionMetrics()),
RpcControllerFactory.instantiate(configuration));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -423,8 +423,10 @@ public FlushWorker(Configuration conf, ClusterConnection conn, HRegionLocation a
this.addr = addr;
this.multiplexer = htableMultiplexer;
this.queue = new LinkedBlockingQueue<>(perRegionServerBufferQueueSize);
final ConnectionConfiguration connectionConfig =
conn != null ? conn.getConnectionConfiguration() : new ConnectionConfiguration(conf);
RpcRetryingCallerFactory rpcCallerFactory = RpcRetryingCallerFactory.instantiate(conf,
conn == null ? null : conn.getConnectionMetrics());
connectionConfig, conn == null ? null : conn.getConnectionMetrics());
RpcControllerFactory rpcControllerFactory = RpcControllerFactory.instantiate(conf);
this.writeRpcTimeout = conf.getInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY,
conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase.client;

import com.google.errorprone.annotations.RestrictedApi;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.yetus.audience.InterfaceAudience;
Expand All @@ -29,20 +30,18 @@ public class RpcRetryingCallerFactory {

/** Configuration key for a custom {@link RpcRetryingCaller} */
public static final String CUSTOM_CALLER_CONF_KEY = "hbase.rpc.callerfactory.class";
protected final Configuration conf;
private final ConnectionConfiguration connectionConf;
private final RetryingCallerInterceptor interceptor;
private final int startLogErrorsCnt;
private final MetricsConnection metrics;

public RpcRetryingCallerFactory(Configuration conf) {
this(conf, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, null);
public RpcRetryingCallerFactory(Configuration conf, ConnectionConfiguration connectionConf) {
this(conf, connectionConf, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, null);
}

public RpcRetryingCallerFactory(Configuration conf, RetryingCallerInterceptor interceptor,
MetricsConnection metrics) {
this.conf = conf;
this.connectionConf = new ConnectionConfiguration(conf);
public RpcRetryingCallerFactory(Configuration conf, ConnectionConfiguration connectionConf,
RetryingCallerInterceptor interceptor, MetricsConnection metrics) {
this.connectionConf = connectionConf;
startLogErrorsCnt = conf.getInt(AsyncProcess.START_LOG_ERRORS_AFTER_COUNT_KEY,
AsyncProcess.DEFAULT_START_LOG_ERRORS_AFTER_COUNT);
this.interceptor = interceptor;
Expand Down Expand Up @@ -71,30 +70,43 @@ public <T> RpcRetryingCaller<T> newCaller() {
interceptor, startLogErrorsCnt, connectionConf.getRpcTimeout(), metrics);
}

@RestrictedApi(explanation = "Should only be called on process initialization", link = "",
allowedOnPath = ".*/hbase-server/src/main/java/.*/(HRegionServer|LoadIncrementalHFiles|SecureBulkLoadClient)\\.java")
public static RpcRetryingCallerFactory instantiate(Configuration configuration,
MetricsConnection metrics) {
return instantiate(configuration, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, null,
metrics);
return instantiate(configuration, new ConnectionConfiguration(configuration), metrics);
}

public static RpcRetryingCallerFactory instantiate(Configuration configuration,
ServerStatisticTracker stats, MetricsConnection metrics) {
return instantiate(configuration, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, stats,
metrics);
ConnectionConfiguration connectionConf, MetricsConnection metrics) {
return instantiate(configuration, connectionConf,
RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, null, metrics);
}

public static RpcRetryingCallerFactory instantiate(Configuration configuration,
RetryingCallerInterceptor interceptor, ServerStatisticTracker stats,
ConnectionConfiguration connectionConf, ServerStatisticTracker stats,
MetricsConnection metrics) {
return instantiate(configuration, connectionConf,
RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, stats, metrics);
}

public static RpcRetryingCallerFactory instantiate(Configuration configuration,
ConnectionConfiguration connectionConf, RetryingCallerInterceptor interceptor,
ServerStatisticTracker stats, MetricsConnection metrics) {
String clazzName = RpcRetryingCallerFactory.class.getName();
String rpcCallerFactoryClazz =
configuration.get(RpcRetryingCallerFactory.CUSTOM_CALLER_CONF_KEY, clazzName);
RpcRetryingCallerFactory factory;
if (rpcCallerFactoryClazz.equals(clazzName)) {
factory = new RpcRetryingCallerFactory(configuration, interceptor, metrics);
factory = new RpcRetryingCallerFactory(configuration, connectionConf, interceptor, metrics);
} else {
factory = ReflectionUtils.instantiateWithCustomCtor(rpcCallerFactoryClazz,
new Class[] { Configuration.class }, new Object[] { configuration });
try {
factory = ReflectionUtils.instantiateWithCustomCtor(rpcCallerFactoryClazz,
new Class[] { Configuration.class }, new Object[] { configuration, connectionConf });
} catch (UnsupportedOperationException e) {
factory = ReflectionUtils.instantiateWithCustomCtor(rpcCallerFactoryClazz,
new Class[] { Configuration.class }, new Object[] { configuration });
}
}
return factory;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,8 @@ public RpcRetryingCallerWithReadReplicas(RpcControllerFactory rpcControllerFacto
this.operationTimeout = operationTimeout;
this.rpcTimeout = rpcTimeout;
this.timeBeforeReplicas = timeBeforeReplicas;
this.rpcRetryingCallerFactory = new RpcRetryingCallerFactory(conf);
this.rpcRetryingCallerFactory =
new RpcRetryingCallerFactory(conf, cConnection.getConnectionConfiguration());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
private static final Logger LOG = LoggerFactory.getLogger(ScannerCallableWithReplicas.class);
volatile ScannerCallable currentScannerCallable;
AtomicBoolean replicaSwitched = new AtomicBoolean(false);
final ClusterConnection cConnection;
private final ClusterConnection cConnection;
protected final ExecutorService pool;
protected final int timeBeforeReplicas;
private final Scan scan;
Expand Down Expand Up @@ -175,12 +175,15 @@ public Result[] call(int timeout) throws IOException {
}
regionReplication = rl.size();
}
// allocate a boundedcompletion pool of some multiple of number of replicas.
// We want to accomodate some RPCs for redundant replica scans (but are still in progress)
// allocate a bounded-completion pool of some multiple of number of replicas.
// We want to accommodate some RPCs for redundant replica scans (but are still in progress)
final ConnectionConfiguration connectionConfig = cConnection != null
? cConnection.getConnectionConfiguration()
: new ConnectionConfiguration(ScannerCallableWithReplicas.this.conf);
ResultBoundedCompletionService<Pair<Result[], ScannerCallable>> cs =
new ResultBoundedCompletionService<>(
RpcRetryingCallerFactory.instantiate(ScannerCallableWithReplicas.this.conf,
cConnection == null ? null : cConnection.getConnectionMetrics()),
connectionConfig, cConnection == null ? null : cConnection.getConnectionMetrics()),
pool, regionReplication * 5);

AtomicBoolean done = new AtomicBoolean(false);
Expand Down Expand Up @@ -382,9 +385,12 @@ class RetryingRPC implements RetryingCallable<Pair<Result[], ScannerCallable>>,
// and we can't invoke it multiple times at the same time)
this.caller = ScannerCallableWithReplicas.this.caller;
if (scan.getConsistency() == Consistency.TIMELINE) {
final ConnectionConfiguration connectionConfig = cConnection != null
? cConnection.getConnectionConfiguration()
: new ConnectionConfiguration(ScannerCallableWithReplicas.this.conf);
this.caller =
RpcRetryingCallerFactory
.instantiate(ScannerCallableWithReplicas.this.conf,
.instantiate(ScannerCallableWithReplicas.this.conf, connectionConfig,
cConnection == null ? null : cConnection.getConnectionMetrics())
.<Result[]> newCaller();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ protected String rpcCall() throws Exception {
return response.getBulkToken();
}
};
return RpcRetryingCallerFactory.instantiate(conn.getConfiguration(), null, null)
return RpcRetryingCallerFactory.instantiate(conn.getConfiguration(), null)
.<String> newCaller().callWithRetries(callable, Integer.MAX_VALUE);
} catch (Throwable throwable) {
throw new IOException(throwable);
Expand All @@ -91,7 +91,7 @@ protected Void rpcCall() throws Exception {
return null;
}
};
RpcRetryingCallerFactory.instantiate(conn.getConfiguration(), null, null).<Void> newCaller()
RpcRetryingCallerFactory.instantiate(conn.getConfiguration(), null).<Void> newCaller()
.callWithRetries(callable, Integer.MAX_VALUE);
} catch (Throwable throwable) {
throw new IOException(throwable);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,13 +184,15 @@ public TableName getTableName() {
}

public MyAsyncProcess(ClusterConnection hc, Configuration conf) {
super(hc, conf, new RpcRetryingCallerFactory(conf), new RpcControllerFactory(conf));
super(hc, conf, new RpcRetryingCallerFactory(conf, hc.getConnectionConfiguration()),
new RpcControllerFactory(conf));
service = Executors.newFixedThreadPool(5);
this.conf = conf;
}

public MyAsyncProcess(ClusterConnection hc, Configuration conf, AtomicInteger nbThreads) {
super(hc, conf, new RpcRetryingCallerFactory(conf), new RpcControllerFactory(conf));
super(hc, conf, new RpcRetryingCallerFactory(conf, hc.getConnectionConfiguration()),
new RpcControllerFactory(conf));
service = new ThreadPoolExecutor(1, 20, 60, TimeUnit.SECONDS, new SynchronousQueue<>(),
new CountingThreadFactory(nbThreads));
}
Expand Down Expand Up @@ -1702,7 +1704,8 @@ public Future submit(Runnable runnable) {

static class AsyncProcessForThrowableCheck extends AsyncProcess {
public AsyncProcessForThrowableCheck(ClusterConnection hc, Configuration conf) {
super(hc, conf, new RpcRetryingCallerFactory(conf), new RpcControllerFactory(conf));
super(hc, conf, new RpcRetryingCallerFactory(conf, hc.getConnectionConfiguration()),
new RpcControllerFactory(conf));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ public class TestAsyncProcessWithRegionException {
private static final Result EMPTY_RESULT = Result.create(null, true);
private static final IOException IOE = new IOException("YOU CAN'T PASS");
private static final Configuration CONF = new Configuration();
private static final ConnectionConfiguration CONNECTION_CONFIG =
new ConnectionConfiguration(CONF);
private static final TableName DUMMY_TABLE = TableName.valueOf("DUMMY_TABLE");
private static final byte[] GOOD_ROW = Bytes.toBytes("GOOD_ROW");
private static final byte[] BAD_ROW = Bytes.toBytes("BAD_ROW");
Expand Down Expand Up @@ -175,7 +177,7 @@ private static ClusterConnection createHConnection() throws IOException {
Mockito.when(ng.getNonceGroup()).thenReturn(HConstants.NO_NONCE);
Mockito.when(hc.getNonceGenerator()).thenReturn(ng);
Mockito.when(hc.getConfiguration()).thenReturn(CONF);
Mockito.when(hc.getConnectionConfiguration()).thenReturn(new ConnectionConfiguration(CONF));
Mockito.when(hc.getConnectionConfiguration()).thenReturn(CONNECTION_CONFIG);
setMockLocation(hc, GOOD_ROW, new RegionLocations(REGION_LOCATION));
setMockLocation(hc, BAD_ROW, new RegionLocations(REGION_LOCATION));
Mockito
Expand All @@ -196,7 +198,8 @@ private static class MyAsyncProcess extends AsyncProcess {
private final ExecutorService service = Executors.newFixedThreadPool(5);

MyAsyncProcess(ClusterConnection hc, Configuration conf) {
super(hc, conf, new RpcRetryingCallerFactory(conf), new RpcControllerFactory(conf));
super(hc, conf, new RpcRetryingCallerFactory(conf, hc.getConnectionConfiguration()),
new RpcControllerFactory(conf));
}

public AsyncRequestFuture submit(TableName tableName, List<? extends Row> rows)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ public class TestClientScanner {
Scan scan;
ExecutorService pool;
Configuration conf;
ConnectionConfiguration connectionConfig;

ClusterConnection clusterConn;
RpcRetryingCallerFactory rpcFactory;
Expand All @@ -86,6 +87,7 @@ public void setup() throws IOException {
pool = Executors.newSingleThreadExecutor();
scan = new Scan();
conf = new Configuration();
connectionConfig = new ConnectionConfiguration(conf);
Mockito.when(clusterConn.getConfiguration()).thenReturn(conf);
}

Expand Down Expand Up @@ -473,7 +475,7 @@ public void testExceptionsFromReplicasArePropagated() throws IOException {

// Mock a caller which calls the callable for ScannerCallableWithReplicas,
// but throws an exception for the actual scanner calls via callWithRetries.
rpcFactory = new MockRpcRetryingCallerFactory(conf);
rpcFactory = new MockRpcRetryingCallerFactory(conf, connectionConfig);
conf.set(RpcRetryingCallerFactory.CUSTOM_CALLER_CONF_KEY,
MockRpcRetryingCallerFactory.class.getName());

Expand All @@ -496,8 +498,9 @@ rpcFactory, new RpcControllerFactory(conf), pool, Integer.MAX_VALUE)) {

public static class MockRpcRetryingCallerFactory extends RpcRetryingCallerFactory {

public MockRpcRetryingCallerFactory(Configuration conf) {
super(conf);
public MockRpcRetryingCallerFactory(Configuration conf,
ConnectionConfiguration connectionConf) {
super(conf, connectionConf);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,8 @@ protected Void rpcCall() throws Exception {
return null;
}
};
RpcRetryingCallerFactory factory = new RpcRetryingCallerFactory(conf);
RpcRetryingCallerFactory factory =
new RpcRetryingCallerFactory(conf, conn.getConnectionConfiguration());
RpcRetryingCaller<Void> caller = factory.<Void> newCaller();
caller.callWithRetries(callable, Integer.MAX_VALUE);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -390,8 +390,9 @@ public RegionReplicaSinkWriter(RegionReplicaOutputSink sink, ClusterConnection c
this.sink = sink;
this.connection = connection;
this.operationTimeout = operationTimeout;
this.rpcRetryingCallerFactory = RpcRetryingCallerFactory
.instantiate(connection.getConfiguration(), connection.getConnectionMetrics());
this.rpcRetryingCallerFactory =
RpcRetryingCallerFactory.instantiate(connection.getConfiguration(),
connection.getConnectionConfiguration(), connection.getConnectionMetrics());
this.rpcControllerFactory = RpcControllerFactory.instantiate(connection.getConfiguration());
this.pool = pool;
this.tableDescriptors = tableDescriptors;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -871,7 +871,7 @@ protected List<LoadQueueItem> tryAtomicRegionLoad(ClientServiceCallable<byte[]>
List<LoadQueueItem> toRetry = new ArrayList<>();
try {
Configuration conf = getConf();
byte[] region = RpcRetryingCallerFactory.instantiate(conf, null, null).<byte[]> newCaller()
byte[] region = RpcRetryingCallerFactory.instantiate(conf, null).<byte[]> newCaller()
.callWithRetries(serviceCallable, Integer.MAX_VALUE);
if (region == null) {
LOG.warn("Attempt to bulk load region containing " + Bytes.toStringBinary(first)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,14 @@ public class HConnectionTestingUtility {
*/
public static ClusterConnection getMockedConnection(final Configuration conf)
throws ZooKeeperConnectionException {
ConnectionConfiguration connectionConfig = new ConnectionConfiguration(conf);
ConnectionImplementation connection = Mockito.mock(ConnectionImplementation.class);
Mockito.when(connection.getConfiguration()).thenReturn(conf);
Mockito.when(connection.getConnectionConfiguration()).thenReturn(connectionConfig);
Mockito.when(connection.getRpcControllerFactory())
.thenReturn(Mockito.mock(RpcControllerFactory.class));
// we need a real retrying caller
RpcRetryingCallerFactory callerFactory = new RpcRetryingCallerFactory(conf);
RpcRetryingCallerFactory callerFactory = new RpcRetryingCallerFactory(conf, connectionConfig);
Mockito.when(connection.getRpcRetryingCallerFactory()).thenReturn(callerFactory);
return connection;
}
Expand Down Expand Up @@ -123,11 +125,12 @@ public static ClusterConnection getMockedConnectionAndDecorate(final Configurati
NonceGenerator ng = Mockito.mock(NonceGenerator.class);
Mockito.when(c.getNonceGenerator()).thenReturn(ng);
AsyncProcess asyncProcess = new AsyncProcess(c, conf,
RpcRetryingCallerFactory.instantiate(conf, c.getConnectionMetrics()),
RpcRetryingCallerFactory.instantiate(conf, connectionConfiguration, c.getConnectionMetrics()),
RpcControllerFactory.instantiate(conf));
Mockito.when(c.getAsyncProcess()).thenReturn(asyncProcess);
Mockito.when(c.getNewRpcRetryingCallerFactory(conf)).thenReturn(RpcRetryingCallerFactory
.instantiate(conf, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, null, null));
Mockito.when(c.getNewRpcRetryingCallerFactory(conf))
.thenReturn(RpcRetryingCallerFactory.instantiate(conf, c.getConnectionConfiguration(),
RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, null, null));
Mockito.when(c.getRpcControllerFactory()).thenReturn(Mockito.mock(RpcControllerFactory.class));
Table t = Mockito.mock(Table.class);
Mockito.when(c.getTable((TableName) Mockito.any())).thenReturn(t);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1138,8 +1138,8 @@ private static class ThrowingCallerFactory extends RpcRetryingCallerFactory {

private final Class<? extends HBaseServerException> exceptionClass;

public ThrowingCallerFactory(Configuration conf) {
super(conf);
public ThrowingCallerFactory(Configuration conf, ConnectionConfiguration connectionConfig) {
super(conf, connectionConfig);
this.exceptionClass =
conf.getClass("testSpecialPauseException", null, HBaseServerException.class);
}
Expand Down

0 comments on commit 3f219b2

Please sign in to comment.