KAFKA-14859: SCRAM ZK to KRaft migration with dual write (#13628)
Handle migration of SCRAM records stored in ZK when migrating from ZK to KRaft.

This includes writing SCRAM records back to ZK while in dual-write mode, where metadata updates are written to both the KRaft metadata log and to ZK. This allows a rollback of the migration to include SCRAM metadata changes.
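The dual-write path reuses the existing ScramCredentialUtils string codec in both directions: readAllMetadata parses the credential string found in the user's ZK config into a UserScramCredentialRecord, and the driver serializes KRaft-side credentials back to that string when writing to ZK. A minimal round-trip sketch (illustrative values and class name, not part of the patch):

    import java.nio.charset.StandardCharsets;

    import org.apache.kafka.common.security.scram.ScramCredential;
    import org.apache.kafka.common.security.scram.internals.ScramCredentialUtils;

    public class ScramRoundTripSketch {
        public static void main(String[] args) {
            // Illustrative bytes only; real salts and keys come from SCRAM key derivation.
            byte[] salt = "salt-bytes".getBytes(StandardCharsets.UTF_8);
            byte[] storedKey = "stored-key-bytes".getBytes(StandardCharsets.UTF_8);
            byte[] serverKey = "server-key-bytes".getBytes(StandardCharsets.UTF_8);
            ScramCredential credential = new ScramCredential(salt, storedKey, serverKey, 4096);

            // KRaft -> ZK direction (dual write): one string per mechanism,
            // stored under the user's ZK config node.
            String zkValue = ScramCredentialUtils.credentialToString(credential);

            // ZK -> KRaft direction (initial migration): parse the string back;
            // its fields populate a UserScramCredentialRecord.
            ScramCredential parsed = ScramCredentialUtils.credentialFromString(zkValue);
            System.out.println(parsed.iterations()); // 4096
        }
    }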

Reviewers: David Arthur <mumrah@gmail.com>
pprovenzano committed May 1, 2023
1 parent 64ebbc5 commit e299423
Showing 6 changed files with 124 additions and 8 deletions.
1 change: 1 addition & 0 deletions checkstyle/import-control-metadata.xml
@@ -131,6 +131,7 @@
<allow pkg="org.apache.kafka.common.message" />
<allow pkg="org.apache.kafka.common.metadata" />
<allow pkg="org.apache.kafka.common.protocol" />
<allow pkg="org.apache.kafka.common.quota" />
<allow pkg="org.apache.kafka.common.record" />
<allow pkg="org.apache.kafka.common.resource" />
<allow pkg="org.apache.kafka.common.requests" />
18 changes: 18 additions & 0 deletions core/src/main/scala/kafka/zk/ZkMigrationClient.scala
@@ -24,6 +24,7 @@ import kafka.server.{ConfigEntityName, ConfigType, DynamicBrokerConfig, ZkAdminM
import kafka.utils.{Logging, PasswordEncoder}
import kafka.zk.TopicZNode.TopicIdReplicaAssignment
import kafka.zookeeper._
import org.apache.kafka.clients.admin.ScramMechanism
import org.apache.kafka.common.acl.AccessControlEntry
import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.errors.ControllerMovedException
@@ -32,6 +33,7 @@ import org.apache.kafka.common.metadata._
import org.apache.kafka.common.quota.ClientQuotaEntity
import org.apache.kafka.common.resource.ResourcePattern
import org.apache.kafka.common.{TopicPartition, Uuid}
import org.apache.kafka.common.security.scram.internals.ScramCredentialUtils
import org.apache.kafka.metadata.{LeaderRecoveryState, PartitionRegistration}
import org.apache.kafka.metadata.migration.{MigrationClient, MigrationClientAuthException, MigrationClientException, ZkMigrationLeadershipState}
import org.apache.kafka.server.common.{ApiMessageAndVersion, ProducerIdsBlock}
@@ -211,6 +213,20 @@ class ZkMigrationClient(
adminZkClient.fetchAllEntityConfigs(entityType).foreach { case (name, props) =>
val entity = new EntityData().setEntityType(entityType).setEntityName(name)
val batch = new util.ArrayList[ApiMessageAndVersion]()
ScramMechanism.values().filter(_ != ScramMechanism.UNKNOWN).foreach { mechanism =>
val propertyValue = props.getProperty(mechanism.mechanismName)
if (propertyValue != null) {
val scramCredentials = ScramCredentialUtils.credentialFromString(propertyValue)
batch.add(new ApiMessageAndVersion(new UserScramCredentialRecord()
.setName(name)
.setMechanism(mechanism.`type`)
.setSalt(scramCredentials.salt)
.setStoredKey(scramCredentials.storedKey)
.setServerKey(scramCredentials.serverKey)
.setIterations(scramCredentials.iterations), 0.toShort))
props.remove(mechanism.mechanismName)
}
}
ZkAdminManager.clientQuotaPropsToDoubleMap(props.asScala).foreach { case (key: String, value: Double) =>
batch.add(new ApiMessageAndVersion(new ClientQuotaRecord()
.setEntity(List(entity).asJava)
@@ -470,6 +486,7 @@ class ZkMigrationClient(
override def writeClientQuotas(
entity: util.Map[String, String],
quotas: util.Map[String, java.lang.Double],
scram: util.Map[String, String],
state: ZkMigrationLeadershipState
): ZkMigrationLeadershipState = wrapZkException {
val entityMap = entity.asScala
@@ -478,6 +495,7 @@
val hasIp = entityMap.contains(ClientQuotaEntity.IP)
val props = new Properties()
// We store client quota values as strings in the ZK JSON
scram.forEach { case (key, value) => props.put(key, value.toString) }
quotas.forEach { case (key, value) => props.put(key, value.toString) }
val (configType, path) = if (hasUser && !hasClient) {
(Some(ConfigType.User), Some(entityMap(ClientQuotaEntity.USER)))
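The new scram parameter above is merged into the same Properties object as the quota values, since a user's quotas and SCRAM credentials share a single ZK config node. A minimal standalone sketch of that merge (hypothetical class name and placeholder credential string, not the patch's code):

    import java.util.HashMap;
    import java.util.Map;
    import java.util.Properties;

    public class UserConfigMergeSketch {
        public static void main(String[] args) {
            Map<String, String> scram = new HashMap<>();
            // Placeholder value; the real string is produced by ScramCredentialUtils.credentialToString.
            scram.put("SCRAM-SHA-256", "salt=...,stored_key=...,server_key=...,iterations=4096");
            Map<String, Double> quotas = new HashMap<>();
            quotas.put("producer_byte_rate", 20000.0);

            Properties props = new Properties();
            scram.forEach(props::put);                            // credential strings
            quotas.forEach((k, v) -> props.put(k, v.toString())); // quota values stored as strings
            System.out.println(props); // both kinds of config end up in one ZK node
        }
    }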
38 changes: 38 additions & 0 deletions core/src/test/scala/unit/kafka/zk/ZkMigrationClientTest.scala
@@ -32,11 +32,14 @@ import org.apache.kafka.common.errors.ControllerMovedException
import org.apache.kafka.common.metadata.{AccessControlEntryRecord, ConfigRecord, MetadataRecordType, ProducerIdsRecord}
import org.apache.kafka.common.quota.ClientQuotaEntity
import org.apache.kafka.common.resource.{PatternType, ResourcePattern, ResourcePatternFilter, ResourceType}
import org.apache.kafka.common.security.scram.internals.ScramCredentialUtils
import org.apache.kafka.common.security.scram.ScramCredential
import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.common.utils.{SecurityUtils, Time}
import org.apache.kafka.metadata.{LeaderRecoveryState, PartitionRegistration}
import org.apache.kafka.metadata.migration.ZkMigrationLeadershipState
import org.apache.kafka.server.common.ApiMessageAndVersion
import org.apache.kafka.server.util.MockRandom
import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows, assertTrue, fail}
import org.junit.jupiter.api.{BeforeEach, Test, TestInfo}

@@ -217,11 +220,13 @@ class ZkMigrationClientTest extends QuorumTestHarness {
migrationState: ZkMigrationLeadershipState,
entity: Map[String, String],
quotas: Map[String, java.lang.Double],
scram: Map[String, String],
zkEntityType: String,
zkEntityName: String): ZkMigrationLeadershipState = {
val nextMigrationState = migrationClient.writeClientQuotas(
entity.asJava,
quotas.asJava,
scram.asJava,
migrationState)
val newProps = ZkAdminManager.clientQuotaPropsToDoubleMap(
adminZkClient.fetchEntityConfig(zkEntityType, zkEntityName).asScala)
@@ -241,24 +246,28 @@
migrationState = writeClientQuotaAndVerify(migrationClient, adminZkClient, migrationState,
Map(ClientQuotaEntity.USER -> "user1"),
Map(QuotaConfigs.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG -> 20000.0),
Map.empty,
ConfigType.User, "user1")
assertEquals(1, migrationState.migrationZkVersion())

migrationState = writeClientQuotaAndVerify(migrationClient, adminZkClient, migrationState,
Map(ClientQuotaEntity.USER -> "user1"),
Map(QuotaConfigs.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG -> 10000.0),
Map.empty,
ConfigType.User, "user1")
assertEquals(2, migrationState.migrationZkVersion())

migrationState = writeClientQuotaAndVerify(migrationClient, adminZkClient, migrationState,
Map(ClientQuotaEntity.USER -> "user1"),
Map.empty,
Map.empty,
ConfigType.User, "user1")
assertEquals(3, migrationState.migrationZkVersion())

migrationState = writeClientQuotaAndVerify(migrationClient, adminZkClient, migrationState,
Map(ClientQuotaEntity.USER -> "user1"),
Map(QuotaConfigs.CONSUMER_BYTE_RATE_OVERRIDE_CONFIG -> 100.0),
Map.empty,
ConfigType.User, "user1")
assertEquals(4, migrationState.migrationZkVersion())
}
@@ -269,18 +278,47 @@
migrationState = writeClientQuotaAndVerify(migrationClient, adminZkClient, migrationState,
Map(ClientQuotaEntity.USER -> "user2"),
Map(QuotaConfigs.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG -> 20000.0, QuotaConfigs.CONSUMER_BYTE_RATE_OVERRIDE_CONFIG -> 100.0),
Map.empty,
ConfigType.User, "user2")

assertEquals(1, migrationState.migrationZkVersion())

migrationState = writeClientQuotaAndVerify(migrationClient, adminZkClient, migrationState,
Map(ClientQuotaEntity.USER -> "user2", ClientQuotaEntity.CLIENT_ID -> "clientA"),
Map(QuotaConfigs.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG -> 10000.0, QuotaConfigs.CONSUMER_BYTE_RATE_OVERRIDE_CONFIG -> 200.0),
Map.empty,
ConfigType.User, "user2/clients/clientA")

assertEquals(2, migrationState.migrationZkVersion())
}

@Test
def testScram(): Unit = {
val random = new MockRandom()
def randomBuffer(random: MockRandom, length: Int): Array[Byte] = {
val buf = new Array[Byte](length)
random.nextBytes(buf)
buf
}
val scramCredential = new ScramCredential(
randomBuffer(random, 1024),
randomBuffer(random, 1024),
randomBuffer(random, 1024),
4096)

val props = new Properties()
props.put("SCRAM-SHA-256", ScramCredentialUtils.credentialToString(scramCredential))
adminZkClient.changeConfigs(ConfigType.User, "alice", props)

val brokers = new java.util.ArrayList[Integer]()
val batches = new java.util.ArrayList[java.util.List[ApiMessageAndVersion]]()

migrationClient.readAllMetadata(batch => batches.add(batch), brokerId => brokers.add(brokerId))
assertEquals(0, brokers.size())
assertEquals(1, batches.size())
assertEquals(1, batches.get(0).size)
}

@Test
def testClaimAbsentController(): Unit = {
assertEquals(0, migrationState.migrationZkVersion())
metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java
@@ -19,9 +19,11 @@
import org.apache.kafka.common.acl.AccessControlEntry;
import org.apache.kafka.common.metadata.ConfigRecord;
import org.apache.kafka.common.metadata.MetadataRecordType;
import org.apache.kafka.common.quota.ClientQuotaEntity;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.security.scram.internals.ScramCredentialUtils;
import org.apache.kafka.controller.QuorumFeatures;
import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
@@ -31,6 +33,7 @@
import org.apache.kafka.image.publisher.MetadataPublisher;
import org.apache.kafka.metadata.BrokerRegistration;
import org.apache.kafka.metadata.authorizer.StandardAcl;
import org.apache.kafka.metadata.ScramCredentialData;
import org.apache.kafka.queue.EventQueue;
import org.apache.kafka.queue.KafkaEventQueue;
import org.apache.kafka.raft.LeaderAndEpoch;
@@ -646,19 +649,57 @@ public void run() throws Exception {
});
}

// For configs and client quotas, we need to send all of the data to the ZK client since we persist
// everything for a given entity in a single ZK node.
// For configs and client quotas, we need to send all of the data to the ZK
// client since we persist everything for a given entity in a single ZK node.
if (delta.configsDelta() != null) {
delta.configsDelta().changes().forEach((configResource, configDelta) ->
apply("Updating config resource " + configResource, migrationState ->
zkMigrationClient.writeConfigs(configResource, image.configs().configMapForResource(configResource), migrationState)));
}

if (delta.clientQuotasDelta() != null) {
delta.clientQuotasDelta().changes().forEach((clientQuotaEntity, clientQuotaDelta) -> {
Map<String, Double> quotaMap = image.clientQuotas().entities().get(clientQuotaEntity).quotaMap();
apply("Updating client quota " + clientQuotaEntity, migrationState ->
zkMigrationClient.writeClientQuotas(clientQuotaEntity.entries(), quotaMap, migrationState));
if ((delta.clientQuotasDelta() != null) || (delta.scramDelta() != null)) {
// A list of users with scram or quota changes
HashSet<String> users = new HashSet<String>();

// Populate the list with users that have SCRAM changes
if (delta.scramDelta() != null) {
delta.scramDelta().changes().forEach((scramMechanism, changes) -> {
changes.forEach((userName, changeOpt) -> users.add(userName));
});
}

// Populate the list with users that have quota changes, and apply
// quota changes directly for entities that are not user-only
if (delta.clientQuotasDelta() != null) {
Map<String, String> scramMap = new HashMap<String, String>();
delta.clientQuotasDelta().changes().forEach((clientQuotaEntity, clientQuotaDelta) -> {

if ((clientQuotaEntity.entries().containsKey(ClientQuotaEntity.USER)) &&
(!clientQuotaEntity.entries().containsKey(ClientQuotaEntity.CLIENT_ID))) {
String userName = clientQuotaEntity.entries().get(ClientQuotaEntity.USER);
// Add clientQuotaEntity to list to process at the end
users.add(userName);
} else {
Map<String, Double> quotaMap = image.clientQuotas().entities().get(clientQuotaEntity).quotaMap();
apply("Updating client quota " + clientQuotaEntity, migrationState ->
zkMigrationClient.writeClientQuotas(clientQuotaEntity.entries(), quotaMap, scramMap, migrationState));
}
});
}
// Update user scram and quota data for each user with changes in either.
users.forEach(userName -> {
Map<String, String> userScramMap = getScramCredentialStringsForUser(userName);
ClientQuotaEntity clientQuotaEntity = new
ClientQuotaEntity(Collections.singletonMap(ClientQuotaEntity.USER, userName));
if (image.clientQuotas() == null) {
Map<String, Double> quotaMap = new HashMap<String, Double>();
apply("Updating client quota " + clientQuotaEntity, migrationState ->
zkMigrationClient.writeClientQuotas(clientQuotaEntity.entries(), quotaMap, userScramMap, migrationState));
} else {
Map<String, Double> quotaMap = image.clientQuotas().entities().get(clientQuotaEntity).quotaMap();
apply("Updating client quota " + clientQuotaEntity, migrationState ->
zkMigrationClient.writeClientQuotas(clientQuotaEntity.entries(), quotaMap, userScramMap, migrationState));
}
});
}

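Because a user's quotas and SCRAM credentials live in one ZK node, the driver first collects every user touched by either delta, then rewrites each affected node once with the merged contents. A standalone sketch of that fan-out (hypothetical data, not the driver's API):

    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.Set;

    public class DualWriteFanOutSketch {
        public static void main(String[] args) {
            Set<String> usersWithScramChanges = new HashSet<>(Arrays.asList("alice"));
            Set<String> usersWithQuotaChanges = new HashSet<>(Arrays.asList("alice", "bob"));

            // Union of users affected by either kind of change.
            Set<String> users = new HashSet<>(usersWithScramChanges);
            users.addAll(usersWithQuotaChanges);

            // One combined write per user, mirroring
            // zkMigrationClient.writeClientQuotas(entity, quotaMap, scramMap, state).
            users.forEach(user -> System.out.println("rewrite /config/users/" + user));
        }
    }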
@@ -712,6 +753,21 @@ public void run() throws Exception {
completionHandler.accept(null);
}

private Map<String, String> getScramCredentialStringsForUser(String userName) {
Map<String, String> userScramCredentialStrings = new HashMap<String, String>();
if (image.scram() != null) {
image.scram().mechanisms().forEach((scramMechanism, scramMechanismMap) -> {
ScramCredentialData scramCredentialData = scramMechanismMap.get(userName);
if (scramCredentialData != null) {
userScramCredentialStrings.put(scramMechanism.mechanismName(),
ScramCredentialUtils.credentialToString(
scramCredentialData.toCredential(scramMechanism)));
}
});
}
return userScramCredentialStrings;
}

private void addStandardAclToMap(Map<ResourcePattern, List<AccessControlEntry>> aclMap, StandardAcl acl) {
ResourcePattern resource = new ResourcePattern(acl.resourceType(), acl.resourceName(), acl.patternType());
aclMap.computeIfAbsent(resource, __ -> new ArrayList<>()).add(
metadata/src/main/java/org/apache/kafka/metadata/migration/MigrationClient.java
@@ -96,6 +96,7 @@ ZkMigrationLeadershipState writeConfigs(
ZkMigrationLeadershipState writeClientQuotas(
Map<String, String> clientQuotaEntity,
Map<String, Double> quotas,
Map<String, String> scram,
ZkMigrationLeadershipState state
);

metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationDriverTest.java
@@ -183,6 +183,7 @@ public ZkMigrationLeadershipState writeConfigs(
public ZkMigrationLeadershipState writeClientQuotas(
Map<String, String> clientQuotaEntity,
Map<String, Double> quotas,
Map<String, String> scram,
ZkMigrationLeadershipState state
) {
this.state = state;
@@ -533,4 +534,5 @@ public void testSkipWaitForBrokersInDualWrite() throws Exception {
"Waiting for KRaftMigrationDriver to enter ZK_MIGRATION state");
}
}
}
}
