HBASE-28349 Count atomic operations against read quotas #5668

Merged: 4 commits, Feb 8, 2024
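
In short, this change adds a CHECK_AND_MUTATE operation type and classifies conditional mutations, Increments, and Appends under it, so that the read work those operations perform is charged against the read quota as well as the write quota. As a rough guide, the per-request charges, in the order (writes, reads, scans), end up as follows. This is only a condensed restatement of the checkQuota switch visible further down in the diff, shown here for orientation:

    // Charges passed to checkQuota(region, writes, reads, scans):
    //   GET              -> checkQuota(region, 0, 1, 0)
    //   MUTATE           -> checkQuota(region, 1, 0, 0)  // plain Put/Delete
    //   CHECK_AND_MUTATE -> checkQuota(region, 1, 1, 0)  // Increment, Append, conditional mutations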
@@ -32,7 +32,8 @@ public interface OperationQuota {
public enum OperationType {
MUTATE,
GET,
SCAN
SCAN,
CHECK_AND_MUTATE
}

/**
@@ -51,6 +51,7 @@
import org.slf4j.LoggerFactory;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.TimeUnit;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.QuotaScope;
@@ -177,6 +178,31 @@ public static void deleteRegionServerQuota(final Connection connection, final St
deleteQuotas(connection, getRegionServerRowKey(regionServer));
}

public static OperationQuota.OperationType getQuotaOperationType(ClientProtos.Action action,
boolean hasCondition) {
if (action.hasMutation()) {
return getQuotaOperationType(action.getMutation(), hasCondition);
}
return OperationQuota.OperationType.GET;
}

public static OperationQuota.OperationType
getQuotaOperationType(ClientProtos.MutateRequest mutateRequest) {
return getQuotaOperationType(mutateRequest.getMutation(), mutateRequest.hasCondition());
}

private static OperationQuota.OperationType
getQuotaOperationType(ClientProtos.MutationProto mutationProto, boolean hasCondition) {
ClientProtos.MutationProto.MutationType mutationType = mutationProto.getMutateType();
if (
hasCondition || mutationType == ClientProtos.MutationProto.MutationType.APPEND
|| mutationType == ClientProtos.MutationProto.MutationType.INCREMENT
) {
return OperationQuota.OperationType.CHECK_AND_MUTATE;
}
return OperationQuota.OperationType.MUTATE;
}

protected static void switchExceedThrottleQuota(final Connection connection,
boolean exceedThrottleQuotaEnabled) throws IOException {
if (exceedThrottleQuotaEnabled) {
@@ -171,25 +171,33 @@ public OperationQuota checkQuota(final Region region, final OperationQuota.Opera
return checkQuota(region, 0, 1, 0);
case MUTATE:
return checkQuota(region, 1, 0, 0);
case CHECK_AND_MUTATE:
return checkQuota(region, 1, 1, 0);
}
throw new RuntimeException("Invalid operation type: " + type);
}

/**
* Check the quota for the current (rpc-context) user. Returns the OperationQuota used to get the
* available quota and to report the data/usage of the operation.
* @param region the region where the operation will be performed
* @param actions the "multi" actions to perform
* @param region the region where the operation will be performed
* @param actions the "multi" actions to perform
* @param hasCondition whether the RegionAction has a condition
* @return the OperationQuota
* @throws RpcThrottlingException if the operation cannot be executed due to quota exceeded.
*/
public OperationQuota checkQuota(final Region region, final List<ClientProtos.Action> actions)
throws IOException, RpcThrottlingException {
public OperationQuota checkQuota(final Region region, final List<ClientProtos.Action> actions,
boolean hasCondition) throws IOException, RpcThrottlingException {
int numWrites = 0;
int numReads = 0;
for (final ClientProtos.Action action : actions) {
if (action.hasMutation()) {
numWrites++;
OperationQuota.OperationType operationType =
QuotaUtil.getQuotaOperationType(action, hasCondition);
if (operationType == OperationQuota.OperationType.CHECK_AND_MUTATE) {
numReads++;
}
} else if (action.hasGet()) {
numReads++;
}
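
To make the tally above concrete: a hypothetical multi batch carrying two Increments and one Get would be counted roughly as follows before being checked against the throttle (a sketch of the bookkeeping in the loop above, not code from the patch):

    // two Increments -> numWrites = 2; each classifies as CHECK_AND_MUTATE, so numReads += 2
    // one Get        -> numReads += 1
    // result: numWrites = 2, numReads = 3; the request is admitted only if both
    // the write and the read quota can cover these counts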
@@ -2679,7 +2679,8 @@ public MultiResponse multi(final RpcController rpcc, final MultiRequest request)

try {
region = getRegion(regionSpecifier);
quota = getRpcQuotaManager().checkQuota(region, regionAction.getActionList());
quota = getRpcQuotaManager().checkQuota(region, regionAction.getActionList(),
regionAction.hasCondition());
} catch (IOException e) {
failRegionAction(responseBuilder, regionActionResultBuilder, regionAction, cellScanner, e);
return responseBuilder.build();
@@ -2741,7 +2742,8 @@ public MultiResponse multi(final RpcController rpcc, final MultiRequest request)

try {
region = getRegion(regionSpecifier);
quota = getRpcQuotaManager().checkQuota(region, regionAction.getActionList());
quota = getRpcQuotaManager().checkQuota(region, regionAction.getActionList(),
regionAction.hasCondition());
} catch (IOException e) {
failRegionAction(responseBuilder, regionActionResultBuilder, regionAction, cellScanner, e);
continue; // For this region it's a failure.
@@ -2924,7 +2926,8 @@ public MutateResponse mutate(final RpcController rpcc, final MutateRequest reque
server.getMemStoreFlusher().reclaimMemStoreMemory();
}
long nonceGroup = request.hasNonceGroup() ? request.getNonceGroup() : HConstants.NO_NONCE;
quota = getRpcQuotaManager().checkQuota(region, OperationQuota.OperationType.MUTATE);
OperationQuota.OperationType operationType = QuotaUtil.getQuotaOperationType(request);
quota = getRpcQuotaManager().checkQuota(region, operationType);
ActivePolicyEnforcement spaceQuotaEnforcement =
getSpaceQuotaManager().getActiveEnforcements();

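
The new test below (TestAtomicReadQuota) exercises this behaviour end to end. For orientation, the quota it installs is a per-user read-number throttle, roughly as follows; this is a sketch mirroring the test's setupQuota, assuming an open Connection named conn and the same quota classes the test uses:

    try (Admin admin = conn.getAdmin()) {
      // one read per minute: after this change, Increment, Append and checkAndMutate
      // are throttled by this READ_NUMBER limit as well
      admin.setQuota(QuotaSettingsFactory.throttleUser(User.getCurrent().getShortName(),
        ThrottleType.READ_NUMBER, 1, TimeUnit.MINUTES));
    }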
@@ -0,0 +1,237 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.quotas;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.CheckAndMutate;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RowMutations;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Category({ RegionServerTests.class, MediumTests.class })
public class TestAtomicReadQuota {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestAtomicReadQuota.class);
private static final Logger LOG = LoggerFactory.getLogger(TestAtomicReadQuota.class);
private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
private static final TableName TABLE_NAME = TableName.valueOf(UUID.randomUUID().toString());
private static final byte[] FAMILY = Bytes.toBytes("cf");
private static final byte[] QUALIFIER = Bytes.toBytes("q");

@AfterClass
public static void tearDown() throws Exception {
ThrottleQuotaTestUtil.clearQuotaCache(TEST_UTIL);
EnvironmentEdgeManager.reset();
TEST_UTIL.deleteTable(TABLE_NAME);
TEST_UTIL.shutdownMiniCluster();
}

@BeforeClass
public static void setUpBeforeClass() throws Exception {
TEST_UTIL.getConfiguration().setBoolean(QuotaUtil.QUOTA_CONF_KEY, true);
TEST_UTIL.getConfiguration().setInt(QuotaCache.REFRESH_CONF_KEY, 1000);
TEST_UTIL.startMiniCluster(1);
TEST_UTIL.waitTableAvailable(QuotaTableUtil.QUOTA_TABLE_NAME);
TEST_UTIL.createTable(TABLE_NAME, FAMILY);
TEST_UTIL.waitTableAvailable(TABLE_NAME);
QuotaCache.TEST_FORCE_REFRESH = true;
}

@Test
public void testIncrementCountedAgainstReadCapacity() throws Exception {
setupQuota();

Increment inc = new Increment(Bytes.toBytes(UUID.randomUUID().toString()));
inc.addColumn(FAMILY, QUALIFIER, 1);
testThrottle(table -> table.increment(inc));
}

@Test
public void testConditionalRowMutationsCountedAgainstReadCapacity() throws Exception {
setupQuota();

byte[] row = Bytes.toBytes(UUID.randomUUID().toString());
Increment inc = new Increment(row);
inc.addColumn(FAMILY, Bytes.toBytes("doot"), 1);
Put put = new Put(row);
put.addColumn(FAMILY, Bytes.toBytes("doot"), Bytes.toBytes("v"));

RowMutations rowMutations = new RowMutations(row);
rowMutations.add(inc);
rowMutations.add(put);
testThrottle(table -> table.mutateRow(rowMutations));
}

@Test
public void testNonConditionalRowMutationsOmittedFromReadCapacity() throws Exception {
setupQuota();

byte[] row = Bytes.toBytes(UUID.randomUUID().toString());
Put put = new Put(row);
put.addColumn(FAMILY, Bytes.toBytes("doot"), Bytes.toBytes("v"));

RowMutations rowMutations = new RowMutations(row);
rowMutations.add(put);
try (Table table = getTable()) {
for (int i = 0; i < 100; i++) {
table.mutateRow(rowMutations);
}
}
}

@Test
public void testNonAtomicPutOmittedFromReadCapacity() throws Exception {
setupQuota();

byte[] row = Bytes.toBytes(UUID.randomUUID().toString());
Put put = new Put(row);
put.addColumn(FAMILY, Bytes.toBytes("doot"), Bytes.toBytes("v"));
try (Table table = getTable()) {
for (int i = 0; i < 100; i++) {
table.put(put);
}
}
}

@Test
public void testNonAtomicMultiPutOmittedFromReadCapacity() throws Exception {
setupQuota();

Put put1 = new Put(Bytes.toBytes(UUID.randomUUID().toString()));
put1.addColumn(FAMILY, Bytes.toBytes("doot"), Bytes.toBytes("v"));
Put put2 = new Put(Bytes.toBytes(UUID.randomUUID().toString()));
put2.addColumn(FAMILY, Bytes.toBytes("doot"), Bytes.toBytes("v"));

Increment inc = new Increment(Bytes.toBytes(UUID.randomUUID().toString()));
inc.addColumn(FAMILY, Bytes.toBytes("doot"), 1);

List<Put> puts = new ArrayList<>(2);
puts.add(put1);
puts.add(put2);

try (Table table = getTable()) {
for (int i = 0; i < 100; i++) {
table.put(puts);
}
}
}

@Test
public void testCheckAndMutateCountedAgainstReadCapacity() throws Exception {
setupQuota();

byte[] row = Bytes.toBytes(UUID.randomUUID().toString());
byte[] value = Bytes.toBytes("v");
Put put = new Put(row);
put.addColumn(FAMILY, Bytes.toBytes("doot"), value);
CheckAndMutate checkAndMutate =
CheckAndMutate.newBuilder(row).ifEquals(FAMILY, QUALIFIER, value).build(put);

testThrottle(table -> table.checkAndMutate(checkAndMutate));
}

@Test
public void testAtomicBatchCountedAgainstReadCapacity() throws Exception {
setupQuota();

byte[] row = Bytes.toBytes(UUID.randomUUID().toString());
Increment inc = new Increment(row);
inc.addColumn(FAMILY, Bytes.toBytes("doot"), 1);

List<Increment> incs = new ArrayList<>(2);
incs.add(inc);
incs.add(inc);

testThrottle(table -> {
Object[] results = new Object[] {};
table.batch(incs, results);
return results;
});
}

private void setupQuota() throws Exception {
try (Admin admin = TEST_UTIL.getAdmin()) {
admin.setQuota(QuotaSettingsFactory.throttleUser(User.getCurrent().getShortName(),
ThrottleType.READ_NUMBER, 1, TimeUnit.MINUTES));
}
ThrottleQuotaTestUtil.triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAME);
}

private void cleanupQuota() throws Exception {
try (Admin admin = TEST_UTIL.getAdmin()) {
admin.setQuota(QuotaSettingsFactory.unthrottleUser(User.getCurrent().getShortName()));
}
ThrottleQuotaTestUtil.triggerUserCacheRefresh(TEST_UTIL, true, TABLE_NAME);
}

private void testThrottle(ThrowingFunction<Table, ?> request) throws Exception {
try (Table table = getTable()) {
// we have a read quota configured, so this should fail
TEST_UTIL.waitFor(60_000, () -> {
try {
request.run(table);
return false;
} catch (Exception e) {
boolean success = e.getCause() instanceof RpcThrottlingException;
if (!success) {
LOG.error("Unexpected exception", e);
}
return success;
}
});
} finally {
cleanupQuota();
}
}

private Table getTable() throws IOException {
TEST_UTIL.getConfiguration().setInt("hbase.client.pause", 100);
TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
return TEST_UTIL.getConnection().getTableBuilder(TABLE_NAME, null).setOperationTimeout(250)
.build();
}

@FunctionalInterface
private interface ThrowingFunction<I, O> {
O run(I input) throws Exception;
}

}