Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-16540: Update partitions if the min ISR config is changed. #15702

Open
wants to merge 8 commits into
base: trunk
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import static org.apache.kafka.metadata.Replicas.NONE;
Expand All @@ -42,10 +43,10 @@ public class BrokersToElrs {
/**
* Update our records of a partition's ELR.
*
* @param topicId The topic ID of the partition.
* @param partitionId The partition ID of the partition.
* @param prevElr The previous ELR, or null if the partition is new.
* @param nextElr The new ELR, or null if the partition is being removed.
* @param topicId The topic ID of the partition.
* @param partitionId The partition ID of the partition.
* @param prevElr The previous ELR, or null if the partition is new.
* @param nextElr The new ELR, or null if the partition is being removed.
*/

void update(Uuid topicId, int partitionId, int[] prevElr, int[] nextElr) {
Expand Down Expand Up @@ -159,4 +160,14 @@ BrokersToIsrs.PartitionsOnReplicaIterator partitionsWithBrokerInElr(int brokerId
}
return new BrokersToIsrs.PartitionsOnReplicaIterator(topicMap, false);
}

BrokersToIsrs.PartitionsOnReplicaIterator partitionsWithElr() {
Map<Uuid, int[]> topicMap = new HashMap<>();
for (Map<Uuid, int[]> map : elrMembers.values()) {
if (map != null) {
CalvinConfluent marked this conversation as resolved.
Show resolved Hide resolved
topicMap.putAll(map);
}
}
return new BrokersToIsrs.PartitionsOnReplicaIterator(topicMap, false);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,12 @@
import org.apache.kafka.clients.admin.AlterConfigOp.OpType;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.ConfigResource.Type;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.config.ConfigResource.Type;
import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.common.config.types.Password;
import org.apache.kafka.common.metadata.ConfigRecord;
import org.apache.kafka.common.metadata.MetadataRecordType;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.ApiError;
import org.apache.kafka.common.utils.LogContext;
Expand All @@ -48,6 +50,7 @@
import java.util.Objects;
import java.util.Optional;
import java.util.function.Consumer;
import java.util.function.Function;

import static org.apache.kafka.clients.admin.AlterConfigOp.OpType.APPEND;
import static org.apache.kafka.common.protocol.Errors.INVALID_CONFIG;
Expand All @@ -66,6 +69,7 @@ public class ConfigurationControlManager {
private final TimelineHashMap<ConfigResource, TimelineHashMap<String, String>> configData;
private final Map<String, Object> staticConfig;
private final ConfigResource currentController;
private final MinIsrConfigUpdatePartitionHandler minIsrConfigUpdatePartitionHandler;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe more of a question for someone with more code ownership of the quorum controller code, but I wonder if it would be preferable to handle generating the replication control manager records in the QuorumController.incrementalAlterConfigs. That would also make it a bit easier to handle validateOnly which we are not currently handling.


static class Builder {
private LogContext logContext = null;
Expand All @@ -75,6 +79,7 @@ static class Builder {
private Optional<AlterConfigPolicy> alterConfigPolicy = Optional.empty();
private ConfigurationValidator validator = ConfigurationValidator.NO_OP;
private Map<String, Object> staticConfig = Collections.emptyMap();
private MinIsrConfigUpdatePartitionHandler minIsrConfigUpdatePartitionHandler;
private int nodeId = 0;

Builder setLogContext(LogContext logContext) {
Expand Down Expand Up @@ -117,9 +122,19 @@ Builder setNodeId(int nodeId) {
return this;
}

        /**
         * Set the callback used to generate partition update records when a
         * min.insync.replicas configuration change is applied. Required: build()
         * fails if this is left unset.
         */
        Builder setMinIsrConfigUpdatePartitionHandler(
            MinIsrConfigUpdatePartitionHandler minIsrConfigUpdatePartitionHandler
        ) {
            this.minIsrConfigUpdatePartitionHandler = minIsrConfigUpdatePartitionHandler;
            return this;
        }

ConfigurationControlManager build() {
if (logContext == null) logContext = new LogContext();
if (snapshotRegistry == null) snapshotRegistry = new SnapshotRegistry(logContext);
if (minIsrConfigUpdatePartitionHandler == null) {
throw new RuntimeException("You must specify MinIsrConfigUpdatePartitionHandler");
}
return new ConfigurationControlManager(
logContext,
snapshotRegistry,
Expand All @@ -128,7 +143,8 @@ ConfigurationControlManager build() {
alterConfigPolicy,
validator,
staticConfig,
nodeId);
nodeId,
minIsrConfigUpdatePartitionHandler);
}
}

Expand All @@ -139,7 +155,9 @@ private ConfigurationControlManager(LogContext logContext,
Optional<AlterConfigPolicy> alterConfigPolicy,
ConfigurationValidator validator,
Map<String, Object> staticConfig,
int nodeId) {
int nodeId,
MinIsrConfigUpdatePartitionHandler minIsrConfigUpdatePartitionHandler
) {
this.log = logContext.logger(ConfigurationControlManager.class);
this.snapshotRegistry = snapshotRegistry;
this.configSchema = configSchema;
Expand All @@ -149,6 +167,7 @@ private ConfigurationControlManager(LogContext logContext,
this.configData = new TimelineHashMap<>(snapshotRegistry, 0);
this.staticConfig = Collections.unmodifiableMap(new HashMap<>(staticConfig));
this.currentController = new ConfigResource(Type.BROKER, Integer.toString(nodeId));
this.minIsrConfigUpdatePartitionHandler = minIsrConfigUpdatePartitionHandler;
}

SnapshotRegistry snapshotRegistry() {
Expand Down Expand Up @@ -260,6 +279,7 @@ private ApiError incrementalAlterConfigResource(
if (error.isFailure()) {
return error;
}
maybeTriggerPartitionUpdateOnMinIsrChange(newRecords);
CalvinConfluent marked this conversation as resolved.
Show resolved Hide resolved
outputRecords.addAll(newRecords);
return ApiError.NONE;
}
Expand Down Expand Up @@ -308,6 +328,48 @@ private ApiError validateAlterConfig(ConfigResource configResource,
return ApiError.NONE;
}

void maybeTriggerPartitionUpdateOnMinIsrChange(List<ApiMessageAndVersion> records) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess eventually we will probably want some more general system for responding to configuration changes, and not special-case min.isr. However, we don't have to do that in this PR.

I also understand why you had this function receive a list of records here rather than something fancier (easier to integrate into the existing code, and into the two configuration change paths.)

List<ConfigRecord> minIsrRecords = new ArrayList<>();
Map<String, String> topicMap = new HashMap<>();
Map<String, String> configRemovedTopicMap = new HashMap<>();
records.forEach(record -> {
if (MetadataRecordType.fromId(record.message().apiKey()) == MetadataRecordType.CONFIG_RECORD) {
ConfigRecord configRecord = (ConfigRecord) record.message();
if (configRecord.name().equals(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG)) {
minIsrRecords.add(configRecord);
if (Type.forId(configRecord.resourceType()) == Type.TOPIC) {
if (configRecord.value() == null) topicMap.put(configRecord.resourceName(), configRecord.value());
else configRemovedTopicMap.put(configRecord.resourceName(), configRecord.value());
}
}
}
});
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what is the behavior if the default broker config for min.insync.replicas is changed?
I am not actually sure how that impacts the min.insync.replicas for existing topics.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If min.insync.replicas is not set on the topic config level, the effective min.insync.replicas of a topic will change if default broker config is updated.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, unfortunately we have a 5 level system:

  1. topic configs (highest priority)
  2. node configs (aka "broker" configs, but they also apply to controllers)
  3. cluster configs (aka default configs for the broker resource)
  4. static configuration
  5. static default

The first three levels can change at runtime 🤮


if (minIsrRecords.isEmpty()) return;
if (topicMap.size() == minIsrRecords.size()) {
// If all the min isr config updates are on the topic level, we can trigger a simpler update just on the
// updated topics.
records.addAll(minIsrConfigUpdatePartitionHandler.addRecordsForMinIsrUpdate(
new ArrayList<>(topicMap.keySet()),
topicName -> topicMap.get(topicName))
);
return;
}

// Because it may require multiple layer look up for the min ISR config value. Build a config data copy and apply
// the config updates to it. Use this config copy for the min ISR look up.
Map<ConfigResource, TimelineHashMap<String, String>> configDataCopy = new HashMap<>(configData);
SnapshotRegistry localSnapshotRegistry = new SnapshotRegistry(new LogContext("dummy-config-update"));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe I'm missing something, but I don't think this works... pendingConfigData will have some of the new changes you made, but not all of the existing changes. So, for example, perhaps we are changing the cluster config for min topic ISR, but the node config for the current controller node is unchanged. It should take priority, but it won't be in here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, it is not a straightforward change. So the pending config data populated here will be checked together with the existing configs. See how OrderedConfigResolver is used below.

for (ConfigRecord record : minIsrRecords) {
replayInternal(record, configDataCopy, localSnapshotRegistry);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why are we calling replay here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the implementation challenge part of this PR. To find the effective min ISR value, it requires checking topic config -> dynamic broker config -> default broker config -> ...
Let's say the user updates the default broker config:

  1. All the topics could be affected.
  2. The effective min ISR values should be recalculated.
  3. We need to generate the partition change records along with the config change records, which means the ReplicationControlManager can't use the regular methods for the effective min ISR value. The value should be determined by the config records and the current configs.

I found it easier to make a copy of the configs and apply the min ISR updates on the copy. Then let the ReplicationControlManager check all the partitions with the config copy.

records.addAll(minIsrConfigUpdatePartitionHandler.addRecordsForMinIsrUpdate(
configRemovedTopicMap.size() == minIsrRecords.size() ?
new ArrayList<>(configRemovedTopicMap.keySet()) : Collections.emptyList(),
topicName -> getTopicConfigInternal(topicName, TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, configDataCopy))
);
}

/**
* Determine the result of applying a batch of legacy configuration changes. Note
* that this method does not change the contents of memory. It just generates a
Expand Down Expand Up @@ -375,6 +437,7 @@ private void legacyAlterConfigResource(ConfigResource configResource,
}
outputRecords.addAll(recordsExplicitlyAltered);
outputRecords.addAll(recordsImplicitlyDeleted);
maybeTriggerPartitionUpdateOnMinIsrChange(outputRecords);
outputResults.put(configResource, ApiError.NONE);
}

Expand All @@ -401,27 +464,44 @@ private List<String> getParts(String value, String key, ConfigResource configRes
* @param record The ConfigRecord.
*/
public void replay(ConfigRecord record) {
replayInternal(record, configData, snapshotRegistry);
Type type = Type.forId(record.resourceType());
ConfigResource configResource = new ConfigResource(type, record.resourceName());
if (configSchema.isSensitive(record)) {
log.info("Replayed ConfigRecord for {} which set configuration {} to {}",
configResource, record.name(), Password.HIDDEN);
} else {
log.info("Replayed ConfigRecord for {} which set configuration {} to {}",
configResource, record.name(), record.value());
}
}

/**
* Apply a configuration record to the given config data.
*
* @param record The ConfigRecord.
* @param localConfigData The config data is going to be updated.
* @param localSnapshotRegistry The snapshot registry to use when adding configs.
*/
public void replayInternal(
ConfigRecord record,
Map<ConfigResource, TimelineHashMap<String, String>> localConfigData,
SnapshotRegistry localSnapshotRegistry
) {
Type type = Type.forId(record.resourceType());
ConfigResource configResource = new ConfigResource(type, record.resourceName());
TimelineHashMap<String, String> configs = configData.get(configResource);
TimelineHashMap<String, String> configs = localConfigData.get(configResource);
if (configs == null) {
configs = new TimelineHashMap<>(snapshotRegistry, 0);
configData.put(configResource, configs);
configs = new TimelineHashMap<>(localSnapshotRegistry, 0);
localConfigData.put(configResource, configs);
}
if (record.value() == null) {
configs.remove(record.name());
} else {
configs.put(record.name(), record.value());
}
if (configs.isEmpty()) {
configData.remove(configResource);
}
if (configSchema.isSensitive(record)) {
log.info("Replayed ConfigRecord for {} which set configuration {} to {}",
configResource, record.name(), Password.HIDDEN);
} else {
log.info("Replayed ConfigRecord for {} which set configuration {} to {}",
configResource, record.name(), record.value());
localConfigData.remove(configResource);
}
}

Expand All @@ -443,15 +523,24 @@ Map<String, String> getConfigs(ConfigResource configResource) {
* @param configKey The key for the config.
*/
String getTopicConfig(String topicName, String configKey) throws NoSuchElementException {
Map<String, String> map = configData.get(new ConfigResource(Type.TOPIC, topicName));
if (map == null || !map.containsKey(configKey)) {
Map<String, ConfigEntry> effectiveConfigMap = computeEffectiveTopicConfigs(Collections.emptyMap());
return getTopicConfigInternal(topicName, configKey, configData);
}

String getTopicConfigInternal(
String topicName,
String configKey,
Map<ConfigResource, TimelineHashMap<String, String>> localConfigData
) throws NoSuchElementException {
Map<String, String> topicConfigs = localConfigData.get(new ConfigResource(Type.TOPIC, topicName));
if (topicConfigs == null || !topicConfigs.containsKey(configKey)) {
Map<String, ConfigEntry> effectiveConfigMap =
computeEffectiveTopicConfigsInternal(Collections.emptyMap(), localConfigData);
if (!effectiveConfigMap.containsKey(configKey)) {
return null;
}
return effectiveConfigMap.get(configKey).value();
}
return map.get(configKey);
return topicConfigs.get(configKey);
}

public Map<ConfigResource, ResultOrError<Map<String, String>>> describeConfigs(
Expand Down Expand Up @@ -500,17 +589,28 @@ boolean uncleanLeaderElectionEnabledForTopic(String name) {
}

Map<String, ConfigEntry> computeEffectiveTopicConfigs(Map<String, String> creationConfigs) {
return configSchema.resolveEffectiveTopicConfigs(staticConfig, clusterConfig(),
currentControllerConfig(), creationConfigs);
return computeEffectiveTopicConfigsInternal(creationConfigs, configData);
}

Map<String, ConfigEntry> computeEffectiveTopicConfigsInternal(
Map<String, String> creationConfigs,
Map<ConfigResource, TimelineHashMap<String, String>> localConfigData) {
return configSchema.resolveEffectiveTopicConfigs(staticConfig, clusterConfig(localConfigData),
currentControllerConfig(localConfigData), creationConfigs);
}

Map<String, String> clusterConfig() {
Map<String, String> result = configData.get(DEFAULT_NODE);
Map<String, String> clusterConfig(Map<ConfigResource, TimelineHashMap<String, String>> localConfigData) {
Map<String, String> result = localConfigData.get(DEFAULT_NODE);
return (result == null) ? Collections.emptyMap() : result;
}

Map<String, String> currentControllerConfig() {
Map<String, String> result = configData.get(currentController);
Map<String, String> currentControllerConfig(Map<ConfigResource, TimelineHashMap<String, String>> localConfigData) {
Map<String, String> result = localConfigData.get(currentController);
return (result == null) ? Collections.emptyMap() : result;
}

    /**
     * Callback used to generate the partition change records required after a
     * min.insync.replicas configuration change.
     */
    @FunctionalInterface
    interface MinIsrConfigUpdatePartitionHandler {
        // topicNames: the topics to update; an empty list presumably means "re-evaluate
        // all affected partitions" (broker/cluster-level change) — TODO confirm against
        // ReplicationControlManager.getPartitionElrUpdatesForConfigChanges.
        // getTopicMinIsrConfig: resolves a topic's effective min ISR value; may return null.
        List<ApiMessageAndVersion> addRecordsForMinIsrUpdate(List<String> topicNames, Function<String, String> getTopicMinIsrConfig);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,10 @@
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.message.AllocateProducerIdsRequestData;
import org.apache.kafka.common.message.AllocateProducerIdsResponseData;
import org.apache.kafka.common.message.AlterPartitionRequestData;
import org.apache.kafka.common.message.AlterPartitionResponseData;
import org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData;
import org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData;
import org.apache.kafka.common.message.AlterPartitionRequestData;
import org.apache.kafka.common.message.AlterPartitionResponseData;
import org.apache.kafka.common.message.AlterUserScramCredentialsRequestData;
import org.apache.kafka.common.message.AlterUserScramCredentialsResponseData;
import org.apache.kafka.common.message.AssignReplicasToDirsRequestData;
Expand Down Expand Up @@ -78,11 +78,11 @@
import org.apache.kafka.common.metadata.RemoveAccessControlEntryRecord;
import org.apache.kafka.common.metadata.RemoveDelegationTokenRecord;
import org.apache.kafka.common.metadata.RemoveTopicRecord;
import org.apache.kafka.common.metadata.UserScramCredentialRecord;
import org.apache.kafka.common.metadata.RemoveUserScramCredentialRecord;
import org.apache.kafka.common.metadata.TopicRecord;
import org.apache.kafka.common.metadata.UnfenceBrokerRecord;
import org.apache.kafka.common.metadata.UnregisterBrokerRecord;
import org.apache.kafka.common.metadata.UserScramCredentialRecord;
import org.apache.kafka.common.metadata.ZkMigrationStateRecord;
import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.common.quota.ClientQuotaAlteration;
Expand All @@ -95,6 +95,8 @@
import org.apache.kafka.controller.errors.ControllerExceptions;
import org.apache.kafka.controller.errors.EventHandlerExceptionInfo;
import org.apache.kafka.controller.metrics.QuorumControllerMetrics;
import org.apache.kafka.deferred.DeferredEvent;
import org.apache.kafka.deferred.DeferredEventQueue;
import org.apache.kafka.metadata.BrokerHeartbeatReply;
import org.apache.kafka.metadata.BrokerRegistrationReply;
import org.apache.kafka.metadata.FinalizedControllerFeatures;
Expand All @@ -106,10 +108,8 @@
import org.apache.kafka.metadata.placement.ReplicaPlacer;
import org.apache.kafka.metadata.placement.StripedReplicaPlacer;
import org.apache.kafka.metadata.util.RecordRedactor;
import org.apache.kafka.deferred.DeferredEventQueue;
import org.apache.kafka.deferred.DeferredEvent;
import org.apache.kafka.queue.EventQueue.EarliestDeadlineFunction;
import org.apache.kafka.queue.EventQueue;
import org.apache.kafka.queue.EventQueue.EarliestDeadlineFunction;
import org.apache.kafka.queue.KafkaEventQueue;
import org.apache.kafka.raft.Batch;
import org.apache.kafka.raft.BatchReader;
Expand Down Expand Up @@ -137,8 +137,8 @@
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map.Entry;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.OptionalLong;
Expand Down Expand Up @@ -1834,6 +1834,7 @@ private QuorumController(
setValidator(configurationValidator).
setStaticConfig(staticConfig).
setNodeId(nodeId).
setMinIsrConfigUpdatePartitionHandler(this::maybeTriggerMinIsrConfigUpdate).
build();
this.clientQuotaControlManager = new ClientQuotaControlManager.Builder().
setLogContext(logContext).
Expand Down Expand Up @@ -2360,4 +2361,8 @@ void setNewNextWriteOffset(long newNextWriteOffset) {
void handleUncleanBrokerShutdown(int brokerId, List<ApiMessageAndVersion> records) {
replicationControl.handleBrokerUncleanShutdown(brokerId, records);
}

    // Delegates to the ReplicationControlManager to generate the partition (ELR) update
    // records needed after a min.insync.replicas config change. Wired into the
    // ConfigurationControlManager via setMinIsrConfigUpdatePartitionHandler above.
    List<ApiMessageAndVersion> maybeTriggerMinIsrConfigUpdate(List<String> topicNames, Function<String, String> getTopicMinIsrConfig) {
        return replicationControl.getPartitionElrUpdatesForConfigChanges(topicNames, getTopicMinIsrConfig);
    }
}