Skip to content

Commit

Permalink
HBASE-27657 Connection and Request Attributes (#5326)
Browse files Browse the repository at this point in the history
Signed-off-by: Bryan Beaudreault <bbeaudreault@apache.org>
  • Loading branch information
rmdmattingly committed Jul 24, 2023
1 parent aec9db4 commit 83ea0da
Show file tree
Hide file tree
Showing 51 changed files with 712 additions and 86 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static org.apache.hadoop.hbase.util.FutureUtils.addListener;

import java.io.IOException;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
Expand All @@ -44,7 +45,7 @@ public AsyncAdminRequestRetryingCaller(Timer retryTimer, AsyncConnectionImpl con
long pauseNs, long pauseNsForServerOverloaded, int maxAttempts, long operationTimeoutNs,
long rpcTimeoutNs, int startLogErrorsCnt, ServerName serverName, Callable<T> callable) {
super(retryTimer, conn, priority, pauseNs, pauseNsForServerOverloaded, maxAttempts,
operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt);
operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt, Collections.emptyMap());
this.serverName = serverName;
this.callable = callable;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,8 @@ class AsyncBatchRpcRetryingCaller<T> {

private final HBaseServerExceptionPauseManager pauseManager;

private final Map<String, byte[]> requestAttributes;

// we can not use HRegionLocation as the map key because the hashCode and equals method of
// HRegionLocation only consider serverName.
private static final class RegionRequest {
Expand Down Expand Up @@ -149,7 +151,8 @@ public int getPriority() {

public AsyncBatchRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn,
TableName tableName, List<? extends Row> actions, long pauseNs, long pauseNsForServerOverloaded,
int maxAttempts, long operationTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) {
int maxAttempts, long operationTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt,
Map<String, byte[]> requestAttributes) {
this.retryTimer = retryTimer;
this.conn = conn;
this.tableName = tableName;
Expand Down Expand Up @@ -180,6 +183,7 @@ public AsyncBatchRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn,
this.startNs = System.nanoTime();
this.pauseManager =
new HBaseServerExceptionPauseManager(pauseNs, pauseNsForServerOverloaded, operationTimeoutNs);
this.requestAttributes = requestAttributes;
}

private static boolean hasIncrementOrAppend(Row action) {
Expand Down Expand Up @@ -392,6 +396,7 @@ private void sendToServer(ServerName serverName, ServerRequest serverReq, int tr
HBaseRpcController controller = conn.rpcControllerFactory.newController();
resetController(controller, Math.min(rpcTimeoutNs, remainingNs),
calcPriority(serverReq.getPriority(), tableName));
controller.setRequestAttributes(requestAttributes);
if (!cells.isEmpty()) {
controller.setCellScanner(createCellScanner(cells));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.context.Scope;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -92,9 +93,12 @@ class AsyncClientScanner {

private final Span span;

private final Map<String, byte[]> requestAttributes;

public AsyncClientScanner(Scan scan, AdvancedScanResultConsumer consumer, TableName tableName,
AsyncConnectionImpl conn, Timer retryTimer, long pauseNs, long pauseNsForServerOverloaded,
int maxAttempts, long scanTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) {
int maxAttempts, long scanTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt,
Map<String, byte[]> requestAttributes) {
if (scan.getStartRow() == null) {
scan.withStartRow(EMPTY_START_ROW, scan.includeStartRow());
}
Expand All @@ -113,6 +117,7 @@ public AsyncClientScanner(Scan scan, AdvancedScanResultConsumer consumer, TableN
this.rpcTimeoutNs = rpcTimeoutNs;
this.startLogErrorsCnt = startLogErrorsCnt;
this.resultCache = createScanResultCache(scan);
this.requestAttributes = requestAttributes;
if (scan.isScanMetricsEnabled()) {
this.scanMetrics = new ScanMetrics();
consumer.onScanMetricsCreated(scanMetrics);
Expand Down Expand Up @@ -191,15 +196,17 @@ private CompletableFuture<OpenScannerResponse> callOpenScanner(HBaseRpcControlle
}

private void startScan(OpenScannerResponse resp) {
addListener(conn.callerFactory.scanSingleRegion().id(resp.resp.getScannerId())
.location(resp.loc).remote(resp.isRegionServerRemote)
.scannerLeaseTimeoutPeriod(resp.resp.getTtl(), TimeUnit.MILLISECONDS).stub(resp.stub)
.setScan(scan).metrics(scanMetrics).consumer(consumer).resultCache(resultCache)
.rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
.scanTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS)
.pauseForServerOverloaded(pauseNsForServerOverloaded, TimeUnit.NANOSECONDS)
.maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt)
.start(resp.controller, resp.resp), (hasMore, error) -> {
addListener(
conn.callerFactory.scanSingleRegion().id(resp.resp.getScannerId()).location(resp.loc)
.remote(resp.isRegionServerRemote)
.scannerLeaseTimeoutPeriod(resp.resp.getTtl(), TimeUnit.MILLISECONDS).stub(resp.stub)
.setScan(scan).metrics(scanMetrics).consumer(consumer).resultCache(resultCache)
.rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
.scanTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS)
.pauseForServerOverloaded(pauseNsForServerOverloaded, TimeUnit.NANOSECONDS)
.maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt)
.setRequestAttributes(requestAttributes).start(resp.controller, resp.resp),
(hasMore, error) -> {
try (Scope ignored = span.makeCurrent()) {
if (error != null) {
try {
Expand Down Expand Up @@ -231,8 +238,8 @@ private CompletableFuture<OpenScannerResponse> openScanner(int replicaId) {
.priority(scan.getPriority()).rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
.operationTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS)
.pauseForServerOverloaded(pauseNsForServerOverloaded, TimeUnit.NANOSECONDS)
.maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt).action(this::callOpenScanner)
.call();
.maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt)
.setRequestAttributes(requestAttributes).action(this::callOpenScanner).call();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
import io.opentelemetry.api.trace.Span;
import java.io.IOException;
import java.net.SocketAddress;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
Expand Down Expand Up @@ -127,6 +129,11 @@ public class AsyncConnectionImpl implements AsyncConnection {

public AsyncConnectionImpl(Configuration conf, ConnectionRegistry registry, String clusterId,
SocketAddress localAddress, User user) {
this(conf, registry, clusterId, localAddress, user, Collections.emptyMap());
}

public AsyncConnectionImpl(Configuration conf, ConnectionRegistry registry, String clusterId,
SocketAddress localAddress, User user, Map<String, byte[]> connectionAttributes) {
this.conf = conf;
this.user = user;
this.metricsScope = MetricsConnection.getScope(conf, clusterId, this);
Expand All @@ -142,8 +149,8 @@ public AsyncConnectionImpl(Configuration conf, ConnectionRegistry registry, Stri
} else {
this.metrics = Optional.empty();
}
this.rpcClient =
RpcClientFactory.createClient(conf, clusterId, localAddress, metrics.orElse(null));
this.rpcClient = RpcClientFactory.createClient(conf, clusterId, localAddress,
metrics.orElse(null), connectionAttributes);
this.rpcControllerFactory = RpcControllerFactory.instantiate(conf);
this.rpcTimeout =
(int) Math.min(Integer.MAX_VALUE, TimeUnit.NANOSECONDS.toMillis(connConf.getRpcTimeoutNs()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import static org.apache.hadoop.hbase.util.FutureUtils.addListener;

import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
Expand Down Expand Up @@ -47,7 +48,7 @@ public AsyncMasterRequestRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl
Callable<T> callable, int priority, long pauseNs, long pauseNsForServerOverloaded,
int maxRetries, long operationTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) {
super(retryTimer, conn, priority, pauseNs, pauseNsForServerOverloaded, maxRetries,
operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt);
operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt, Collections.emptyMap());
this.callable = callable;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -78,7 +79,7 @@ public abstract class AsyncRpcRetryingCaller<T> {

public AsyncRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn, int priority,
long pauseNs, long pauseNsForServerOverloaded, int maxAttempts, long operationTimeoutNs,
long rpcTimeoutNs, int startLogErrorsCnt) {
long rpcTimeoutNs, int startLogErrorsCnt, Map<String, byte[]> requestAttributes) {
this.retryTimer = retryTimer;
this.conn = conn;
this.priority = priority;
Expand All @@ -89,6 +90,7 @@ public AsyncRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn, int pr
this.future = new CompletableFuture<>();
this.controller = conn.rpcControllerFactory.newController();
this.controller.setPriority(priority);
this.controller.setRequestAttributes(requestAttributes);
this.exceptions = new ArrayList<>();
this.startNs = System.nanoTime();
this.pauseManager =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@
import static org.apache.hbase.thirdparty.com.google.common.base.Preconditions.checkArgument;
import static org.apache.hbase.thirdparty.com.google.common.base.Preconditions.checkNotNull;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hbase.HRegionLocation;
Expand Down Expand Up @@ -83,6 +85,8 @@ public class SingleRequestCallerBuilder<T> extends BuilderBase {

private int priority = PRIORITY_UNSET;

private Map<String, byte[]> requestAttributes = Collections.emptyMap();

public SingleRequestCallerBuilder<T> table(TableName tableName) {
this.tableName = tableName;
return this;
Expand Down Expand Up @@ -144,6 +148,12 @@ public SingleRequestCallerBuilder<T> priority(int priority) {
return this;
}

public SingleRequestCallerBuilder<T>
setRequestAttributes(Map<String, byte[]> requestAttributes) {
this.requestAttributes = requestAttributes;
return this;
}

private void preCheck() {
checkArgument(replicaId >= 0, "invalid replica id %s", replicaId);
checkNotNull(tableName, "tableName is null");
Expand All @@ -157,7 +167,7 @@ public AsyncSingleRequestRpcRetryingCaller<T> build() {
preCheck();
return new AsyncSingleRequestRpcRetryingCaller<>(retryTimer, conn, tableName, row, replicaId,
locateType, callable, priority, pauseNs, pauseNsForServerOverloaded, maxAttempts,
operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt);
operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt, requestAttributes);
}

/**
Expand Down Expand Up @@ -201,6 +211,8 @@ public class ScanSingleRegionCallerBuilder extends BuilderBase {

private int priority = PRIORITY_UNSET;

private Map<String, byte[]> requestAttributes = Collections.emptyMap();

public ScanSingleRegionCallerBuilder id(long scannerId) {
this.scannerId = scannerId;
return this;
Expand Down Expand Up @@ -278,6 +290,12 @@ public ScanSingleRegionCallerBuilder startLogErrorsCnt(int startLogErrorsCnt) {
return this;
}

public ScanSingleRegionCallerBuilder
setRequestAttributes(Map<String, byte[]> requestAttributes) {
this.requestAttributes = requestAttributes;
return this;
}

private void preCheck() {
checkArgument(scannerId != null, "invalid scannerId %d", scannerId);
checkNotNull(scan, "scan is null");
Expand All @@ -293,7 +311,7 @@ public AsyncScanSingleRegionRpcRetryingCaller build() {
return new AsyncScanSingleRegionRpcRetryingCaller(retryTimer, conn, scan, scanMetrics,
scannerId, resultCache, consumer, stub, loc, isRegionServerRemote, priority,
scannerLeaseTimeoutPeriodNs, pauseNs, pauseNsForServerOverloaded, maxAttempts,
scanTimeoutNs, rpcTimeoutNs, startLogErrorsCnt);
scanTimeoutNs, rpcTimeoutNs, startLogErrorsCnt, requestAttributes);
}

/**
Expand Down Expand Up @@ -322,6 +340,8 @@ public class BatchCallerBuilder extends BuilderBase {

private long rpcTimeoutNs = -1L;

private Map<String, byte[]> requestAttributes = Collections.emptyMap();

public BatchCallerBuilder table(TableName tableName) {
this.tableName = tableName;
return this;
Expand Down Expand Up @@ -362,10 +382,15 @@ public BatchCallerBuilder startLogErrorsCnt(int startLogErrorsCnt) {
return this;
}

public BatchCallerBuilder setRequestAttributes(Map<String, byte[]> requestAttributes) {
this.requestAttributes = requestAttributes;
return this;
}

public <T> AsyncBatchRpcRetryingCaller<T> build() {
return new AsyncBatchRpcRetryingCaller<>(retryTimer, conn, tableName, actions, pauseNs,
pauseNsForServerOverloaded, maxAttempts, operationTimeoutNs, rpcTimeoutNs,
startLogErrorsCnt);
startLogErrorsCnt, requestAttributes);
}

public <T> List<CompletableFuture<T>> call() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -316,7 +317,7 @@ public AsyncScanSingleRegionRpcRetryingCaller(Timer retryTimer, AsyncConnectionI
AdvancedScanResultConsumer consumer, Interface stub, HRegionLocation loc,
boolean isRegionServerRemote, int priority, long scannerLeaseTimeoutPeriodNs, long pauseNs,
long pauseNsForServerOverloaded, int maxAttempts, long scanTimeoutNs, long rpcTimeoutNs,
int startLogErrorsCnt) {
int startLogErrorsCnt, Map<String, byte[]> requestAttributes) {
this.retryTimer = retryTimer;
this.conn = conn;
this.scan = scan;
Expand All @@ -341,6 +342,7 @@ public AsyncScanSingleRegionRpcRetryingCaller(Timer retryTimer, AsyncConnectionI
this.priority = priority;
this.controller = conn.rpcControllerFactory.newController();
this.controller.setPriority(priority);
this.controller.setRequestAttributes(requestAttributes);
this.exceptions = new ArrayList<>();
this.pauseManager =
new HBaseServerExceptionPauseManager(pauseNs, pauseNsForServerOverloaded, scanTimeoutNs);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static org.apache.hadoop.hbase.util.FutureUtils.addListener;

import java.io.IOException;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
Expand Down Expand Up @@ -49,7 +50,7 @@ public AsyncServerRequestRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl
long pauseNs, long pauseNsForServerOverloaded, int maxAttempts, long operationTimeoutNs,
long rpcTimeoutNs, int startLogErrorsCnt, ServerName serverName, Callable<T> callable) {
super(retryTimer, conn, HConstants.NORMAL_QOS, pauseNs, pauseNsForServerOverloaded, maxAttempts,
operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt);
operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt, Collections.emptyMap());
this.serverName = serverName;
this.callable = callable;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static org.apache.hadoop.hbase.util.FutureUtils.addListener;

import java.io.IOException;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import org.apache.hadoop.hbase.HRegionLocation;
Expand Down Expand Up @@ -57,9 +58,10 @@ CompletableFuture<T> call(HBaseRpcController controller, HRegionLocation loc,
public AsyncSingleRequestRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn,
TableName tableName, byte[] row, int replicaId, RegionLocateType locateType,
Callable<T> callable, int priority, long pauseNs, long pauseNsForServerOverloaded,
int maxAttempts, long operationTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) {
int maxAttempts, long operationTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt,
Map<String, byte[]> requestAttributes) {
super(retryTimer, conn, priority, pauseNs, pauseNsForServerOverloaded, maxAttempts,
operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt);
operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt, requestAttributes);
this.tableName = tableName;
this.row = row;
this.replicaId = replicaId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,11 @@
import static org.apache.hadoop.hbase.util.FutureUtils.allOf;

import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.apache.commons.lang3.NotImplementedException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CompareOperator;
import org.apache.hadoop.hbase.TableName;
Expand Down Expand Up @@ -110,6 +112,14 @@ public interface AsyncTable<C extends ScanResultConsumerBase> {
*/
long getScanTimeout(TimeUnit unit);

/**
* Get the map of request attributes
* @return a map of request attributes supplied by the client
*/
default Map<String, byte[]> getRequestAttributes() {
throw new NotImplementedException("Add an implementation!");
}

/**
* Test for the existence of columns in the table, as specified by the Get.
* <p>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,11 @@ default AsyncTableBuilder<C> setMaxRetries(int maxRetries) {
*/
AsyncTableBuilder<C> setStartLogErrorsCnt(int startLogErrorsCnt);

/**
* Set a request attribute
*/
AsyncTableBuilder<C> setRequestAttribute(String key, byte[] value);

/**
* Create the {@link AsyncTable} instance.
*/
Expand Down

0 comments on commit 83ea0da

Please sign in to comment.