Skip to content

Commit

Permalink
KAFKA-14943: Fix ClientQuotaControlManager validation
Browse files Browse the repository at this point in the history
Don't allow setting negative or zero values for quotas. Don't allow SCRAM mechanism names to be
used as client quota names. SCRAM mechanisms are not client quotas. (The confusion arose because of
internal ZK representation details that treated them both as "client configs.")

Add unit tests for ClientQuotaControlManager.isValidIpEntity and
ClientQuotaControlManager.configKeysForEntityType.

This change doesn't affect metadata record application, only input validation. If there are bad
client quotas that are set currently, this change will not alter the current behavior (of throwing
an exception and ignoring the bad quota).
  • Loading branch information
cmccabe committed Apr 27, 2023
1 parent 8bde4e7 commit 7049333
Show file tree
Hide file tree
Showing 4 changed files with 240 additions and 43 deletions.
Expand Up @@ -46,9 +46,11 @@ public class QuotaConfigs {

public static final int IP_CONNECTION_RATE_DEFAULT = Integer.MAX_VALUE;

private static Set<String> userClientConfigNames = new HashSet<>(Arrays.asList(
PRODUCER_BYTE_RATE_OVERRIDE_CONFIG, CONSUMER_BYTE_RATE_OVERRIDE_CONFIG,
REQUEST_PERCENTAGE_OVERRIDE_CONFIG, CONTROLLER_MUTATION_RATE_OVERRIDE_CONFIG
private final static Set<String> USER_AND_CLIENT_QUOTA_NAMES = new HashSet<>(Arrays.asList(
PRODUCER_BYTE_RATE_OVERRIDE_CONFIG,
CONSUMER_BYTE_RATE_OVERRIDE_CONFIG,
REQUEST_PERCENTAGE_OVERRIDE_CONFIG,
CONTROLLER_MUTATION_RATE_OVERRIDE_CONFIG
));

private static void buildUserClientQuotaConfigDef(ConfigDef configDef) {
Expand All @@ -68,21 +70,21 @@ private static void buildUserClientQuotaConfigDef(ConfigDef configDef) {
}

public static boolean isClientOrUserConfig(String name) {
return userClientConfigNames.contains(name);
return USER_AND_CLIENT_QUOTA_NAMES.contains(name);
}

public static ConfigDef userConfigs() {
public static ConfigDef userAndClientQuotaConfigs() {
ConfigDef configDef = new ConfigDef();
ScramMechanism.mechanismNames().forEach(mechanismName -> {
configDef.define(mechanismName, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM,
"User credentials for SCRAM mechanism " + mechanismName);
});
buildUserClientQuotaConfigDef(configDef);
return configDef;
}

public static ConfigDef clientConfigs() {
public static ConfigDef scramMechanismsPlusUserAndClientQuotaConfigs() {
ConfigDef configDef = new ConfigDef();
ScramMechanism.mechanismNames().forEach(mechanismName -> {
configDef.define(mechanismName, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM,
"User credentials for SCRAM mechanism " + mechanismName);
});
buildUserClientQuotaConfigDef(configDef);
return configDef;
}
Expand Down
4 changes: 2 additions & 2 deletions core/src/main/scala/kafka/server/DynamicConfig.scala
Expand Up @@ -71,7 +71,7 @@ object DynamicConfig {
}

object Client {
private val clientConfigs = org.apache.kafka.common.config.internals.QuotaConfigs.clientConfigs()
private val clientConfigs = org.apache.kafka.common.config.internals.QuotaConfigs.userAndClientQuotaConfigs()

def configKeys = clientConfigs.configKeys

Expand All @@ -81,7 +81,7 @@ object DynamicConfig {
}

object User {
private val userConfigs = org.apache.kafka.common.config.internals.QuotaConfigs.userConfigs()
private val userConfigs = org.apache.kafka.common.config.internals.QuotaConfigs.scramMechanismsPlusUserAndClientQuotaConfigs()

def configKeys = userConfigs.configKeys

Expand Down
Expand Up @@ -115,11 +115,11 @@ public void replay(ClientQuotaRecord record) {
}

private void alterClientQuotaEntity(
ClientQuotaEntity entity,
Map<String, Double> newQuotaConfigs,
List<ApiMessageAndVersion> outputRecords,
Map<ClientQuotaEntity, ApiError> outputResults) {

ClientQuotaEntity entity,
Map<String, Double> newQuotaConfigs,
List<ApiMessageAndVersion> outputRecords,
Map<ClientQuotaEntity, ApiError> outputResults
) {
// Check entity types and sanitize the names
Map<String, String> validatedEntityMap = new HashMap<>(3);
ApiError error = validateEntity(entity, validatedEntityMap);
Expand Down Expand Up @@ -181,7 +181,7 @@ private void alterClientQuotaEntity(
outputResults.put(entity, ApiError.NONE);
}

private ApiError configKeysForEntityType(Map<String, String> entity, Map<String, ConfigDef.ConfigKey> output) {
static ApiError configKeysForEntityType(Map<String, String> entity, Map<String, ConfigDef.ConfigKey> output) {
// We only allow certain combinations of quota entity types. Which type is in use determines which config
// keys are valid
boolean hasUser = entity.containsKey(ClientQuotaEntity.USER);
Expand All @@ -200,12 +200,8 @@ private ApiError configKeysForEntityType(Map<String, String> entity, Map<String,
return new ApiError(Errors.INVALID_REQUEST, entity.get(ClientQuotaEntity.IP) + " is not a valid IP or resolvable host.");
}
}
} else if (hasUser && hasClientId) {
configKeys = QuotaConfigs.userConfigs().configKeys();
} else if (hasUser) {
configKeys = QuotaConfigs.userConfigs().configKeys();
} else if (hasClientId) {
configKeys = QuotaConfigs.clientConfigs().configKeys();
} else if (hasUser || hasClientId) {
configKeys = QuotaConfigs.userAndClientQuotaConfigs().configKeys();
} else {
return new ApiError(Errors.INVALID_REQUEST, "Invalid empty client quota entity");
}
Expand All @@ -214,46 +210,64 @@ private ApiError configKeysForEntityType(Map<String, String> entity, Map<String,
return ApiError.NONE;
}

private ApiError validateQuotaKeyValue(Map<String, ConfigDef.ConfigKey> validKeys, String key, Double value) {
// TODO can this validation be shared with alter configs?
static ApiError validateQuotaKeyValue(
Map<String, ConfigDef.ConfigKey> validKeys,
String key,
double value
) {
// Ensure we have an allowed quota key
ConfigDef.ConfigKey configKey = validKeys.get(key);
if (configKey == null) {
return new ApiError(Errors.INVALID_REQUEST, "Invalid configuration key " + key);
}
if (value <= 0.0) {
return new ApiError(Errors.INVALID_REQUEST, "Quota " + key + " must be greater than 0");
}

// Ensure the quota value is valid
switch (configKey.type()) {
case DOUBLE:
break;
return ApiError.NONE;
case SHORT:
if (value > Short.MAX_VALUE) {
return new ApiError(Errors.INVALID_REQUEST,
"Proposed value for " + key + " is too large for a SHORT.");
}
return getErrorForIntegralQuotaValue(value, key);
case INT:
case LONG:
Double epsilon = 1e-6;
Long longValue = Double.valueOf(value + epsilon).longValue();
if (Math.abs(longValue.doubleValue() - value) > epsilon) {
if (value > Integer.MAX_VALUE) {
return new ApiError(Errors.INVALID_REQUEST,
"Proposed value for " + key + " is too large for an INT.");
}
return getErrorForIntegralQuotaValue(value, key);
case LONG: {
if (value > Long.MAX_VALUE) {
return new ApiError(Errors.INVALID_REQUEST,
"Configuration " + key + " must be a Long value");
"Proposed value for " + key + " is too large for a LONG.");
}
break;
return getErrorForIntegralQuotaValue(value, key);
}
default:
return new ApiError(Errors.UNKNOWN_SERVER_ERROR,
"Unexpected config type " + configKey.type() + " should be Long or Double");
}
}

static ApiError getErrorForIntegralQuotaValue(double value, String key) {
double remainder = Math.abs(value % 1.0);
if (remainder > 1e-6) {
return new ApiError(Errors.INVALID_REQUEST, key + " cannot be a fractional value.");
}
return ApiError.NONE;
}

// TODO move this somewhere common?
private boolean isValidIpEntity(String ip) {
if (Objects.nonNull(ip)) {
try {
InetAddress.getByName(ip);
return true;
} catch (UnknownHostException e) {
return false;
}
} else {
static boolean isValidIpEntity(String ip) {
if (ip == null) return true;
try {
InetAddress.getByName(ip);
return true;
} catch (UnknownHostException e) {
return false;
}
}

Expand Down
Expand Up @@ -17,6 +17,7 @@

package org.apache.kafka.controller;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.internals.QuotaConfigs;
import org.apache.kafka.common.metadata.ClientQuotaRecord;
import org.apache.kafka.common.metadata.ClientQuotaRecord.EntityData;
Expand All @@ -36,6 +37,7 @@
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
Expand Down Expand Up @@ -306,4 +308,183 @@ static ClientQuotaEntity userClientEntity(String user, String clientId) {
entries.put(ClientQuotaEntity.CLIENT_ID, clientId);
return new ClientQuotaEntity(entries);
}

@Test
public void testIsValidIpEntityWithNull() {
assertTrue(ClientQuotaControlManager.isValidIpEntity(null));
}

@Test
public void testIsValidIpEntityWithUnresolvableHostname() {
// example.invalid will never be valid, as per RFC 2606.
assertFalse(ClientQuotaControlManager.isValidIpEntity("example.invalid"));
}

@Test
public void testIsValidIpEntityWithLocalhost() {
assertTrue(ClientQuotaControlManager.isValidIpEntity("127.0.0.1"));
}

@Test
public void testConfigKeysForEntityTypeWithUser() {
testConfigKeysForEntityType(Arrays.asList(ClientQuotaEntity.USER),
Arrays.asList(
"producer_byte_rate",
"consumer_byte_rate",
"controller_mutation_rate",
"request_percentage"
));
}

@Test
public void testConfigKeysForEntityTypeWithClientId() {
testConfigKeysForEntityType(Arrays.asList(ClientQuotaEntity.CLIENT_ID),
Arrays.asList(
"producer_byte_rate",
"consumer_byte_rate",
"controller_mutation_rate",
"request_percentage"
));
}

@Test
public void testConfigKeysForEntityTypeWithUserAndClientId() {
testConfigKeysForEntityType(Arrays.asList(ClientQuotaEntity.CLIENT_ID, ClientQuotaEntity.USER),
Arrays.asList(
"producer_byte_rate",
"consumer_byte_rate",
"controller_mutation_rate",
"request_percentage"
));
}

@Test
public void testConfigKeysForEntityTypeWithIp() {
testConfigKeysForEntityType(Arrays.asList(ClientQuotaEntity.IP),
Arrays.asList(
"connection_creation_rate"
));
}

private static Map<String, String> keysToEntity(List<String> entityKeys) {
HashMap<String, String> entity = new HashMap<>();
for (String entityKey : entityKeys) {
if (entityKey.equals(ClientQuotaEntity.IP)) {
entity.put(entityKey, "127.0.0.1");
} else {
entity.put(entityKey, "foo");
}
}
return entity;
}

private static void testConfigKeysForEntityType(
List<String> entityKeys,
List<String> expectedConfigs
) {
HashMap<String, ConfigDef.ConfigKey> output = new HashMap<>();
assertEquals(ApiError.NONE, ClientQuotaControlManager.configKeysForEntityType(
keysToEntity(entityKeys), output));
assertEquals(new HashSet<>(expectedConfigs), output.keySet());
}

@Test
public void testConfigKeysForEmptyEntity() {
testConfigKeysError(Arrays.asList(),
new ApiError(Errors.INVALID_REQUEST, "Invalid empty client quota entity"));
}

@Test
public void testConfigKeysForEntityTypeWithIpAndUser() {
testConfigKeysError(Arrays.asList(ClientQuotaEntity.IP, ClientQuotaEntity.USER),
new ApiError(Errors.INVALID_REQUEST, "Invalid quota entity combination, IP entity should" +
"not be combined with User or ClientId"));
}

@Test
public void testConfigKeysForEntityTypeWithIpAndClientId() {
testConfigKeysError(Arrays.asList(ClientQuotaEntity.IP, ClientQuotaEntity.CLIENT_ID),
new ApiError(Errors.INVALID_REQUEST, "Invalid quota entity combination, IP entity should" +
"not be combined with User or ClientId"));
}

private static void testConfigKeysError(List<String> entityKeys, ApiError expectedError) {
testConfigKeysError(keysToEntity(entityKeys), expectedError);
}

@Test
public void testConfigKeysForUnresolvableIpEntity() {
testConfigKeysError(Collections.singletonMap(ClientQuotaEntity.IP, "example.invalid"),
new ApiError(Errors.INVALID_REQUEST, "example.invalid is not a valid IP or resolvable host."));
}

private static void testConfigKeysError(
Map<String, String> entity,
ApiError expectedError
) {
HashMap<String, ConfigDef.ConfigKey> output = new HashMap<>();
assertEquals(expectedError, ClientQuotaControlManager.configKeysForEntityType(entity, output));
}

private final static HashMap<String, ConfigDef.ConfigKey> VALID_CLIENT_ID_QUOTA_KEYS;

static {
VALID_CLIENT_ID_QUOTA_KEYS = new HashMap<>();
assertEquals(ApiError.NONE, ClientQuotaControlManager.configKeysForEntityType(
keysToEntity(Arrays.asList(ClientQuotaEntity.CLIENT_ID)), VALID_CLIENT_ID_QUOTA_KEYS));
}

@Test
public void testValidateQuotaKeyValueForUnknownQuota() {
assertEquals(new ApiError(Errors.INVALID_REQUEST, "Invalid configuration key foobar"),
ClientQuotaControlManager.validateQuotaKeyValue(
VALID_CLIENT_ID_QUOTA_KEYS, "foobar", 1.0));
}

@Test
public void testValidateQuotaKeyValueForZeroQuota() {
assertEquals(new ApiError(Errors.INVALID_REQUEST, "Quota producer_byte_rate must be greater than 0"),
ClientQuotaControlManager.validateQuotaKeyValue(
VALID_CLIENT_ID_QUOTA_KEYS, "producer_byte_rate", 0.0));
}

@Test
public void testValidateQuotaKeyValueForNegativeQuota() {
assertEquals(new ApiError(Errors.INVALID_REQUEST, "Quota consumer_byte_rate must be greater than 0"),
ClientQuotaControlManager.validateQuotaKeyValue(
VALID_CLIENT_ID_QUOTA_KEYS, "consumer_byte_rate", -2.0));
}

@Test
public void testValidateQuotaKeyValueForValidConsumerByteRate() {
assertEquals(ApiError.NONE, ClientQuotaControlManager.validateQuotaKeyValue(
VALID_CLIENT_ID_QUOTA_KEYS, "consumer_byte_rate", 1234.0));
}

@Test
public void testValidateQuotaKeyValueForConsumerByteRateTooLarge() {
assertEquals(new ApiError(Errors.INVALID_REQUEST,
"Proposed value for consumer_byte_rate is too large for a LONG."),
ClientQuotaControlManager.validateQuotaKeyValue(
VALID_CLIENT_ID_QUOTA_KEYS, "consumer_byte_rate", 36893488147419103232.4));
}

@Test
public void testValidateQuotaKeyValueForFractionalConsumerByteRate() {
assertEquals(new ApiError(Errors.INVALID_REQUEST, "consumer_byte_rate cannot be a fractional value."),
ClientQuotaControlManager.validateQuotaKeyValue(
VALID_CLIENT_ID_QUOTA_KEYS, "consumer_byte_rate", 2.245));
}

@Test
public void testValidateQuotaKeyValueForValidConsumerByteRate2() {
assertEquals(ApiError.NONE, ClientQuotaControlManager.validateQuotaKeyValue(
VALID_CLIENT_ID_QUOTA_KEYS, "consumer_byte_rate", 1235.0000001));
}

@Test
public void testValidateQuotaKeyValueForValidRequestPercentage() {
assertEquals(ApiError.NONE, ClientQuotaControlManager.validateQuotaKeyValue(
VALID_CLIENT_ID_QUOTA_KEYS, "request_percentage", 56.62367));
}
}

0 comments on commit 7049333

Please sign in to comment.