KAFKA-16540: Update partitions if min isr config is changed. #15702

Open · wants to merge 8 commits into base: trunk
BrokersToElrs.java
@@ -24,6 +24,7 @@

import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import static org.apache.kafka.metadata.Replicas.NONE;
@@ -42,10 +43,10 @@ public class BrokersToElrs {
/**
* Update our records of a partition's ELR.
*
* @param topicId     The topic ID of the partition.
* @param partitionId The partition ID of the partition.
* @param prevElr     The previous ELR, or null if the partition is new.
* @param nextElr     The new ELR, or null if the partition is being removed.
*/

void update(Uuid topicId, int partitionId, int[] prevElr, int[] nextElr) {
@@ -159,4 +160,12 @@ BrokersToIsrs.PartitionsOnReplicaIterator partitionsWithBrokerInElr(int brokerId
}
return new BrokersToIsrs.PartitionsOnReplicaIterator(topicMap, false);
}

/**
* Return an iterator over every partition that currently has a non-empty ELR, across all brokers.
*/
BrokersToIsrs.PartitionsOnReplicaIterator partitionsWithElr() {
Map<Uuid, int[]> topicMap = new HashMap<>();
// Merge each broker's topic -> ELR partitions map into a single map.
for (Map<Uuid, int[]> map : elrMembers.values()) {
topicMap.putAll(map);
}
return new BrokersToIsrs.PartitionsOnReplicaIterator(topicMap, false);
}
}
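One property of the new partitionsWithElr merge worth noting (the snippet below is a standalone illustration of java.util.Map.putAll semantics using plain JDK types, not code from this PR): when the same topic ID appears in more than one broker's map, putAll keeps only the last entry seen rather than unioning the partition arrays.

import java.util.HashMap;
import java.util.Map;

class PutAllMergeSketch {
    public static void main(String[] args) {
        // Two brokers each report ELR partitions for the same topic.
        Map<String, int[]> broker1 = Map.of("topicA", new int[]{0, 1});
        Map<String, int[]> broker2 = Map.of("topicA", new int[]{2});
        Map<String, int[]> merged = new HashMap<>();
        merged.putAll(broker1);
        merged.putAll(broker2); // overwrites broker1's entry for "topicA"
        System.out.println(merged.get("topicA").length); // prints 1, not 3
    }
}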
ConfigurationControlManager.java
@@ -20,14 +20,17 @@
import org.apache.kafka.clients.admin.AlterConfigOp.OpType;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.config.ConfigResource.Type;
import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.common.config.types.Password;
import org.apache.kafka.common.metadata.ConfigRecord;
import org.apache.kafka.common.metadata.MetadataRecordType;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.ApiError;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.metadata.KafkaConfigSchema;
import org.apache.kafka.metadata.KafkaConfigSchema.OrderedConfigResolver;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.mutable.BoundedList;
import org.apache.kafka.server.policy.AlterConfigPolicy;
@@ -37,6 +40,7 @@
import org.slf4j.Logger;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@@ -48,6 +52,7 @@
import java.util.Objects;
import java.util.Optional;
import java.util.function.Consumer;
import java.util.function.Function;

import static org.apache.kafka.clients.admin.AlterConfigOp.OpType.APPEND;
import static org.apache.kafka.common.protocol.Errors.INVALID_CONFIG;
@@ -66,6 +71,7 @@ public class ConfigurationControlManager {
private final TimelineHashMap<ConfigResource, TimelineHashMap<String, String>> configData;
private final Map<String, Object> staticConfig;
private final ConfigResource currentController;
private final MinIsrConfigUpdatePartitionHandler minIsrConfigUpdatePartitionHandler;
Reviewer comment (Contributor): maybe more of a question for someone with more code ownership of the quorum controller code, but I wonder if it would be preferable to handle generating the replication control manager records in QuorumController.incrementalAlterConfigs. That would also make it a bit easier to handle validateOnly, which we are not currently handling.
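For illustration of the validateOnly concern, a minimal sketch of how the side effect could be gated if the trigger lived in the controller path; the flag, types, and method names below are hypothetical stand-ins, not this PR's API:

import java.util.ArrayList;
import java.util.List;

class ValidateOnlySketch {
    // Hypothetical stand-in for ApiMessageAndVersion.
    record MetadataRecord(String description) {}

    // Only append the derived partition-update records when the change will
    // actually be applied; a validate-only request returns just the config records.
    static List<MetadataRecord> alterConfigs(List<MetadataRecord> configRecords, boolean validateOnly) {
        List<MetadataRecord> output = new ArrayList<>(configRecords);
        if (!validateOnly) {
            output.addAll(derivedPartitionUpdates(configRecords));
        }
        return output;
    }

    static List<MetadataRecord> derivedPartitionUpdates(List<MetadataRecord> configRecords) {
        return List.of(new MetadataRecord("PartitionChangeRecord(...)")); // placeholder
    }

    public static void main(String[] args) {
        List<MetadataRecord> records = List.of(new MetadataRecord("ConfigRecord(min.insync.replicas=2)"));
        System.out.println(alterConfigs(records, true).size());  // 1: config record only
        System.out.println(alterConfigs(records, false).size()); // 2: plus the derived update
    }
}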


static class Builder {
private LogContext logContext = null;
@@ -75,6 +81,7 @@ static class Builder {
private Optional<AlterConfigPolicy> alterConfigPolicy = Optional.empty();
private ConfigurationValidator validator = ConfigurationValidator.NO_OP;
private Map<String, Object> staticConfig = Collections.emptyMap();
private MinIsrConfigUpdatePartitionHandler minIsrConfigUpdatePartitionHandler;
private int nodeId = 0;

Builder setLogContext(LogContext logContext) {
@@ -117,9 +124,19 @@ Builder setNodeId(int nodeId) {
return this;
}

Builder setMinIsrConfigUpdatePartitionHandler(
MinIsrConfigUpdatePartitionHandler minIsrConfigUpdatePartitionHandler
) {
this.minIsrConfigUpdatePartitionHandler = minIsrConfigUpdatePartitionHandler;
return this;
}

ConfigurationControlManager build() {
if (logContext == null) logContext = new LogContext();
if (snapshotRegistry == null) snapshotRegistry = new SnapshotRegistry(logContext);
if (minIsrConfigUpdatePartitionHandler == null) {
throw new RuntimeException("You must specify MinIsrConfigUpdatePartitionHandler");
}
return new ConfigurationControlManager(
logContext,
snapshotRegistry,
@@ -128,7 +145,8 @@ ConfigurationControlManager build() {
alterConfigPolicy,
validator,
staticConfig,
nodeId);
nodeId,
minIsrConfigUpdatePartitionHandler);
}
}

@@ -139,7 +157,9 @@ private ConfigurationControlManager(LogContext logContext,
Optional<AlterConfigPolicy> alterConfigPolicy,
ConfigurationValidator validator,
Map<String, Object> staticConfig,
int nodeId) {
int nodeId,
MinIsrConfigUpdatePartitionHandler minIsrConfigUpdatePartitionHandler
) {
this.log = logContext.logger(ConfigurationControlManager.class);
this.snapshotRegistry = snapshotRegistry;
this.configSchema = configSchema;
@@ -149,6 +169,7 @@ private ConfigurationControlManager(LogContext logContext,
this.configData = new TimelineHashMap<>(snapshotRegistry, 0);
this.staticConfig = Collections.unmodifiableMap(new HashMap<>(staticConfig));
this.currentController = new ConfigResource(Type.BROKER, Integer.toString(nodeId));
this.minIsrConfigUpdatePartitionHandler = minIsrConfigUpdatePartitionHandler;
}

SnapshotRegistry snapshotRegistry() {
@@ -260,6 +281,7 @@ private ApiError incrementalAlterConfigResource(
if (error.isFailure()) {
return error;
}
maybeTriggerPartitionUpdateOnMinIsrChange(newRecords);
outputRecords.addAll(newRecords);
return ApiError.NONE;
}
@@ -308,6 +330,54 @@ private ApiError validateAlterConfig(ConfigResource configResource,
return ApiError.NONE;
}

void maybeTriggerPartitionUpdateOnMinIsrChange(List<ApiMessageAndVersion> records) {
Reviewer comment (Contributor): I guess eventually we will probably want some more general system for responding to configuration changes, and not special-case min.isr. However, we don't have to do that in this PR.

I also understand why you had this function receive a list of records here rather than something fancier (easier to integrate into the existing code, and into the two configuration change paths).
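Purely as a thought experiment on the reviewer's "more general system" (nothing like this exists in the PR; all names below are hypothetical): a registry where components subscribe to config keys, rather than the config manager special-casing min.insync.replicas.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

class ConfigChangeRegistrySketch {
    record ConfigChange(String name, String newValue) {}

    private final Map<String, List<Consumer<ConfigChange>>> listeners = new HashMap<>();

    // A component (e.g. replication control) registers interest in one config key.
    void subscribe(String configName, Consumer<ConfigChange> listener) {
        listeners.computeIfAbsent(configName, k -> new ArrayList<>()).add(listener);
    }

    // The config manager fires once per generated config record.
    void publish(ConfigChange change) {
        listeners.getOrDefault(change.name(), List.of()).forEach(l -> l.accept(change));
    }

    public static void main(String[] args) {
        ConfigChangeRegistrySketch registry = new ConfigChangeRegistrySketch();
        registry.subscribe("min.insync.replicas",
                c -> System.out.println("recompute ELR, new min ISR = " + c.newValue()));
        registry.publish(new ConfigChange("min.insync.replicas", "2"));
    }
}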

List<ConfigRecord> minIsrRecords = new ArrayList<>();
Map<String, String> topicToMinIsrValueMap = new HashMap<>();
Map<String, String> configRemovedTopicMap = new HashMap<>();
records.forEach(record -> {
if (MetadataRecordType.fromId(record.message().apiKey()) == MetadataRecordType.CONFIG_RECORD) {
ConfigRecord configRecord = (ConfigRecord) record.message();
if (configRecord.name().equals(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG)) {
minIsrRecords.add(configRecord);
if (Type.forId(configRecord.resourceType()) == Type.TOPIC) {
if (configRecord.value() != null) topicToMinIsrValueMap.put(configRecord.resourceName(), configRecord.value());
else configRemovedTopicMap.put(configRecord.resourceName(), configRecord.value());
}
}
}
});

if (minIsrRecords.isEmpty()) return;
if (topicToMinIsrValueMap.size() == minIsrRecords.size()) {
// If all the min ISR config updates are at the topic level, we can trigger a simpler update on just
// the updated topics.
records.addAll(minIsrConfigUpdatePartitionHandler.addRecordsForMinIsrUpdate(
new ArrayList<>(topicToMinIsrValueMap.keySet()),
topicToMinIsrValueMap::get)
);
return;
}

// Resolving the min ISR config value may require a multi-level lookup, so build a copy of the
// config data, apply the pending config updates to it, and use that copy for the min ISR lookup.
Map<ConfigResource, Map<String, String>> pendingConfigData = new HashMap<>();

for (ConfigRecord record : minIsrRecords) {
replayForPendingConfig(record, pendingConfigData);
}

ArrayList<String> topicList = new ArrayList<>();
// If all the updates are at the topic level, we can avoid performing a full scan of the partitions.
if (configRemovedTopicMap.size() + topicToMinIsrValueMap.size() == minIsrRecords.size()) {
topicList.addAll(configRemovedTopicMap.keySet());
topicList.addAll(topicToMinIsrValueMap.keySet());
}
records.addAll(minIsrConfigUpdatePartitionHandler.addRecordsForMinIsrUpdate(
topicList,
topicName -> getTopicConfigWithPendingChange(topicName, TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, pendingConfigData))
);
}
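To make the fast-path condition above concrete, here is a self-contained restatement of the classification using plain JDK types (the record shape and resource types are simplified stand-ins for ConfigRecord and ConfigResource.Type, not this PR's classes):

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class MinIsrClassificationSketch {
    enum ResourceType { TOPIC, BROKER }
    record ConfigChange(ResourceType type, String resourceName, String name, String value) {}

    static final String MIN_ISR = "min.insync.replicas";

    // True when every min ISR change is a topic-level update with a value, which is
    // exactly the case where only the named topics need to be re-examined.
    static boolean topicOnlyFastPath(List<ConfigChange> changes) {
        List<ConfigChange> minIsrChanges = new ArrayList<>();
        Map<String, String> topicUpdates = new HashMap<>();
        for (ConfigChange c : changes) {
            if (!MIN_ISR.equals(c.name())) continue;
            minIsrChanges.add(c);
            if (c.type() == ResourceType.TOPIC && c.value() != null) {
                topicUpdates.put(c.resourceName(), c.value());
            }
        }
        return !minIsrChanges.isEmpty() && topicUpdates.size() == minIsrChanges.size();
    }

    public static void main(String[] args) {
        System.out.println(topicOnlyFastPath(List.of(
                new ConfigChange(ResourceType.TOPIC, "foo", MIN_ISR, "2"))));  // true
        System.out.println(topicOnlyFastPath(List.of(
                new ConfigChange(ResourceType.BROKER, "1", MIN_ISR, "2"))));   // false: broker-level
    }
}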

/**
* Determine the result of applying a batch of legacy configuration changes. Note
* that this method does not change the contents of memory. It just generates a
@@ -375,6 +445,7 @@ private void legacyAlterConfigResource(ConfigResource configResource,
}
outputRecords.addAll(recordsExplicitlyAltered);
outputRecords.addAll(recordsImplicitlyDeleted);
maybeTriggerPartitionUpdateOnMinIsrChange(outputRecords);
outputResults.put(configResource, ApiError.NONE);
}

@@ -425,6 +496,35 @@ public void replay(ConfigRecord record) {
}
}

/**
* Apply a configuration record to the given config data. Note that it stores a null value for a config that is being removed.
*
* @param record          The ConfigRecord.
* @param localConfigData The config data to be updated.
*/
public void replayForPendingConfig(
ConfigRecord record,
Map<ConfigResource, Map<String, String>> localConfigData
) {
Type type = Type.forId(record.resourceType());
ConfigResource configResource = new ConfigResource(type, record.resourceName());
Map<String, String> configs = localConfigData.get(configResource);
if (configs == null) {
configs = new HashMap<>();
localConfigData.put(configResource, configs);
}

// If the record removes a config, we store a null value here to indicate that the config is removed.
if (record.value() == null) {
configs.put(record.name(), null);
} else {
configs.put(record.name(), record.value());
}
if (configs.isEmpty()) {
localConfigData.remove(configResource);
}
}
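A minimal standalone model of this overlay (plain string keys instead of ConfigResource; not the PR's types), showing how a null value acts as a removal tombstone:

import java.util.HashMap;
import java.util.Map;

class PendingConfigOverlaySketch {
    // Mirrors replayForPendingConfig: layer the change into a local map,
    // storing null to mark a config that the pending record removes.
    static void apply(Map<String, Map<String, String>> pending,
                      String resource, String key, String valueOrNull) {
        pending.computeIfAbsent(resource, r -> new HashMap<>()).put(key, valueOrNull);
    }

    public static void main(String[] args) {
        Map<String, Map<String, String>> pending = new HashMap<>();
        apply(pending, "topic:foo", "min.insync.replicas", "2");  // pending update
        apply(pending, "topic:bar", "min.insync.replicas", null); // pending removal
        System.out.println(pending.get("topic:bar").containsKey("min.insync.replicas")); // true
        System.out.println(pending.get("topic:bar").get("min.insync.replicas"));         // null
    }
}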

// VisibleForTesting
Map<String, String> getConfigs(ConfigResource configResource) {
Map<String, String> map = configData.get(configResource);
@@ -454,6 +554,37 @@ String getTopicConfig(String topicName, String configKey) throws NoSuchElementException
return map.get(configKey);
}

/**
* Get the config value for the given topic and the given config key, searching the pending config
* data first. If the config value is not found, return null.
*
* @param topicName         The topic name for the config.
* @param configKey         The key for the config.
* @param pendingConfigData The configs that are about to be applied; they take priority over the
*                          current configs.
*/
String getTopicConfigWithPendingChange(
String topicName,
String configKey,
Map<ConfigResource, Map<String, String>> pendingConfigData
) throws NoSuchElementException {
Map<String, String> pendingTopicConfigs =
pendingConfigData.getOrDefault(new ConfigResource(Type.TOPIC, topicName), Collections.emptyMap());
Map<String, String> currentTopicConfigs = configData.get(new ConfigResource(Type.TOPIC, topicName));
if (currentTopicConfigs == null) currentTopicConfigs = Collections.emptyMap();
OrderedConfigResolver configResolver = new OrderedConfigResolver(Arrays.asList(pendingTopicConfigs, currentTopicConfigs));

if (!configResolver.containsKey(configKey)) {
Map<String, ConfigEntry> effectiveConfigMap = computeEffectiveTopicConfigsWithPendingChange(pendingConfigData);
if (!effectiveConfigMap.containsKey(configKey)) {
return null;
}
return effectiveConfigMap.get(configKey).value();
}
return (String) configResolver.get(configKey);
}
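The lookup order can be modeled as first-layer-wins resolution over a list of maps (an illustration of the intent, assuming OrderedConfigResolver behaves this way; this is not that class's implementation):

import java.util.List;
import java.util.Map;

class LayeredLookupSketch {
    // The first layer that contains the key wins; pending changes are placed
    // ahead of the committed configs, so they shadow the current values.
    static String resolve(String key, List<Map<String, String>> layers) {
        for (Map<String, String> layer : layers) {
            if (layer.containsKey(key)) {
                return layer.get(key);
            }
        }
        return null; // caller falls back to the effective cluster/broker config
    }

    public static void main(String[] args) {
        Map<String, String> pending = Map.of("min.insync.replicas", "3");
        Map<String, String> current = Map.of("min.insync.replicas", "2");
        System.out.println(resolve("min.insync.replicas", List.of(pending, current))); // 3
        System.out.println(resolve("min.insync.replicas", List.of(Map.of(), current))); // 2
    }
}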

public Map<ConfigResource, ResultOrError<Map<String, String>>> describeConfigs(
long lastCommittedOffset, Map<ConfigResource, Collection<String>> resources) {
Map<ConfigResource, ResultOrError<Map<String, String>>> results = new HashMap<>();
@@ -499,9 +630,26 @@ boolean uncleanLeaderElectionEnabledForTopic(String name) {
return false; // TODO: support configuring unclean leader election.
}

Map<String, ConfigEntry> computeEffectiveTopicConfigsWithPendingChange(
Map<ConfigResource, Map<String, String>> pendingConfigData
) {
Map<String, String> pendingClusterConfig =
pendingConfigData.getOrDefault(DEFAULT_NODE, Collections.emptyMap());
Map<String, String> pendingControllerConfig =
pendingConfigData.getOrDefault(currentController, Collections.emptyMap());
return configSchema.resolveEffectiveTopicConfigs(
new OrderedConfigResolver(staticConfig),
new OrderedConfigResolver(Arrays.asList(pendingClusterConfig, clusterConfig())),
new OrderedConfigResolver(Arrays.asList(pendingControllerConfig, currentControllerConfig())),
new OrderedConfigResolver(Collections.emptyMap()));
}

Map<String, ConfigEntry> computeEffectiveTopicConfigs(Map<String, String> creationConfigs) {
return configSchema.resolveEffectiveTopicConfigs(staticConfig, clusterConfig(),
currentControllerConfig(), creationConfigs);
return configSchema.resolveEffectiveTopicConfigs(
new OrderedConfigResolver(staticConfig),
new OrderedConfigResolver(clusterConfig()),
new OrderedConfigResolver(currentControllerConfig()),
new OrderedConfigResolver(creationConfigs));
}

Map<String, String> clusterConfig() {
Expand All @@ -513,4 +661,9 @@ Map<String, String> currentControllerConfig() {
Map<String, String> result = configData.get(currentController);
return (result == null) ? Collections.emptyMap() : result;
}

@FunctionalInterface
interface MinIsrConfigUpdatePartitionHandler {
List<ApiMessageAndVersion> addRecordsForMinIsrUpdate(List<String> topicNames, Function<String, String> getTopicMinIsrConfig);
}
}
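For a sense of how this callback is used, a compilable miniature of the wiring (records are reduced to strings here; in the PR the real handler is bound with setMinIsrConfigUpdatePartitionHandler(this::maybeTriggerMinIsrConfigUpdate) in QuorumController):

import java.util.List;
import java.util.function.Function;

class HandlerWiringSketch {
    // Same shape as MinIsrConfigUpdatePartitionHandler, with String standing
    // in for ApiMessageAndVersion.
    @FunctionalInterface
    interface MinIsrHandler {
        List<String> addRecordsForMinIsrUpdate(List<String> topicNames,
                                               Function<String, String> getTopicMinIsrConfig);
    }

    public static void main(String[] args) {
        // Bound as a lambda here; the controller binds a method reference instead.
        MinIsrHandler handler = (topics, minIsrOf) -> topics.stream()
                .map(t -> "update " + t + " with min.insync.replicas=" + minIsrOf.apply(t))
                .toList();
        System.out.println(handler.addRecordsForMinIsrUpdate(List.of("payments"), t -> "2"));
    }
}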
QuorumController.java
@@ -31,10 +31,10 @@
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.message.AllocateProducerIdsRequestData;
import org.apache.kafka.common.message.AllocateProducerIdsResponseData;
import org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData;
import org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData;
import org.apache.kafka.common.message.AlterPartitionRequestData;
import org.apache.kafka.common.message.AlterPartitionResponseData;
import org.apache.kafka.common.message.AlterUserScramCredentialsRequestData;
import org.apache.kafka.common.message.AlterUserScramCredentialsResponseData;
import org.apache.kafka.common.message.AssignReplicasToDirsRequestData;
@@ -78,11 +78,11 @@
import org.apache.kafka.common.metadata.RemoveAccessControlEntryRecord;
import org.apache.kafka.common.metadata.RemoveDelegationTokenRecord;
import org.apache.kafka.common.metadata.RemoveTopicRecord;
import org.apache.kafka.common.metadata.RemoveUserScramCredentialRecord;
import org.apache.kafka.common.metadata.TopicRecord;
import org.apache.kafka.common.metadata.UnfenceBrokerRecord;
import org.apache.kafka.common.metadata.UnregisterBrokerRecord;
import org.apache.kafka.common.metadata.UserScramCredentialRecord;
import org.apache.kafka.common.metadata.ZkMigrationStateRecord;
import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.common.quota.ClientQuotaAlteration;
@@ -95,6 +95,8 @@
import org.apache.kafka.controller.errors.ControllerExceptions;
import org.apache.kafka.controller.errors.EventHandlerExceptionInfo;
import org.apache.kafka.controller.metrics.QuorumControllerMetrics;
import org.apache.kafka.deferred.DeferredEvent;
import org.apache.kafka.deferred.DeferredEventQueue;
import org.apache.kafka.metadata.BrokerHeartbeatReply;
import org.apache.kafka.metadata.BrokerRegistrationReply;
import org.apache.kafka.metadata.FinalizedControllerFeatures;
@@ -106,10 +108,8 @@
import org.apache.kafka.metadata.placement.ReplicaPlacer;
import org.apache.kafka.metadata.placement.StripedReplicaPlacer;
import org.apache.kafka.metadata.util.RecordRedactor;
import org.apache.kafka.queue.EventQueue;
import org.apache.kafka.queue.EventQueue.EarliestDeadlineFunction;
import org.apache.kafka.queue.KafkaEventQueue;
import org.apache.kafka.raft.Batch;
import org.apache.kafka.raft.BatchReader;
@@ -137,8 +137,8 @@
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.OptionalLong;
@@ -1834,6 +1834,7 @@ private QuorumController(
setValidator(configurationValidator).
setStaticConfig(staticConfig).
setNodeId(nodeId).
setMinIsrConfigUpdatePartitionHandler(this::maybeTriggerMinIsrConfigUpdate).
build();
this.clientQuotaControlManager = new ClientQuotaControlManager.Builder().
setLogContext(logContext).
@@ -2360,4 +2361,8 @@ void setNewNextWriteOffset(long newNextWriteOffset) {
void handleUncleanBrokerShutdown(int brokerId, List<ApiMessageAndVersion> records) {
replicationControl.handleBrokerUncleanShutdown(brokerId, records);
}

// Wired into ConfigurationControlManager as its MinIsrConfigUpdatePartitionHandler; delegates to
// the replication control manager to generate the partition/ELR update records.
List<ApiMessageAndVersion> maybeTriggerMinIsrConfigUpdate(List<String> topicNames, Function<String, String> getTopicMinIsrConfig) {
return replicationControl.getPartitionElrUpdatesForConfigChanges(topicNames, getTopicMinIsrConfig);
}
}