Skip to content
Permalink
Browse files
HBASE-26807 Unify CallQueueTooBigException special pause with CallDroppedException (#4180)

Signed-off-by: Nick Dimiduk <ndimiduk@apache.org>
  • Loading branch information
bbeaudreault committed Apr 7, 2022
1 parent c791c81 commit eb4c2ae4b459dafde71db4b9599b5592cfcb9ad7
Showing 32 changed files with 499 additions and 190 deletions.
@@ -18,8 +18,6 @@

package org.apache.hadoop.hbase;

import java.io.IOException;

import org.apache.yetus.audience.InterfaceAudience;

/**
@@ -28,14 +26,16 @@
*/
@SuppressWarnings("serial")
@InterfaceAudience.Public
public class CallDroppedException extends IOException {
public class CallDroppedException extends HBaseServerException {
public CallDroppedException() {
super();
// For now all call drops are due to server being overloaded.
// We could decouple this if desired.
super(true);
}

// Absence of this constructor prevents proper unwrapping of
// remote exception on the client side
public CallDroppedException(String message) {
super(message);
super(true, message);
}
}
@@ -18,13 +18,15 @@

package org.apache.hadoop.hbase;

import java.io.IOException;

import org.apache.yetus.audience.InterfaceAudience;

/**
* Returned to clients when their request was dropped because the call queue was too big to
* accept a new call. Clients should retry upon receiving it.
*/
@SuppressWarnings("serial")
@InterfaceAudience.Public
public class CallQueueTooBigException extends IOException {
public class CallQueueTooBigException extends CallDroppedException {
public CallQueueTooBigException() {
super();
}
@@ -0,0 +1,72 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase;

import org.apache.yetus.audience.InterfaceAudience;

/**
 * Common superclass for exceptions raised on the HBase server side. In addition to the usual
 * message, an instance records whether the server was considered overloaded at the moment the
 * exception was thrown.
 */
@InterfaceAudience.Public
public class HBaseServerException extends HBaseIOException {
  /** Whether the server was considered overloaded when this exception was thrown. */
  private boolean serverOverloaded;

  /**
   * Creates an exception without a message and with the overloaded flag cleared.
   */
  public HBaseServerException() {
    super();
    this.serverOverloaded = false;
  }

  /**
   * Creates an exception with the given message and with the overloaded flag cleared.
   * @param message detail message
   */
  public HBaseServerException(String message) {
    super(message);
    this.serverOverloaded = false;
  }

  /**
   * Creates an exception without a message.
   * @param serverOverloaded True if the server was overloaded when the exception was thrown
   */
  public HBaseServerException(boolean serverOverloaded) {
    super();
    this.serverOverloaded = serverOverloaded;
  }

  /**
   * Creates an exception with the given message.
   * @param serverOverloaded True if the server was overloaded when the exception was thrown
   * @param message detail message
   */
  public HBaseServerException(boolean serverOverloaded, String message) {
    super(message);
    this.serverOverloaded = serverOverloaded;
  }

  /**
   * Convenience check usable on any throwable; anything that is not an
   * {@link HBaseServerException} is reported as not overloaded.
   * @param t throwable to check for server overloaded state
   * @return True if the server was considered overloaded when the exception was thrown
   */
  public static boolean isServerOverloaded(Throwable t) {
    return t instanceof HBaseServerException && ((HBaseServerException) t).isServerOverloaded();
  }

  /**
   * @return True if server was considered overloaded when exception was thrown
   */
  public boolean isServerOverloaded() {
    return serverOverloaded;
  }

  /**
   * Needed when parsing a RemoteException on the client side.
   * @param serverOverloaded True if server was overloaded when exception was thrown
   */
  public void setServerOverloaded(boolean serverOverloaded) {
    this.serverOverloaded = serverOverloaded;
  }
}
@@ -21,6 +21,7 @@

import java.util.concurrent.TimeUnit;

import org.apache.hadoop.hbase.HBaseServerException;
import org.apache.yetus.audience.InterfaceAudience;

/**
@@ -50,21 +51,42 @@ public interface AsyncAdminBuilder {
* Set the base pause time for retrying. We use an exponential policy to generate sleep time when
* retrying.
* @return this for invocation chaining
* @see #setRetryPauseForCQTBE(long, TimeUnit)
* @see #setRetryPauseForServerOverloaded(long, TimeUnit)
*/
AsyncAdminBuilder setRetryPause(long timeout, TimeUnit unit);

/**
* Set the base pause time for retrying when we hit {@code CallQueueTooBigException}. We use an
* exponential policy to generate sleep time when retrying.
* Set the base pause time for retrying when {@link HBaseServerException#isServerOverloaded()}.
* We use an exponential policy to generate sleep time from this base when retrying.
* <p>
* This value should be greater than the normal pause value which could be set with the above
* {@link #setRetryPause(long, TimeUnit)} method, as usually {@code CallQueueTooBigException}
* means the server is overloaded. We just use the normal pause value for
* {@code CallQueueTooBigException} if here you specify a smaller value.
* {@link #setRetryPause(long, TimeUnit)} method, as usually
* {@link HBaseServerException#isServerOverloaded()} means the server is overloaded. We just use
* the normal pause value for {@link HBaseServerException#isServerOverloaded()} if here you
* specify a smaller value.
*
* @see #setRetryPause(long, TimeUnit)
* @deprecated Since 2.5.0, will be removed in 4.0.0. Please use
* {@link #setRetryPauseForServerOverloaded(long, TimeUnit)} instead.
*/
AsyncAdminBuilder setRetryPauseForCQTBE(long pause, TimeUnit unit);
@Deprecated
default AsyncAdminBuilder setRetryPauseForCQTBE(long pause, TimeUnit unit) {
return setRetryPauseForServerOverloaded(pause, unit);
}

/**
* Set the base pause time for retrying when {@link HBaseServerException#isServerOverloaded()}
* returns true for the failure. We use an exponential policy to generate sleep time from this
* base when retrying.
* <p>
* This value should be greater than the normal pause value which could be set with the above
* {@link #setRetryPause(long, TimeUnit)} method, since such an exception indicates that the
* server is overloaded. If you specify a smaller value here, the normal pause value is used
* instead.
*
* @see #setRetryPause(long, TimeUnit)
*/
AsyncAdminBuilder setRetryPauseForServerOverloaded(long pause, TimeUnit unit);

/**
* Set the max retry times for an admin operation. Usually it is the max attempt times minus 1.
@@ -33,7 +33,7 @@ abstract class AsyncAdminBuilderBase implements AsyncAdminBuilder {

protected long pauseNs;

protected long pauseForCQTBENs;
protected long pauseNsForServerOverloaded;

protected int maxAttempts;

@@ -43,7 +43,7 @@ abstract class AsyncAdminBuilderBase implements AsyncAdminBuilder {
this.rpcTimeoutNs = connConf.getRpcTimeoutNs();
this.operationTimeoutNs = connConf.getOperationTimeoutNs();
this.pauseNs = connConf.getPauseNs();
this.pauseForCQTBENs = connConf.getPauseForCQTBENs();
this.pauseNsForServerOverloaded = connConf.getPauseNsForServerOverloaded();
this.maxAttempts = connConf.getMaxRetries();
this.startLogErrorsCnt = connConf.getStartLogErrorsCnt();
}
@@ -67,8 +67,8 @@ public AsyncAdminBuilder setRetryPause(long timeout, TimeUnit unit) {
}

@Override
public AsyncAdminBuilder setRetryPauseForCQTBE(long timeout, TimeUnit unit) {
this.pauseForCQTBENs = unit.toNanos(timeout);
public AsyncAdminBuilder setRetryPauseForServerOverloaded(long timeout, TimeUnit unit) {
this.pauseNsForServerOverloaded = unit.toNanos(timeout);
return this;
}

@@ -44,10 +44,10 @@
private ServerName serverName;

public AsyncAdminRequestRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn, int priority,
long pauseNs, long pauseForCQTBENs, int maxAttempts, long operationTimeoutNs,
long pauseNs, long pauseNsForServerOverloaded, int maxAttempts, long operationTimeoutNs,
long rpcTimeoutNs, int startLogErrorsCnt, ServerName serverName, Callable<T> callable) {
super(retryTimer, conn, priority, pauseNs, pauseForCQTBENs, maxAttempts, operationTimeoutNs,
rpcTimeoutNs, startLogErrorsCnt);
super(retryTimer, conn, priority, pauseNs, pauseNsForServerOverloaded, maxAttempts,
operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt);
this.serverName = serverName;
this.callable = callable;
}
@@ -45,9 +45,9 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.lang3.mutable.MutableBoolean;
import org.apache.hadoop.hbase.CallQueueTooBigException;
import org.apache.hadoop.hbase.CellScannable;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseServerException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.RetryImmediatelyException;
@@ -63,9 +63,7 @@
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.io.netty.util.Timer;

import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
@@ -104,7 +102,7 @@

private final long pauseNs;

private final long pauseForCQTBENs;
private final long pauseNsForServerOverloaded;

private final int maxAttempts;

@@ -150,13 +148,14 @@ public int getPriority() {
}

public AsyncBatchRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn,
TableName tableName, List<? extends Row> actions, long pauseNs, long pauseForCQTBENs,
int maxAttempts, long operationTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) {
TableName tableName, List<? extends Row> actions, long pauseNs,
long pauseNsForServerOverloaded, int maxAttempts, long operationTimeoutNs,
long rpcTimeoutNs, int startLogErrorsCnt) {
this.retryTimer = retryTimer;
this.conn = conn;
this.tableName = tableName;
this.pauseNs = pauseNs;
this.pauseForCQTBENs = pauseForCQTBENs;
this.pauseNsForServerOverloaded = pauseNsForServerOverloaded;
this.maxAttempts = maxAttempts;
this.operationTimeoutNs = operationTimeoutNs;
this.rpcTimeoutNs = rpcTimeoutNs;
@@ -466,17 +465,17 @@ private void onError(Map<byte[], RegionRequest> actionsByRegion, int tries, Thro
.collect(Collectors.toList());
addError(copiedActions, error, serverName);
tryResubmit(copiedActions.stream(), tries, error instanceof RetryImmediatelyException,
error instanceof CallQueueTooBigException);
HBaseServerException.isServerOverloaded(error));
}

private void tryResubmit(Stream<Action> actions, int tries, boolean immediately,
boolean isCallQueueTooBig) {
boolean isServerOverloaded) {
if (immediately) {
groupAndSend(actions, tries);
return;
}
long delayNs;
long pauseNsToUse = isCallQueueTooBig ? pauseForCQTBENs : pauseNs;
long pauseNsToUse = isServerOverloaded ? pauseNsForServerOverloaded : pauseNs;
if (operationTimeoutNs > 0) {
long maxDelayNs = remainingTimeNs() - SLEEP_DELTA_NS;
if (maxDelayNs <= 0) {
@@ -75,7 +75,7 @@ class AsyncClientScanner {

private final long pauseNs;

private final long pauseForCQTBENs;
private final long pauseNsForServerOverloaded;

private final int maxAttempts;

@@ -90,7 +90,7 @@ class AsyncClientScanner {
private final Span span;

public AsyncClientScanner(Scan scan, AdvancedScanResultConsumer consumer, TableName tableName,
AsyncConnectionImpl conn, Timer retryTimer, long pauseNs, long pauseForCQTBENs,
AsyncConnectionImpl conn, Timer retryTimer, long pauseNs, long pauseNsForServerOverloaded,
int maxAttempts, long scanTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) {
if (scan.getStartRow() == null) {
scan.withStartRow(EMPTY_START_ROW, scan.includeStartRow());
@@ -104,7 +104,7 @@ public AsyncClientScanner(Scan scan, AdvancedScanResultConsumer consumer, TableN
this.conn = conn;
this.retryTimer = retryTimer;
this.pauseNs = pauseNs;
this.pauseForCQTBENs = pauseForCQTBENs;
this.pauseNsForServerOverloaded = pauseNsForServerOverloaded;
this.maxAttempts = maxAttempts;
this.scanTimeoutNs = scanTimeoutNs;
this.rpcTimeoutNs = rpcTimeoutNs;
@@ -198,7 +198,8 @@ private void startScan(OpenScannerResponse resp) {
.setScan(scan).metrics(scanMetrics).consumer(consumer).resultCache(resultCache)
.rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
.scanTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS)
.pauseForCQTBE(pauseForCQTBENs, TimeUnit.NANOSECONDS).maxAttempts(maxAttempts)
.pauseForServerOverloaded(pauseNsForServerOverloaded, TimeUnit.NANOSECONDS)
.maxAttempts(maxAttempts)
.startLogErrorsCnt(startLogErrorsCnt).start(resp.controller, resp.resp),
(hasMore, error) -> {
try (Scope ignored = span.makeCurrent()) {
@@ -232,7 +233,8 @@ private CompletableFuture<OpenScannerResponse> openScanner(int replicaId) {
.priority(scan.getPriority())
.rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
.operationTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS)
.pauseForCQTBE(pauseForCQTBENs, TimeUnit.NANOSECONDS).maxAttempts(maxAttempts)
.pauseForServerOverloaded(pauseNsForServerOverloaded, TimeUnit.NANOSECONDS)
.maxAttempts(maxAttempts)
.startLogErrorsCnt(startLogErrorsCnt).action(this::callOpenScanner).call();
}
}

0 comments on commit eb4c2ae

Please sign in to comment.