diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/OperationQuota.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/OperationQuota.java
index e18d3eb34953..ffc3cd50825c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/OperationQuota.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/OperationQuota.java
@@ -32,7 +32,8 @@ public interface OperationQuota {
   public enum OperationType {
     MUTATE,
     GET,
-    SCAN
+    SCAN,
+    CHECK_AND_MUTATE
   }
 
   /**
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaUtil.java
index 2e51a8f75610..44357c88d2dc 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaUtil.java
@@ -51,6 +51,7 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.TimeUnit;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.QuotaScope;
@@ -177,6 +178,31 @@ public static void deleteRegionServerQuota(final Connection connection, final St
     deleteQuotas(connection, getRegionServerRowKey(regionServer));
   }
 
+  public static OperationQuota.OperationType getQuotaOperationType(ClientProtos.Action action,
+    boolean hasCondition) {
+    if (action.hasMutation()) {
+      return getQuotaOperationType(action.getMutation(), hasCondition);
+    }
+    return OperationQuota.OperationType.GET;
+  }
+
+  public static OperationQuota.OperationType
+    getQuotaOperationType(ClientProtos.MutateRequest mutateRequest) {
+    return getQuotaOperationType(mutateRequest.getMutation(), mutateRequest.hasCondition());
+  }
+
+  private static OperationQuota.OperationType
+    getQuotaOperationType(ClientProtos.MutationProto mutationProto, boolean hasCondition) {
+    ClientProtos.MutationProto.MutationType mutationType = mutationProto.getMutateType();
+    if (
+      hasCondition || mutationType == ClientProtos.MutationProto.MutationType.APPEND
+        || mutationType == ClientProtos.MutationProto.MutationType.INCREMENT
+    ) {
+      return OperationQuota.OperationType.CHECK_AND_MUTATE;
+    }
+    return OperationQuota.OperationType.MUTATE;
+  }
+
   protected static void switchExceedThrottleQuota(final Connection connection,
     boolean exceedThrottleQuotaEnabled) throws IOException {
     if (exceedThrottleQuotaEnabled) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerRpcQuotaManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerRpcQuotaManager.java
index de76303e27ac..3c72c662887b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerRpcQuotaManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerRpcQuotaManager.java
@@ -171,6 +171,8 @@ public OperationQuota checkQuota(final Region region, final OperationQuota.Opera
         return checkQuota(region, 0, 1, 0);
       case MUTATE:
         return checkQuota(region, 1, 0, 0);
+      case CHECK_AND_MUTATE:
+        return checkQuota(region, 1, 1, 0);
     }
     throw new RuntimeException("Invalid operation type: " + type);
   }
@@ -178,18 +180,24 @@ public OperationQuota checkQuota(final Region region, final OperationQuota.Opera
   /**
    * Check the quota for the current (rpc-context) user. Returns the OperationQuota used to get the
    * available quota and to report the data/usage of the operation.
-   * @param region  the region where the operation will be performed
-   * @param actions the "multi" actions to perform
+   * @param region       the region where the operation will be performed
+   * @param actions      the "multi" actions to perform
+   * @param hasCondition whether the RegionAction has a condition
    * @return the OperationQuota
    * @throws RpcThrottlingException if the operation cannot be executed due to quota exceeded.
    */
-  public OperationQuota checkQuota(final Region region, final List<ClientProtos.Action> actions)
-    throws IOException, RpcThrottlingException {
+  public OperationQuota checkQuota(final Region region, final List<ClientProtos.Action> actions,
+    boolean hasCondition) throws IOException, RpcThrottlingException {
     int numWrites = 0;
     int numReads = 0;
     for (final ClientProtos.Action action : actions) {
       if (action.hasMutation()) {
         numWrites++;
+        OperationQuota.OperationType operationType =
+          QuotaUtil.getQuotaOperationType(action, hasCondition);
+        if (operationType == OperationQuota.OperationType.CHECK_AND_MUTATE) {
+          numReads++;
+        }
       } else if (action.hasGet()) {
         numReads++;
       }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index 05d7c2e56055..0538b9706e89 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -2679,7 +2679,8 @@ public MultiResponse multi(final RpcController rpcc, final MultiRequest request)
       try {
         region = getRegion(regionSpecifier);
-        quota = getRpcQuotaManager().checkQuota(region, regionAction.getActionList());
+        quota = getRpcQuotaManager().checkQuota(region, regionAction.getActionList(),
+          regionAction.hasCondition());
       } catch (IOException e) {
         failRegionAction(responseBuilder, regionActionResultBuilder, regionAction, cellScanner, e);
         return responseBuilder.build();
       }
@@ -2741,7 +2742,8 @@ public MultiResponse multi(final RpcController rpcc, final MultiRequest request)
       try {
         region = getRegion(regionSpecifier);
-        quota = getRpcQuotaManager().checkQuota(region, regionAction.getActionList());
+        quota = getRpcQuotaManager().checkQuota(region, regionAction.getActionList(),
+          regionAction.hasCondition());
       } catch (IOException e) {
         failRegionAction(responseBuilder, regionActionResultBuilder, regionAction, cellScanner, e);
         continue; // For this region it's a failure.
       }
@@ -2924,7 +2926,8 @@ public MutateResponse mutate(final RpcController rpcc, final MutateRequest reque
         server.getMemStoreFlusher().reclaimMemStoreMemory();
       }
       long nonceGroup = request.hasNonceGroup() ? request.getNonceGroup() : HConstants.NO_NONCE;
-      quota = getRpcQuotaManager().checkQuota(region, OperationQuota.OperationType.MUTATE);
+      OperationQuota.OperationType operationType = QuotaUtil.getQuotaOperationType(request);
+      quota = getRpcQuotaManager().checkQuota(region, operationType);
       ActivePolicyEnforcement spaceQuotaEnforcement =
         getSpaceQuotaManager().getActiveEnforcements();
 
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestAtomicReadQuota.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestAtomicReadQuota.java
new file mode 100644
index 000000000000..9b654ac8e6d0
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestAtomicReadQuota.java
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.quotas;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtil;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.CheckAndMutate;
+import org.apache.hadoop.hbase.client.Increment;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.RowMutations;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Category({ RegionServerTests.class, MediumTests.class })
+public class TestAtomicReadQuota {
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestAtomicReadQuota.class);
+  private static final Logger LOG = LoggerFactory.getLogger(TestAtomicReadQuota.class);
+  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
+  private static final TableName TABLE_NAME = TableName.valueOf(UUID.randomUUID().toString());
+  private static final byte[] FAMILY = Bytes.toBytes("cf");
+  private static final byte[] QUALIFIER = Bytes.toBytes("q");
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    ThrottleQuotaTestUtil.clearQuotaCache(TEST_UTIL);
+    EnvironmentEdgeManager.reset();
+    TEST_UTIL.deleteTable(TABLE_NAME);
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    TEST_UTIL.getConfiguration().setBoolean(QuotaUtil.QUOTA_CONF_KEY, true);
+    TEST_UTIL.getConfiguration().setInt(QuotaCache.REFRESH_CONF_KEY, 1000);
+    TEST_UTIL.startMiniCluster(1);
+    TEST_UTIL.waitTableAvailable(QuotaTableUtil.QUOTA_TABLE_NAME);
+    TEST_UTIL.createTable(TABLE_NAME, FAMILY);
+    TEST_UTIL.waitTableAvailable(TABLE_NAME);
+    QuotaCache.TEST_FORCE_REFRESH = true;
+  }
+
+  @Test
+  public void testIncrementCountedAgainstReadCapacity() throws Exception {
+    setupQuota();
+
+    Increment inc = new Increment(Bytes.toBytes(UUID.randomUUID().toString()));
+    inc.addColumn(FAMILY, QUALIFIER, 1);
+    testThrottle(table -> table.increment(inc));
+  }
+
+  @Test
+  public void testConditionalRowMutationsCountedAgainstReadCapacity() throws Exception {
+    setupQuota();
+
+    byte[] row = Bytes.toBytes(UUID.randomUUID().toString());
+    Increment inc = new Increment(row);
+    inc.addColumn(FAMILY, Bytes.toBytes("doot"), 1);
+    Put put = new Put(row);
+    put.addColumn(FAMILY, Bytes.toBytes("doot"), Bytes.toBytes("v"));
+
+    RowMutations rowMutations = new RowMutations(row);
+    rowMutations.add(inc);
+    rowMutations.add(put);
+    testThrottle(table -> table.mutateRow(rowMutations));
+  }
+
+  @Test
+  public void testNonConditionalRowMutationsOmittedFromReadCapacity() throws Exception {
+    setupQuota();
+
+    byte[] row = Bytes.toBytes(UUID.randomUUID().toString());
+    Put put = new Put(row);
+    put.addColumn(FAMILY, Bytes.toBytes("doot"), Bytes.toBytes("v"));
+
+    RowMutations rowMutations = new RowMutations(row);
+    rowMutations.add(put);
+    try (Table table = getTable()) {
+      for (int i = 0; i < 100; i++) {
+        table.mutateRow(rowMutations);
+      }
+    }
+  }
+
+  @Test
+  public void testNonAtomicPutOmittedFromReadCapacity() throws Exception {
+    setupQuota();
+
+    byte[] row = Bytes.toBytes(UUID.randomUUID().toString());
+    Put put = new Put(row);
+    put.addColumn(FAMILY, Bytes.toBytes("doot"), Bytes.toBytes("v"));
+    try (Table table = getTable()) {
+      for (int i = 0; i < 100; i++) {
+        table.put(put);
+      }
+    }
+  }
+
+  @Test
+  public void testNonAtomicMultiPutOmittedFromReadCapacity() throws Exception {
+    setupQuota();
+
+    Put put1 = new Put(Bytes.toBytes(UUID.randomUUID().toString()));
+    put1.addColumn(FAMILY, Bytes.toBytes("doot"), Bytes.toBytes("v"));
+    Put put2 = new Put(Bytes.toBytes(UUID.randomUUID().toString()));
+    put2.addColumn(FAMILY, Bytes.toBytes("doot"), Bytes.toBytes("v"));
+
+    Increment inc = new Increment(Bytes.toBytes(UUID.randomUUID().toString()));
+    inc.addColumn(FAMILY, Bytes.toBytes("doot"), 1);
+
+    List<Put> puts = new ArrayList<>(2);
+    puts.add(put1);
+    puts.add(put2);
+
+    try (Table table = getTable()) {
+      for (int i = 0; i < 100; i++) {
+        table.put(puts);
+      }
+    }
+  }
+
+  @Test
+  public void testCheckAndMutateCountedAgainstReadCapacity() throws Exception {
+    setupQuota();
+
+    byte[] row = Bytes.toBytes(UUID.randomUUID().toString());
+    byte[] value = Bytes.toBytes("v");
+    Put put = new Put(row);
+    put.addColumn(FAMILY, Bytes.toBytes("doot"), value);
+    CheckAndMutate checkAndMutate =
+      CheckAndMutate.newBuilder(row).ifEquals(FAMILY, QUALIFIER, value).build(put);
+
+    testThrottle(table -> table.checkAndMutate(checkAndMutate));
+  }
+
+  @Test
+  public void testAtomicBatchCountedAgainstReadCapacity() throws Exception {
+    setupQuota();
+
+    byte[] row = Bytes.toBytes(UUID.randomUUID().toString());
+    Increment inc = new Increment(row);
+    inc.addColumn(FAMILY, Bytes.toBytes("doot"), 1);
+
+    List<Increment> incs = new ArrayList<>(2);
+    incs.add(inc);
+    incs.add(inc);
+
+    testThrottle(table -> {
+      Object[] results = new Object[] {};
+      table.batch(incs, results);
+      return results;
+    });
+  }
+
+  private void setupQuota() throws Exception {
+    try (Admin admin = TEST_UTIL.getAdmin()) {
+      admin.setQuota(QuotaSettingsFactory.throttleUser(User.getCurrent().getShortName(),
+        ThrottleType.READ_NUMBER, 1, TimeUnit.MINUTES));
+    }
+    ThrottleQuotaTestUtil.triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAME);
+  }
+
+  private void cleanupQuota() throws Exception {
+    try (Admin admin = TEST_UTIL.getAdmin()) {
+      admin.setQuota(QuotaSettingsFactory.unthrottleUser(User.getCurrent().getShortName()));
+    }
+    ThrottleQuotaTestUtil.triggerUserCacheRefresh(TEST_UTIL, true, TABLE_NAME);
+  }
+
+  private void testThrottle(ThrowingFunction<Table, ?> request) throws Exception {
+    try (Table table = getTable()) {
+      // we have a read quota configured, so this should fail
+      TEST_UTIL.waitFor(60_000, () -> {
+        try {
+          request.run(table);
+          return false;
+        } catch (Exception e) {
+          boolean success = e.getCause() instanceof RpcThrottlingException;
+          if (!success) {
+            LOG.error("Unexpected exception", e);
+          }
+          return success;
+        }
+      });
+    } finally {
+      cleanupQuota();
+    }
+  }
+
+  private Table getTable() throws IOException {
+    TEST_UTIL.getConfiguration().setInt("hbase.client.pause", 100);
+    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
+    return TEST_UTIL.getConnection().getTableBuilder(TABLE_NAME, null).setOperationTimeout(250)
+      .build();
+  }
+
+  @FunctionalInterface
+  private interface ThrowingFunction<I, O> {
+    O run(I input) throws Exception;
+  }
+
+}
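
Note (not part of the patch): a minimal sketch of how the new QuotaUtil.getQuotaOperationType(MutateRequest) classification could be exercised directly, assuming the changes above are applied. The class name QuotaOperationTypeSketch is hypothetical, and buildPartial() is used only to skip proto2 required-field validation on the illustrative requests.

import static org.junit.Assert.assertEquals;

import org.apache.hadoop.hbase.quotas.OperationQuota;
import org.apache.hadoop.hbase.quotas.QuotaUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
import org.junit.Test;

public class QuotaOperationTypeSketch {

  // An unconditioned Increment is classified as CHECK_AND_MUTATE, so
  // RegionServerRpcQuotaManager charges it one read and one write.
  @Test
  public void incrementClassifiedAsCheckAndMutate() {
    ClientProtos.MutateRequest request = ClientProtos.MutateRequest.newBuilder()
      .setMutation(ClientProtos.MutationProto.newBuilder()
        .setMutateType(ClientProtos.MutationProto.MutationType.INCREMENT).buildPartial())
      .buildPartial();
    assertEquals(OperationQuota.OperationType.CHECK_AND_MUTATE,
      QuotaUtil.getQuotaOperationType(request));
  }

  // A plain Put without a condition stays a MUTATE and is not charged a read.
  @Test
  public void plainPutClassifiedAsMutate() {
    ClientProtos.MutateRequest request = ClientProtos.MutateRequest.newBuilder()
      .setMutation(ClientProtos.MutationProto.newBuilder()
        .setMutateType(ClientProtos.MutationProto.MutationType.PUT).buildPartial())
      .buildPartial();
    assertEquals(OperationQuota.OperationType.MUTATE,
      QuotaUtil.getQuotaOperationType(request));
  }
}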