From 33962e8ba1410dc6aebf48002d9263d07acdd124 Mon Sep 17 00:00:00 2001 From: "Colin P. McCabe" Date: Wed, 26 Apr 2023 16:10:46 -0700 Subject: [PATCH] KAFKA-14943: Fix ClientQuotaControlManager validation Don't allow setting negative or zero values for quotas. Don't allow SCRAM mechanism names to be used as client quota names. SCRAM mechanisms are not client quotas. (The confusion arose because of internal ZK representation details that treated them both as "client configs.") Add unit tests for ClientQuotaControlManager.isValidIpEntity and ClientQuotaControlManager.configKeysForEntityType. This change doesn't affect metadata record application, only input validation. If there are bad client quotas that are set currently, this change will not alter the current behavior (of throwing an exception and ignoring the bad quota). --- .../common/config/internals/QuotaConfigs.java | 22 ++- .../scala/kafka/server/DynamicConfig.scala | 4 +- .../controller/ClientQuotaControlManager.java | 76 +++++--- .../ClientQuotaControlManagerTest.java | 181 ++++++++++++++++++ 4 files changed, 240 insertions(+), 43 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/config/internals/QuotaConfigs.java b/clients/src/main/java/org/apache/kafka/common/config/internals/QuotaConfigs.java index 543e67b8976a..e382cb0c172e 100644 --- a/clients/src/main/java/org/apache/kafka/common/config/internals/QuotaConfigs.java +++ b/clients/src/main/java/org/apache/kafka/common/config/internals/QuotaConfigs.java @@ -46,9 +46,11 @@ public class QuotaConfigs { public static final int IP_CONNECTION_RATE_DEFAULT = Integer.MAX_VALUE; - private static Set userClientConfigNames = new HashSet<>(Arrays.asList( - PRODUCER_BYTE_RATE_OVERRIDE_CONFIG, CONSUMER_BYTE_RATE_OVERRIDE_CONFIG, - REQUEST_PERCENTAGE_OVERRIDE_CONFIG, CONTROLLER_MUTATION_RATE_OVERRIDE_CONFIG + private final static Set USER_AND_CLIENT_QUOTA_NAMES = new HashSet<>(Arrays.asList( + PRODUCER_BYTE_RATE_OVERRIDE_CONFIG, + CONSUMER_BYTE_RATE_OVERRIDE_CONFIG, + REQUEST_PERCENTAGE_OVERRIDE_CONFIG, + CONTROLLER_MUTATION_RATE_OVERRIDE_CONFIG )); private static void buildUserClientQuotaConfigDef(ConfigDef configDef) { @@ -68,21 +70,21 @@ private static void buildUserClientQuotaConfigDef(ConfigDef configDef) { } public static boolean isClientOrUserConfig(String name) { - return userClientConfigNames.contains(name); + return USER_AND_CLIENT_QUOTA_NAMES.contains(name); } - public static ConfigDef userConfigs() { + public static ConfigDef userAndClientQuotaConfigs() { ConfigDef configDef = new ConfigDef(); - ScramMechanism.mechanismNames().forEach(mechanismName -> { - configDef.define(mechanismName, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM, - "User credentials for SCRAM mechanism " + mechanismName); - }); buildUserClientQuotaConfigDef(configDef); return configDef; } - public static ConfigDef clientConfigs() { + public static ConfigDef scramMechanismsPlusUserAndClientQuotaConfigs() { ConfigDef configDef = new ConfigDef(); + ScramMechanism.mechanismNames().forEach(mechanismName -> { + configDef.define(mechanismName, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM, + "User credentials for SCRAM mechanism " + mechanismName); + }); buildUserClientQuotaConfigDef(configDef); return configDef; } diff --git a/core/src/main/scala/kafka/server/DynamicConfig.scala b/core/src/main/scala/kafka/server/DynamicConfig.scala index e1ab5ffb5869..8af2dece0421 100644 --- a/core/src/main/scala/kafka/server/DynamicConfig.scala +++ b/core/src/main/scala/kafka/server/DynamicConfig.scala @@ -71,7 +71,7 @@ object DynamicConfig { } object Client { - private val clientConfigs = org.apache.kafka.common.config.internals.QuotaConfigs.clientConfigs() + private val clientConfigs = org.apache.kafka.common.config.internals.QuotaConfigs.userAndClientQuotaConfigs() def configKeys = clientConfigs.configKeys @@ -81,7 +81,7 @@ object DynamicConfig { } object User { - private val userConfigs = org.apache.kafka.common.config.internals.QuotaConfigs.userConfigs() + private val userConfigs = org.apache.kafka.common.config.internals.QuotaConfigs.scramMechanismsPlusUserAndClientQuotaConfigs() def configKeys = userConfigs.configKeys diff --git a/metadata/src/main/java/org/apache/kafka/controller/ClientQuotaControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ClientQuotaControlManager.java index b859bbfd65d2..d969925e98ba 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/ClientQuotaControlManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/ClientQuotaControlManager.java @@ -115,11 +115,11 @@ public void replay(ClientQuotaRecord record) { } private void alterClientQuotaEntity( - ClientQuotaEntity entity, - Map newQuotaConfigs, - List outputRecords, - Map outputResults) { - + ClientQuotaEntity entity, + Map newQuotaConfigs, + List outputRecords, + Map outputResults + ) { // Check entity types and sanitize the names Map validatedEntityMap = new HashMap<>(3); ApiError error = validateEntity(entity, validatedEntityMap); @@ -181,7 +181,7 @@ private void alterClientQuotaEntity( outputResults.put(entity, ApiError.NONE); } - private ApiError configKeysForEntityType(Map entity, Map output) { + static ApiError configKeysForEntityType(Map entity, Map output) { // We only allow certain combinations of quota entity types. Which type is in use determines which config // keys are valid boolean hasUser = entity.containsKey(ClientQuotaEntity.USER); @@ -200,12 +200,8 @@ private ApiError configKeysForEntityType(Map entity, Map entity, Map validKeys, String key, Double value) { - // TODO can this validation be shared with alter configs? + static ApiError validateQuotaKeyValue( + Map validKeys, + String key, + double value + ) { // Ensure we have an allowed quota key ConfigDef.ConfigKey configKey = validKeys.get(key); if (configKey == null) { return new ApiError(Errors.INVALID_REQUEST, "Invalid configuration key " + key); } + if (value <= 0.0) { + return new ApiError(Errors.INVALID_REQUEST, "Quota " + key + " must be greater than 0"); + } // Ensure the quota value is valid switch (configKey.type()) { case DOUBLE: - break; + return ApiError.NONE; case SHORT: + if (value > Short.MAX_VALUE) { + return new ApiError(Errors.INVALID_REQUEST, + "Proposed value for " + key + " is too large for a SHORT."); + } + return getErrorForIntegralQuotaValue(value, key); case INT: - case LONG: - Double epsilon = 1e-6; - Long longValue = Double.valueOf(value + epsilon).longValue(); - if (Math.abs(longValue.doubleValue() - value) > epsilon) { + if (value > Integer.MAX_VALUE) { + return new ApiError(Errors.INVALID_REQUEST, + "Proposed value for " + key + " is too large for an INT."); + } + return getErrorForIntegralQuotaValue(value, key); + case LONG: { + if (value > Long.MAX_VALUE) { return new ApiError(Errors.INVALID_REQUEST, - "Configuration " + key + " must be a Long value"); + "Proposed value for " + key + " is too large for a LONG."); } - break; + return getErrorForIntegralQuotaValue(value, key); + } default: return new ApiError(Errors.UNKNOWN_SERVER_ERROR, "Unexpected config type " + configKey.type() + " should be Long or Double"); } + } + + static ApiError getErrorForIntegralQuotaValue(double value, String key) { + double remainder = Math.abs(value % 1.0); + if (remainder > 1e-6) { + return new ApiError(Errors.INVALID_REQUEST, key + " cannot be a fractional value."); + } return ApiError.NONE; } - // TODO move this somewhere common? - private boolean isValidIpEntity(String ip) { - if (Objects.nonNull(ip)) { - try { - InetAddress.getByName(ip); - return true; - } catch (UnknownHostException e) { - return false; - } - } else { + static boolean isValidIpEntity(String ip) { + if (ip == null) return true; + try { + InetAddress.getByName(ip); return true; + } catch (UnknownHostException e) { + return false; } } diff --git a/metadata/src/test/java/org/apache/kafka/controller/ClientQuotaControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ClientQuotaControlManagerTest.java index 1fb81cbf7a47..12f7477a6ca9 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/ClientQuotaControlManagerTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/ClientQuotaControlManagerTest.java @@ -17,6 +17,7 @@ package org.apache.kafka.controller; +import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.internals.QuotaConfigs; import org.apache.kafka.common.metadata.ClientQuotaRecord; import org.apache.kafka.common.metadata.ClientQuotaRecord.EntityData; @@ -36,6 +37,7 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.function.Consumer; @@ -306,4 +308,183 @@ static ClientQuotaEntity userClientEntity(String user, String clientId) { entries.put(ClientQuotaEntity.CLIENT_ID, clientId); return new ClientQuotaEntity(entries); } + + @Test + public void testIsValidIpEntityWithNull() { + assertTrue(ClientQuotaControlManager.isValidIpEntity(null)); + } + + @Test + public void testIsValidIpEntityWithUnresolvableHostname() { + // example.invalid will never be valid, as per RFC 2606. + assertFalse(ClientQuotaControlManager.isValidIpEntity("example.invalid")); + } + + @Test + public void testIsValidIpEntityWithLocalhost() { + assertTrue(ClientQuotaControlManager.isValidIpEntity("127.0.0.1")); + } + + @Test + public void testConfigKeysForEntityTypeWithUser() { + testConfigKeysForEntityType(Arrays.asList(ClientQuotaEntity.USER), + Arrays.asList( + "producer_byte_rate", + "consumer_byte_rate", + "controller_mutation_rate", + "request_percentage" + )); + } + + @Test + public void testConfigKeysForEntityTypeWithClientId() { + testConfigKeysForEntityType(Arrays.asList(ClientQuotaEntity.CLIENT_ID), + Arrays.asList( + "producer_byte_rate", + "consumer_byte_rate", + "controller_mutation_rate", + "request_percentage" + )); + } + + @Test + public void testConfigKeysForEntityTypeWithUserAndClientId() { + testConfigKeysForEntityType(Arrays.asList(ClientQuotaEntity.CLIENT_ID, ClientQuotaEntity.USER), + Arrays.asList( + "producer_byte_rate", + "consumer_byte_rate", + "controller_mutation_rate", + "request_percentage" + )); + } + + @Test + public void testConfigKeysForEntityTypeWithIp() { + testConfigKeysForEntityType(Arrays.asList(ClientQuotaEntity.IP), + Arrays.asList( + "connection_creation_rate" + )); + } + + private static Map keysToEntity(List entityKeys) { + HashMap entity = new HashMap<>(); + for (String entityKey : entityKeys) { + if (entityKey.equals(ClientQuotaEntity.IP)) { + entity.put(entityKey, "127.0.0.1"); + } else { + entity.put(entityKey, "foo"); + } + } + return entity; + } + + private static void testConfigKeysForEntityType( + List entityKeys, + List expectedConfigs + ) { + HashMap output = new HashMap<>(); + assertEquals(ApiError.NONE, ClientQuotaControlManager.configKeysForEntityType( + keysToEntity(entityKeys), output)); + assertEquals(new HashSet<>(expectedConfigs), output.keySet()); + } + + @Test + public void testConfigKeysForEmptyEntity() { + testConfigKeysError(Arrays.asList(), + new ApiError(Errors.INVALID_REQUEST, "Invalid empty client quota entity")); + } + + @Test + public void testConfigKeysForEntityTypeWithIpAndUser() { + testConfigKeysError(Arrays.asList(ClientQuotaEntity.IP, ClientQuotaEntity.USER), + new ApiError(Errors.INVALID_REQUEST, "Invalid quota entity combination, IP entity should" + + "not be combined with User or ClientId")); + } + + @Test + public void testConfigKeysForEntityTypeWithIpAndClientId() { + testConfigKeysError(Arrays.asList(ClientQuotaEntity.IP, ClientQuotaEntity.CLIENT_ID), + new ApiError(Errors.INVALID_REQUEST, "Invalid quota entity combination, IP entity should" + + "not be combined with User or ClientId")); + } + + private static void testConfigKeysError(List entityKeys, ApiError expectedError) { + testConfigKeysError(keysToEntity(entityKeys), expectedError); + } + + @Test + public void testConfigKeysForUnresolvableIpEntity() { + testConfigKeysError(Collections.singletonMap(ClientQuotaEntity.IP, "example.invalid"), + new ApiError(Errors.INVALID_REQUEST, "example.invalid is not a valid IP or resolvable host.")); + } + + private static void testConfigKeysError( + Map entity, + ApiError expectedError + ) { + HashMap output = new HashMap<>(); + assertEquals(expectedError, ClientQuotaControlManager.configKeysForEntityType(entity, output)); + } + + private final static HashMap VALID_CLIENT_ID_QUOTA_KEYS; + + static { + VALID_CLIENT_ID_QUOTA_KEYS = new HashMap<>(); + assertEquals(ApiError.NONE, ClientQuotaControlManager.configKeysForEntityType( + keysToEntity(Arrays.asList(ClientQuotaEntity.CLIENT_ID)), VALID_CLIENT_ID_QUOTA_KEYS)); + } + + @Test + public void testValidateQuotaKeyValueForUnknownQuota() { + assertEquals(new ApiError(Errors.INVALID_REQUEST, "Invalid configuration key foobar"), + ClientQuotaControlManager.validateQuotaKeyValue( + VALID_CLIENT_ID_QUOTA_KEYS, "foobar", 1.0)); + } + + @Test + public void testValidateQuotaKeyValueForZeroQuota() { + assertEquals(new ApiError(Errors.INVALID_REQUEST, "Quota producer_byte_rate must be greater than 0"), + ClientQuotaControlManager.validateQuotaKeyValue( + VALID_CLIENT_ID_QUOTA_KEYS, "producer_byte_rate", 0.0)); + } + + @Test + public void testValidateQuotaKeyValueForNegativeQuota() { + assertEquals(new ApiError(Errors.INVALID_REQUEST, "Quota consumer_byte_rate must be greater than 0"), + ClientQuotaControlManager.validateQuotaKeyValue( + VALID_CLIENT_ID_QUOTA_KEYS, "consumer_byte_rate", -2.0)); + } + + @Test + public void testValidateQuotaKeyValueForValidConsumerByteRate() { + assertEquals(ApiError.NONE, ClientQuotaControlManager.validateQuotaKeyValue( + VALID_CLIENT_ID_QUOTA_KEYS, "consumer_byte_rate", 1234.0)); + } + + @Test + public void testValidateQuotaKeyValueForConsumerByteRateTooLarge() { + assertEquals(new ApiError(Errors.INVALID_REQUEST, + "Proposed value for consumer_byte_rate is too large for a LONG."), + ClientQuotaControlManager.validateQuotaKeyValue( + VALID_CLIENT_ID_QUOTA_KEYS, "consumer_byte_rate", 36893488147419103232.4)); + } + + @Test + public void testValidateQuotaKeyValueForFractionalConsumerByteRate() { + assertEquals(new ApiError(Errors.INVALID_REQUEST, "consumer_byte_rate cannot be a fractional value."), + ClientQuotaControlManager.validateQuotaKeyValue( + VALID_CLIENT_ID_QUOTA_KEYS, "consumer_byte_rate", 2.245)); + } + + @Test + public void testValidateQuotaKeyValueForValidConsumerByteRate2() { + assertEquals(ApiError.NONE, ClientQuotaControlManager.validateQuotaKeyValue( + VALID_CLIENT_ID_QUOTA_KEYS, "consumer_byte_rate", 1235.0000001)); + } + + @Test + public void testValidateQuotaKeyValueForValidRequestPercentage() { + assertEquals(ApiError.NONE, ClientQuotaControlManager.validateQuotaKeyValue( + VALID_CLIENT_ID_QUOTA_KEYS, "request_percentage", 56.62367)); + } }