Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* With this limiter resources will be refilled only after a fixed interval of time.
Expand All @@ -43,6 +43,8 @@ public class FixedIntervalRateLimiter extends RateLimiter {
public static final String RATE_LIMITER_REFILL_INTERVAL_MS =
"hbase.quota.rate.limiter.refill.interval.ms";

private static final Logger LOG = LoggerFactory.getLogger(FixedIntervalRateLimiter.class);

private long nextRefillTime = -1L;
private final long refillInterval;

Expand All @@ -52,10 +54,14 @@ public FixedIntervalRateLimiter() {

/**
 * Creates a limiter that refills its resources on the given fixed interval.
 * <p>
 * The refill interval may not exceed the limiter's time unit. Rather than rejecting a larger
 * value outright, it is clamped down to the time unit (with a warning) so that a misconfigured
 * interval cannot prevent the limiter from being constructed.
 * @param refillInterval desired refill interval, in milliseconds
 */
public FixedIntervalRateLimiter(long refillInterval) {
  super();
  long timeUnit = getTimeUnitInMillis();
  if (refillInterval > timeUnit) {
    LOG.warn(
      "Refill interval {} is larger than time unit {}. This is invalid. "
        + "Instead, we will use the time unit {} as the refill interval",
      refillInterval, timeUnit, timeUnit);
  }
  // Clamp so the interval can never exceed one full time unit.
  this.refillInterval = Math.min(timeUnit, refillInterval);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,20 +130,23 @@ private void ensureInitialized() {
}

/**
 * Loads per-user quota states from the quota table, applying per-table and machine-level
 * scaling factors. The region server's live Configuration is passed through so limiter
 * construction can honor dynamically refreshed settings.
 */
private Map<String, UserQuotaState> fetchUserQuotaStateEntries() throws IOException {
  return QuotaUtil.fetchUserQuotas(rsServices.getConfiguration(), rsServices.getConnection(),
    tableMachineQuotaFactors, machineQuotaFactor);
}

/**
 * Loads region-server-scoped quota states from the quota table, passing the live
 * Configuration through for limiter construction.
 */
private Map<String, QuotaState> fetchRegionServerQuotaStateEntries() throws IOException {
  return QuotaUtil.fetchRegionServerQuotas(rsServices.getConfiguration(),
    rsServices.getConnection());
}

/**
 * Loads per-table quota states from the quota table, applying per-table machine factors and
 * passing the live Configuration through for limiter construction.
 */
private Map<TableName, QuotaState> fetchTableQuotaStateEntries() throws IOException {
  return QuotaUtil.fetchTableQuotas(rsServices.getConfiguration(), rsServices.getConnection(),
    tableMachineQuotaFactors);
}

/**
 * Loads per-namespace quota states from the quota table, applying the machine quota factor and
 * passing the live Configuration through for limiter construction.
 */
private Map<String, QuotaState> fetchNamespaceQuotaStateEntries() throws IOException {
  return QuotaUtil.fetchNamespaceQuotas(rsServices.getConfiguration(), rsServices.getConnection(),
    machineQuotaFactor);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase.quotas;

import org.apache.hadoop.conf.Configuration;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;

Expand All @@ -25,8 +26,8 @@
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class QuotaLimiterFactory {
public static QuotaLimiter fromThrottle(final Throttle throttle) {
return TimeBasedLimiter.fromThrottle(throttle);
public static QuotaLimiter fromThrottle(Configuration conf, final Throttle throttle) {
return TimeBasedLimiter.fromThrottle(conf, throttle);
}

public static QuotaLimiter update(final QuotaLimiter a, final QuotaLimiter b) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase.quotas;

import org.apache.hadoop.conf.Configuration;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;

Expand Down Expand Up @@ -57,9 +58,9 @@ public synchronized boolean isBypass() {
/**
* Setup the global quota information. (This operation is part of the QuotaState setup)
*/
public synchronized void setQuotas(final Quotas quotas) {
public synchronized void setQuotas(Configuration conf, final Quotas quotas) {
if (quotas.hasThrottle()) {
globalLimiter = QuotaLimiterFactory.fromThrottle(quotas.getThrottle());
globalLimiter = QuotaLimiterFactory.fromThrottle(conf, quotas.getThrottle());
} else {
globalLimiter = NoopQuotaLimiter.get();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -330,8 +330,9 @@ private static void deleteQuotas(final Connection connection, final byte[] rowKe
doDelete(connection, delete);
}

public static Map<String, UserQuotaState> fetchUserQuotas(final Connection connection,
Map<TableName, Double> tableMachineQuotaFactors, double factor) throws IOException {
public static Map<String, UserQuotaState> fetchUserQuotas(final Configuration conf,
final Connection connection, Map<TableName, Double> tableMachineQuotaFactors, double factor)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you retrieve the Configuration instance from the Connection instance? Or do you expect that something is passing a conf object that they've modified?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure whether it's guaranteed that the connection passed here will always have a dynamically refreshed configuration. I feel the intention is clearer with this as an explicit arg, though I agree the redundant arg is a little painful...

throws IOException {
Map<String, UserQuotaState> userQuotas = new HashMap<>();
try (Table table = connection.getTable(QUOTA_TABLE_NAME)) {
Scan scan = new Scan();
Expand All @@ -351,7 +352,7 @@ public static Map<String, UserQuotaState> fetchUserQuotas(final Connection conne
@Override
public void visitUserQuotas(String userName, String namespace, Quotas quotas) {
quotas = updateClusterQuotaToMachineQuota(quotas, factor);
quotaInfo.setQuotas(namespace, quotas);
quotaInfo.setQuotas(conf, namespace, quotas);
}

@Override
Expand All @@ -360,13 +361,13 @@ public void visitUserQuotas(String userName, TableName table, Quotas quotas) {
tableMachineQuotaFactors.containsKey(table)
? tableMachineQuotaFactors.get(table)
: 1);
quotaInfo.setQuotas(table, quotas);
quotaInfo.setQuotas(conf, table, quotas);
}

@Override
public void visitUserQuotas(String userName, Quotas quotas) {
quotas = updateClusterQuotaToMachineQuota(quotas, factor);
quotaInfo.setQuotas(quotas);
quotaInfo.setQuotas(conf, quotas);
}
});
} catch (IOException e) {
Expand Down Expand Up @@ -407,7 +408,7 @@ protected static UserQuotaState buildDefaultUserQuotaState(Configuration conf) {
UserQuotaState state = new UserQuotaState();
QuotaProtos.Quotas defaultQuotas =
QuotaProtos.Quotas.newBuilder().setThrottle(throttleBuilder.build()).build();
state.setQuotas(defaultQuotas);
state.setQuotas(conf, defaultQuotas);
return state;
}

Expand All @@ -420,12 +421,12 @@ private static Optional<TimedQuota> buildDefaultTimedQuota(Configuration conf, S
java.util.concurrent.TimeUnit.SECONDS, org.apache.hadoop.hbase.quotas.QuotaScope.MACHINE));
}

public static Map<TableName, QuotaState> fetchTableQuotas(final Connection connection,
Map<TableName, Double> tableMachineFactors) throws IOException {
public static Map<TableName, QuotaState> fetchTableQuotas(final Configuration conf,
final Connection connection, Map<TableName, Double> tableMachineFactors) throws IOException {
Scan scan = new Scan();
scan.addFamily(QUOTA_FAMILY_INFO);
scan.setStartStopRowForPrefixScan(QUOTA_TABLE_ROW_KEY_PREFIX);
return fetchGlobalQuotas("table", scan, connection, new KeyFromRow<TableName>() {
return fetchGlobalQuotas(conf, "table", scan, connection, new KeyFromRow<TableName>() {
@Override
public TableName getKeyFromRow(final byte[] row) {
assert isTableRowKey(row);
Expand All @@ -439,12 +440,12 @@ public double getFactor(TableName tableName) {
});
}

public static Map<String, QuotaState> fetchNamespaceQuotas(final Connection connection,
double factor) throws IOException {
public static Map<String, QuotaState> fetchNamespaceQuotas(final Configuration conf,
final Connection connection, double factor) throws IOException {
Scan scan = new Scan();
scan.addFamily(QUOTA_FAMILY_INFO);
scan.setStartStopRowForPrefixScan(QUOTA_NAMESPACE_ROW_KEY_PREFIX);
return fetchGlobalQuotas("namespace", scan, connection, new KeyFromRow<String>() {
return fetchGlobalQuotas(conf, "namespace", scan, connection, new KeyFromRow<String>() {
@Override
public String getKeyFromRow(final byte[] row) {
assert isNamespaceRowKey(row);
Expand All @@ -458,12 +459,12 @@ public double getFactor(String s) {
});
}

public static Map<String, QuotaState> fetchRegionServerQuotas(final Connection connection)
throws IOException {
public static Map<String, QuotaState> fetchRegionServerQuotas(final Configuration conf,
final Connection connection) throws IOException {
Scan scan = new Scan();
scan.addFamily(QUOTA_FAMILY_INFO);
scan.setStartStopRowForPrefixScan(QUOTA_REGION_SERVER_ROW_KEY_PREFIX);
return fetchGlobalQuotas("regionServer", scan, connection, new KeyFromRow<String>() {
return fetchGlobalQuotas(conf, "regionServer", scan, connection, new KeyFromRow<String>() {
@Override
public String getKeyFromRow(final byte[] row) {
assert isRegionServerRowKey(row);
Expand All @@ -477,8 +478,9 @@ public double getFactor(String s) {
});
}

public static <K> Map<K, QuotaState> fetchGlobalQuotas(final String type, final Scan scan,
final Connection connection, final KeyFromRow<K> kfr) throws IOException {
public static <K> Map<K, QuotaState> fetchGlobalQuotas(final Configuration conf,
final String type, final Scan scan, final Connection connection, final KeyFromRow<K> kfr)
throws IOException {

Map<K, QuotaState> globalQuotas = new HashMap<>();
try (Table table = connection.getTable(QUOTA_TABLE_NAME)) {
Expand All @@ -499,7 +501,7 @@ public static <K> Map<K, QuotaState> fetchGlobalQuotas(final String type, final
try {
Quotas quotas = quotasFromData(data);
quotas = updateClusterQuotaToMachineQuota(quotas, kfr.getFactor(key));
quotaInfo.setQuotas(quotas);
quotaInfo.setQuotas(conf, quotas);
} catch (IOException e) {
LOG.error("Unable to parse {} '{}' quotas", type, key, e);
globalQuotas.remove(key);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package org.apache.hadoop.hbase.quotas;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;

Expand All @@ -32,7 +31,6 @@
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class TimeBasedLimiter implements QuotaLimiter {
private static final Configuration conf = HBaseConfiguration.create();
private RateLimiter reqsLimiter = null;
private RateLimiter reqSizeLimiter = null;
private RateLimiter writeReqsLimiter = null;
Expand All @@ -47,7 +45,7 @@ public class TimeBasedLimiter implements QuotaLimiter {
private RateLimiter atomicWriteSizeLimiter = null;
private RateLimiter reqHandlerUsageTimeLimiter = null;

private TimeBasedLimiter() {
private TimeBasedLimiter(Configuration conf) {
if (
FixedIntervalRateLimiter.class.getName().equals(
conf.getClass(RateLimiter.QUOTA_RATE_LIMITER_CONF_KEY, AverageIntervalRateLimiter.class)
Expand Down Expand Up @@ -85,8 +83,8 @@ private TimeBasedLimiter() {
}
}

static QuotaLimiter fromThrottle(final Throttle throttle) {
TimeBasedLimiter limiter = new TimeBasedLimiter();
static QuotaLimiter fromThrottle(Configuration conf, final Throttle throttle) {
TimeBasedLimiter limiter = new TimeBasedLimiter(conf);
boolean isBypass = true;
if (throttle.hasReqNum()) {
setFromTimedQuota(limiter.reqsLimiter, throttle.getReqNum());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.TableName;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
Expand Down Expand Up @@ -89,39 +90,39 @@ public synchronized boolean hasBypassGlobals() {
}

@Override
public synchronized void setQuotas(final Quotas quotas) {
super.setQuotas(quotas);
public synchronized void setQuotas(Configuration conf, final Quotas quotas) {
super.setQuotas(conf, quotas);
bypassGlobals = quotas.getBypassGlobals();
}

/**
* Add the quota information of the specified table. (This operation is part of the QuotaState
* setup)
*/
public synchronized void setQuotas(final TableName table, Quotas quotas) {
tableLimiters = setLimiter(tableLimiters, table, quotas);
public synchronized void setQuotas(Configuration conf, final TableName table, Quotas quotas) {
tableLimiters = setLimiter(conf, tableLimiters, table, quotas);
}

/**
* Add the quota information of the specified namespace. (This operation is part of the QuotaState
* setup)
*/
public void setQuotas(final String namespace, Quotas quotas) {
namespaceLimiters = setLimiter(namespaceLimiters, namespace, quotas);
public void setQuotas(Configuration conf, final String namespace, Quotas quotas) {
namespaceLimiters = setLimiter(conf, namespaceLimiters, namespace, quotas);
}

/** Returns true when at least one per-table limiter has been registered. */
public boolean hasTableLimiters() {
  if (tableLimiters == null) {
    return false;
  }
  return !tableLimiters.isEmpty();
}

private <K> Map<K, QuotaLimiter> setLimiter(Map<K, QuotaLimiter> limiters, final K key,
final Quotas quotas) {
private <K> Map<K, QuotaLimiter> setLimiter(Configuration conf, Map<K, QuotaLimiter> limiters,
final K key, final Quotas quotas) {
if (limiters == null) {
limiters = new HashMap<>();
}

QuotaLimiter limiter =
quotas.hasThrottle() ? QuotaLimiterFactory.fromThrottle(quotas.getThrottle()) : null;
quotas.hasThrottle() ? QuotaLimiterFactory.fromThrottle(conf, quotas.getThrottle()) : null;
if (limiter != null && !limiter.isBypass()) {
limiters.put(key, limiter);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Category({ MediumTests.class, CoprocessorTests.class })
public class TestRegionCoprocessorQuotaUsage {
Expand All @@ -51,6 +53,8 @@ public class TestRegionCoprocessorQuotaUsage {
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestRegionCoprocessorQuotaUsage.class);

private static final Logger LOG = LoggerFactory.getLogger(TestRegionCoprocessorQuotaUsage.class);

private static HBaseTestingUtil UTIL = new HBaseTestingUtil();
private static TableName TABLE_NAME = TableName.valueOf("TestRegionCoprocessorQuotaUsage");
private static byte[] CF = Bytes.toBytes("CF");
Expand All @@ -66,11 +70,14 @@ public void preGetOp(ObserverContext<? extends RegionCoprocessorEnvironment> c,

// For the purposes of this test, we only need to catch a throttle happening once, then
// let future requests pass through so we don't make this test take any longer than necessary
LOG.info("Intercepting GetOp");
if (!THROTTLING_OCCURRED.get()) {
try {
c.getEnvironment().checkBatchQuota(c.getEnvironment().getRegion(),
OperationQuota.OperationType.GET);
LOG.info("Request was not throttled");
} catch (RpcThrottlingException e) {
LOG.info("Intercepting was throttled");
THROTTLING_OCCURRED.set(true);
throw e;
}
Expand All @@ -91,9 +98,8 @@ public Optional<RegionObserver> getRegionObserver() {
public static void setUp() throws Exception {
Configuration conf = UTIL.getConfiguration();
conf.setBoolean("hbase.quota.enabled", true);
conf.setInt("hbase.quota.default.user.machine.read.num", 2);
conf.setInt("hbase.quota.default.user.machine.read.num", 1);
conf.set("hbase.quota.rate.limiter", "org.apache.hadoop.hbase.quotas.FixedIntervalRateLimiter");
conf.set("hbase.quota.rate.limiter.refill.interval.ms", "300000");
conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, MyCoprocessor.class.getName());
UTIL.startMiniCluster(3);
byte[][] splitKeys = new byte[8][];
Expand All @@ -116,6 +122,9 @@ public void testGet() throws InterruptedException, ExecutionException, IOExcepti
// Hit the table 5 times which ought to be enough to make a throttle happen
for (int i = 0; i < 5; i++) {
TABLE.get(new Get(Bytes.toBytes("000")));
if (THROTTLING_OCCURRED.get()) {
break;
}
}
assertTrue("Throttling did not happen as expected", THROTTLING_OCCURRED.get());
}
Expand Down
Loading