From 31a6478194aea5e4f93daebfcfcdfe3117da237b Mon Sep 17 00:00:00 2001 From: Proven Provenzano Date: Fri, 7 Apr 2023 08:29:28 -0400 Subject: [PATCH 01/12] Migration without dual write --- .../main/scala/kafka/zk/ZkMigrationClient.scala | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/core/src/main/scala/kafka/zk/ZkMigrationClient.scala b/core/src/main/scala/kafka/zk/ZkMigrationClient.scala index 5d2ba5c7a17f..461a790305b8 100644 --- a/core/src/main/scala/kafka/zk/ZkMigrationClient.scala +++ b/core/src/main/scala/kafka/zk/ZkMigrationClient.scala @@ -24,6 +24,7 @@ import kafka.server.{ConfigEntityName, ConfigType, DynamicBrokerConfig, ZkAdminM import kafka.utils.{Logging, PasswordEncoder} import kafka.zk.TopicZNode.TopicIdReplicaAssignment import kafka.zookeeper._ +import org.apache.kafka.clients.admin.ScramMechanism import org.apache.kafka.common.acl.AccessControlEntry import org.apache.kafka.common.config.ConfigResource import org.apache.kafka.common.errors.ControllerMovedException @@ -32,6 +33,7 @@ import org.apache.kafka.common.metadata._ import org.apache.kafka.common.quota.ClientQuotaEntity import org.apache.kafka.common.resource.ResourcePattern import org.apache.kafka.common.{TopicPartition, Uuid} +import org.apache.kafka.common.security.scram.internals.ScramCredentialUtils import org.apache.kafka.metadata.{LeaderRecoveryState, PartitionRegistration} import org.apache.kafka.metadata.migration.{MigrationClient, MigrationClientAuthException, MigrationClientException, ZkMigrationLeadershipState} import org.apache.kafka.server.common.{ApiMessageAndVersion, ProducerIdsBlock} @@ -211,6 +213,21 @@ class ZkMigrationClient( adminZkClient.fetchAllEntityConfigs(entityType).foreach { case (name, props) => val entity = new EntityData().setEntityType(entityType).setEntityName(name) val batch = new util.ArrayList[ApiMessageAndVersion]() + ScramMechanism.values().filter(_ != ScramMechanism.UNKNOWN).foreach { mechanism => + val propertyValue = props.getProperty(mechanism.mechanismName) + if (propertyValue != null) { + println(s"Found ${propertyValue} for Key:${mechanism.mechanismName}") + val scramCredentials = ScramCredentialUtils.credentialFromString(propertyValue) + batch.add(new ApiMessageAndVersion(new UserScramCredentialRecord() + .setName(name) + .setMechanism(mechanism.`type`) + .setSalt(scramCredentials.salt) + .setStoredKey(scramCredentials.storedKey) + .setServerKey(scramCredentials.serverKey) + .setIterations(scramCredentials.iterations), 0.toShort)) + props.remove(mechanism.mechanismName) + } + } ZkAdminManager.clientQuotaPropsToDoubleMap(props.asScala).foreach { case (key: String, value: Double) => batch.add(new ApiMessageAndVersion(new ClientQuotaRecord() .setEntity(List(entity).asJava) From c6de035862c08de64f8867cd0623de07b6772e54 Mon Sep 17 00:00:00 2001 From: Proven Provenzano Date: Wed, 19 Apr 2023 10:25:01 -0400 Subject: [PATCH 02/12] Simple migration test for SCRAM --- .../unit/kafka/zk/ZkMigrationClientTest.scala | 31 +++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/core/src/test/scala/unit/kafka/zk/ZkMigrationClientTest.scala b/core/src/test/scala/unit/kafka/zk/ZkMigrationClientTest.scala index 1787791448c3..129c99659ba9 100644 --- a/core/src/test/scala/unit/kafka/zk/ZkMigrationClientTest.scala +++ b/core/src/test/scala/unit/kafka/zk/ZkMigrationClientTest.scala @@ -32,11 +32,14 @@ import org.apache.kafka.common.errors.ControllerMovedException import org.apache.kafka.common.metadata.{AccessControlEntryRecord, ConfigRecord, MetadataRecordType, ProducerIdsRecord} import org.apache.kafka.common.quota.ClientQuotaEntity import org.apache.kafka.common.resource.{PatternType, ResourcePattern, ResourcePatternFilter, ResourceType} +import org.apache.kafka.common.security.scram.internals.ScramCredentialUtils +import org.apache.kafka.common.security.scram.ScramCredential import org.apache.kafka.common.security.auth.KafkaPrincipal import org.apache.kafka.common.utils.{SecurityUtils, Time} import org.apache.kafka.metadata.{LeaderRecoveryState, PartitionRegistration} import org.apache.kafka.metadata.migration.ZkMigrationLeadershipState import org.apache.kafka.server.common.ApiMessageAndVersion +import org.apache.kafka.server.util.MockRandom import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows, assertTrue, fail} import org.junit.jupiter.api.{BeforeEach, Test, TestInfo} @@ -281,6 +284,34 @@ class ZkMigrationClientTest extends QuorumTestHarness { assertEquals(2, migrationState.migrationZkVersion()) } + @Test + def testScram(): Unit = { + val random = new MockRandom() + def randomBuffer(random: MockRandom, length: Int): Array[Byte] = { + val buf = new Array[Byte](length) + random.nextBytes(buf) + buf + } + val scramCredential = new ScramCredential( + randomBuffer(random, 1024), + randomBuffer(random, 1024), + randomBuffer(random, 1024), + 4096) + + val props = new Properties() + props.put("SCRAM-SHA-256", ScramCredentialUtils.credentialToString(scramCredential)) + adminZkClient.changeConfigs(ConfigType.User, "alice", props) + + val brokers = new java.util.ArrayList[Integer]() + val batches = new java.util.ArrayList[java.util.List[ApiMessageAndVersion]]() + + migrationClient.readAllMetadata(batch => batches.add(batch), brokerId => brokers.add(brokerId)) + assertEquals(0, brokers.size()) + assertEquals(1, batches.size()) + assertEquals(1, batches.get(0).size) + // assertEquals(50, migrationState.migrationZkVersion()) + } + @Test def testClaimAbsentController(): Unit = { assertEquals(0, migrationState.migrationZkVersion()) From ff6e498feb5c16060ded2f1b5e2c805bc60e4c60 Mon Sep 17 00:00:00 2001 From: Proven Provenzano Date: Wed, 19 Apr 2023 10:26:45 -0400 Subject: [PATCH 03/12] Remove extra line --- core/src/test/scala/unit/kafka/zk/ZkMigrationClientTest.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/core/src/test/scala/unit/kafka/zk/ZkMigrationClientTest.scala b/core/src/test/scala/unit/kafka/zk/ZkMigrationClientTest.scala index 129c99659ba9..c296936da694 100644 --- a/core/src/test/scala/unit/kafka/zk/ZkMigrationClientTest.scala +++ b/core/src/test/scala/unit/kafka/zk/ZkMigrationClientTest.scala @@ -309,7 +309,6 @@ class ZkMigrationClientTest extends QuorumTestHarness { assertEquals(0, brokers.size()) assertEquals(1, batches.size()) assertEquals(1, batches.get(0).size) - // assertEquals(50, migrationState.migrationZkVersion()) } @Test From e3abb64856853e6052af8c62abbee43534595590 Mon Sep 17 00:00:00 2001 From: Proven Provenzano Date: Wed, 19 Apr 2023 10:28:04 -0400 Subject: [PATCH 04/12] Remove extra println --- core/src/main/scala/kafka/zk/ZkMigrationClient.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/core/src/main/scala/kafka/zk/ZkMigrationClient.scala b/core/src/main/scala/kafka/zk/ZkMigrationClient.scala index 461a790305b8..c3c8c00b8cad 100644 --- a/core/src/main/scala/kafka/zk/ZkMigrationClient.scala +++ b/core/src/main/scala/kafka/zk/ZkMigrationClient.scala @@ -216,7 +216,6 @@ class ZkMigrationClient( ScramMechanism.values().filter(_ != ScramMechanism.UNKNOWN).foreach { mechanism => val propertyValue = props.getProperty(mechanism.mechanismName) if (propertyValue != null) { - println(s"Found ${propertyValue} for Key:${mechanism.mechanismName}") val scramCredentials = ScramCredentialUtils.credentialFromString(propertyValue) batch.add(new ApiMessageAndVersion(new UserScramCredentialRecord() .setName(name) From efee0c45b5e0a4e5d808cae8bf39e6f9c271c629 Mon Sep 17 00:00:00 2001 From: Proven Provenzano Date: Sun, 23 Apr 2023 22:16:37 -0400 Subject: [PATCH 05/12] Basic SCRAM migration with dual write works. --- .../scala/kafka/zk/ZkMigrationClient.scala | 2 + .../unit/kafka/zk/ZkMigrationClientTest.scala | 8 ++ .../migration/KRaftMigrationDriver.java | 75 +++++++++++++++++-- .../metadata/migration/MigrationClient.java | 1 + .../migration/KRaftMigrationDriverTest.java | 3 +- 5 files changed, 81 insertions(+), 8 deletions(-) diff --git a/core/src/main/scala/kafka/zk/ZkMigrationClient.scala b/core/src/main/scala/kafka/zk/ZkMigrationClient.scala index c3c8c00b8cad..54e0170a71fa 100644 --- a/core/src/main/scala/kafka/zk/ZkMigrationClient.scala +++ b/core/src/main/scala/kafka/zk/ZkMigrationClient.scala @@ -486,6 +486,7 @@ class ZkMigrationClient( override def writeClientQuotas( entity: util.Map[String, String], quotas: util.Map[String, java.lang.Double], + scram: util.Map[String, String], state: ZkMigrationLeadershipState ): ZkMigrationLeadershipState = wrapZkException { val entityMap = entity.asScala @@ -494,6 +495,7 @@ class ZkMigrationClient( val hasIp = entityMap.contains(ClientQuotaEntity.IP) val props = new Properties() // We store client quota values as strings in the ZK JSON + scram.forEach { case (key, value) => props.put(key, value.toString) } quotas.forEach { case (key, value) => props.put(key, value.toString) } val (configType, path) = if (hasUser && !hasClient) { (Some(ConfigType.User), Some(entityMap(ClientQuotaEntity.USER))) diff --git a/core/src/test/scala/unit/kafka/zk/ZkMigrationClientTest.scala b/core/src/test/scala/unit/kafka/zk/ZkMigrationClientTest.scala index c296936da694..8d65004b6e43 100644 --- a/core/src/test/scala/unit/kafka/zk/ZkMigrationClientTest.scala +++ b/core/src/test/scala/unit/kafka/zk/ZkMigrationClientTest.scala @@ -220,11 +220,13 @@ class ZkMigrationClientTest extends QuorumTestHarness { migrationState: ZkMigrationLeadershipState, entity: Map[String, String], quotas: Map[String, java.lang.Double], + scram: Map[String, String], zkEntityType: String, zkEntityName: String): ZkMigrationLeadershipState = { val nextMigrationState = migrationClient.writeClientQuotas( entity.asJava, quotas.asJava, + scram.asJava, migrationState) val newProps = ZkAdminManager.clientQuotaPropsToDoubleMap( adminZkClient.fetchEntityConfig(zkEntityType, zkEntityName).asScala) @@ -244,24 +246,28 @@ class ZkMigrationClientTest extends QuorumTestHarness { migrationState = writeClientQuotaAndVerify(migrationClient, adminZkClient, migrationState, Map(ClientQuotaEntity.USER -> "user1"), Map(QuotaConfigs.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG -> 20000.0), + Map.empty, ConfigType.User, "user1") assertEquals(1, migrationState.migrationZkVersion()) migrationState = writeClientQuotaAndVerify(migrationClient, adminZkClient, migrationState, Map(ClientQuotaEntity.USER -> "user1"), Map(QuotaConfigs.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG -> 10000.0), + Map.empty, ConfigType.User, "user1") assertEquals(2, migrationState.migrationZkVersion()) migrationState = writeClientQuotaAndVerify(migrationClient, adminZkClient, migrationState, Map(ClientQuotaEntity.USER -> "user1"), Map.empty, + Map.empty, ConfigType.User, "user1") assertEquals(3, migrationState.migrationZkVersion()) migrationState = writeClientQuotaAndVerify(migrationClient, adminZkClient, migrationState, Map(ClientQuotaEntity.USER -> "user1"), Map(QuotaConfigs.CONSUMER_BYTE_RATE_OVERRIDE_CONFIG -> 100.0), + Map.empty, ConfigType.User, "user1") assertEquals(4, migrationState.migrationZkVersion()) } @@ -272,6 +278,7 @@ class ZkMigrationClientTest extends QuorumTestHarness { migrationState = writeClientQuotaAndVerify(migrationClient, adminZkClient, migrationState, Map(ClientQuotaEntity.USER -> "user2"), Map(QuotaConfigs.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG -> 20000.0, QuotaConfigs.CONSUMER_BYTE_RATE_OVERRIDE_CONFIG -> 100.0), + Map.empty, ConfigType.User, "user2") assertEquals(1, migrationState.migrationZkVersion()) @@ -279,6 +286,7 @@ class ZkMigrationClientTest extends QuorumTestHarness { migrationState = writeClientQuotaAndVerify(migrationClient, adminZkClient, migrationState, Map(ClientQuotaEntity.USER -> "user2", ClientQuotaEntity.CLIENT_ID -> "clientA"), Map(QuotaConfigs.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG -> 10000.0, QuotaConfigs.CONSUMER_BYTE_RATE_OVERRIDE_CONFIG -> 200.0), + Map.empty, ConfigType.User, "user2/clients/clientA") assertEquals(2, migrationState.migrationZkVersion()) diff --git a/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java b/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java index 44a67b222c87..a95b2c3c1c05 100644 --- a/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java +++ b/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java @@ -19,9 +19,11 @@ import org.apache.kafka.common.acl.AccessControlEntry; import org.apache.kafka.common.metadata.ConfigRecord; import org.apache.kafka.common.metadata.MetadataRecordType; +import org.apache.kafka.common.quota.ClientQuotaEntity; import org.apache.kafka.common.resource.ResourcePattern; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.security.scram.internals.ScramCredentialUtils; import org.apache.kafka.controller.QuorumFeatures; import org.apache.kafka.image.MetadataDelta; import org.apache.kafka.image.MetadataImage; @@ -31,6 +33,7 @@ import org.apache.kafka.image.publisher.MetadataPublisher; import org.apache.kafka.metadata.BrokerRegistration; import org.apache.kafka.metadata.authorizer.StandardAcl; +import org.apache.kafka.metadata.ScramCredentialData; import org.apache.kafka.queue.EventQueue; import org.apache.kafka.queue.KafkaEventQueue; import org.apache.kafka.raft.LeaderAndEpoch; @@ -646,19 +649,59 @@ public void run() throws Exception { }); } - // For configs and client quotas, we need to send all of the data to the ZK client since we persist - // everything for a given entity in a single ZK node. + // For configs and client quotas, we need to send all of the data to the ZK + // client since we persist everything for a given entity in a single ZK node. if (delta.configsDelta() != null) { delta.configsDelta().changes().forEach((configResource, configDelta) -> apply("Updating config resource " + configResource, migrationState -> zkMigrationClient.writeConfigs(configResource, image.configs().configMapForResource(configResource), migrationState))); } - if (delta.clientQuotasDelta() != null) { - delta.clientQuotasDelta().changes().forEach((clientQuotaEntity, clientQuotaDelta) -> { - Map quotaMap = image.clientQuotas().entities().get(clientQuotaEntity).quotaMap(); - apply("Updating client quota " + clientQuotaEntity, migrationState -> - zkMigrationClient.writeClientQuotas(clientQuotaEntity.entries(), quotaMap, migrationState)); + if ((delta.clientQuotasDelta() != null) || (delta.scramDelta() != null)) { + + // A list of users with scram or quota changes + HashSet users = new HashSet(); + + // Populate list with users with scram changes + if (delta.scramDelta() != null) { + delta.scramDelta().changes().forEach((scramMechanism, changes) -> { + changes.forEach((userName, changeOpt) -> users.add(userName)); + }); + } + + // Populate list with users with quota changes + // and apply quota changes to all non user quota changes + if (delta.clientQuotasDelta() != null) { + Map scramMap = new HashMap(); + delta.clientQuotasDelta().changes().forEach((clientQuotaEntity, clientQuotaDelta) -> { + + if ((clientQuotaEntity.entries().containsKey(ClientQuotaEntity.USER)) && + (!clientQuotaEntity.entries().containsKey(ClientQuotaEntity.CLIENT_ID))) { + String userName = clientQuotaEntity.entries().get(ClientQuotaEntity.USER); + // Add clientQuotaEntity to list to process at the end + users.add(userName); + } else { + Map quotaMap = image.clientQuotas().entities().get(clientQuotaEntity).quotaMap(); + apply("Updating client quota " + clientQuotaEntity, migrationState -> + zkMigrationClient.writeClientQuotas(clientQuotaEntity.entries(), quotaMap, scramMap, migrationState)); + } + }); + } + // Updateuser scram and quota data for each user with changes in either. + users.forEach(userName -> { + System.out.println("Applying changes for user : " + userName); + Map userscramMap = getScramCredentialStringsForUser(userName); + ClientQuotaEntity clientQuotaEntity = new + ClientQuotaEntity(Collections.singletonMap(ClientQuotaEntity.USER, userName)); + if (image.clientQuotas() != null) { + Map quotaMap = new HashMap(); + apply("Updating client quota " + clientQuotaEntity, migrationState -> + zkMigrationClient.writeClientQuotas(clientQuotaEntity.entries(), quotaMap, userscramMap, migrationState)); + } else { + Map quotaMap = image.clientQuotas().entities().get(clientQuotaEntity).quotaMap(); + apply("Updating client quota " + clientQuotaEntity, migrationState -> + zkMigrationClient.writeClientQuotas(clientQuotaEntity.entries(), quotaMap, userscramMap, migrationState)); + } }); } @@ -712,6 +755,24 @@ public void run() throws Exception { completionHandler.accept(null); } + private Map getScramCredentialStringsForUser(String userName) { + Map userScramCredentialStrings = new HashMap(); + if (image.scram() != null) { + image.scram().mechanisms().forEach((scramMechanism, scramMechanismMap) -> { + ScramCredentialData scramCredentialData = scramMechanismMap.get(userName); + if (scramCredentialData != null) { + userScramCredentialStrings.put(scramMechanism.mechanismName(), + ScramCredentialUtils.credentialToString( + scramCredentialData.toCredential(scramMechanism))); + System.out.println(" Changes are: " + + ScramCredentialUtils.credentialToString( + scramCredentialData.toCredential(scramMechanism))); + } + }); + } + return userScramCredentialStrings; + } + private void addStandardAclToMap(Map> aclMap, StandardAcl acl) { ResourcePattern resource = new ResourcePattern(acl.resourceType(), acl.resourceName(), acl.patternType()); aclMap.computeIfAbsent(resource, __ -> new ArrayList<>()).add( diff --git a/metadata/src/main/java/org/apache/kafka/metadata/migration/MigrationClient.java b/metadata/src/main/java/org/apache/kafka/metadata/migration/MigrationClient.java index 3af3cd10959e..8ed1cd2e7ae8 100644 --- a/metadata/src/main/java/org/apache/kafka/metadata/migration/MigrationClient.java +++ b/metadata/src/main/java/org/apache/kafka/metadata/migration/MigrationClient.java @@ -96,6 +96,7 @@ ZkMigrationLeadershipState writeConfigs( ZkMigrationLeadershipState writeClientQuotas( Map clientQuotaEntity, Map quotas, + Map scram, ZkMigrationLeadershipState state ); diff --git a/metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationDriverTest.java b/metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationDriverTest.java index 4bf58f99cc2b..09e15197cbfe 100644 --- a/metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationDriverTest.java +++ b/metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationDriverTest.java @@ -183,6 +183,7 @@ public ZkMigrationLeadershipState writeConfigs( public ZkMigrationLeadershipState writeClientQuotas( Map clientQuotaEntity, Map quotas, + Map scram, ZkMigrationLeadershipState state ) { this.state = state; @@ -533,4 +534,4 @@ public void testSkipWaitForBrokersInDualWrite() throws Exception { "Waiting for KRaftMigrationDriver to enter ZK_MIGRATION state"); } } -} \ No newline at end of file +} From 1c9fc604c2adee0571186f064a730f07924094f5 Mon Sep 17 00:00:00 2001 From: Proven Provenzano Date: Sun, 23 Apr 2023 22:37:30 -0400 Subject: [PATCH 06/12] Fix EOF newline --- .../kafka/metadata/migration/KRaftMigrationDriverTest.java | 1 + 1 file changed, 1 insertion(+) diff --git a/metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationDriverTest.java b/metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationDriverTest.java index 09e15197cbfe..d4e7db6dd919 100644 --- a/metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationDriverTest.java +++ b/metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationDriverTest.java @@ -535,3 +535,4 @@ public void testSkipWaitForBrokersInDualWrite() throws Exception { } } } + From a81570d3fa8afc2b8fc3219e6c7df54a082e7015 Mon Sep 17 00:00:00 2001 From: Proven Provenzano Date: Thu, 27 Apr 2023 14:07:47 -0400 Subject: [PATCH 07/12] Fixup checkstyle issues --- checkstyle/import-control-metadata.xml | 1 + 1 file changed, 1 insertion(+) diff --git a/checkstyle/import-control-metadata.xml b/checkstyle/import-control-metadata.xml index 8ec45c5941a3..92c3225cd73e 100644 --- a/checkstyle/import-control-metadata.xml +++ b/checkstyle/import-control-metadata.xml @@ -131,6 +131,7 @@ + From db1b1b138ab3e45f6e1d53bac0c69048cf70b55a Mon Sep 17 00:00:00 2001 From: Proven Provenzano Date: Wed, 26 Apr 2023 12:45:06 -0400 Subject: [PATCH 08/12] Fixing merge conflict by rebasing file. --- .../kafka/metadata/migration/KRaftMigrationDriverTest.java | 1 - 1 file changed, 1 deletion(-) diff --git a/metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationDriverTest.java b/metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationDriverTest.java index d4e7db6dd919..22ca283027c1 100644 --- a/metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationDriverTest.java +++ b/metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationDriverTest.java @@ -183,7 +183,6 @@ public ZkMigrationLeadershipState writeConfigs( public ZkMigrationLeadershipState writeClientQuotas( Map clientQuotaEntity, Map quotas, - Map scram, ZkMigrationLeadershipState state ) { this.state = state; From 42ffa46b39a939a6e97b838a57284ee3e0f38733 Mon Sep 17 00:00:00 2001 From: Proven Provenzano Date: Wed, 26 Apr 2023 12:50:47 -0400 Subject: [PATCH 09/12] Fix noeol that caused a merge conflict --- .../kafka/metadata/migration/KRaftMigrationDriverTest.java | 1 + 1 file changed, 1 insertion(+) diff --git a/metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationDriverTest.java b/metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationDriverTest.java index 22ca283027c1..d4e7db6dd919 100644 --- a/metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationDriverTest.java +++ b/metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationDriverTest.java @@ -183,6 +183,7 @@ public ZkMigrationLeadershipState writeConfigs( public ZkMigrationLeadershipState writeClientQuotas( Map clientQuotaEntity, Map quotas, + Map scram, ZkMigrationLeadershipState state ) { this.state = state; From 4663f259a1d680c4ed16debb6925b35b7d6c5de4 Mon Sep 17 00:00:00 2001 From: Proven Provenzano Date: Wed, 26 Apr 2023 21:33:33 -0400 Subject: [PATCH 10/12] Remove extra println --- .../apache/kafka/metadata/migration/KRaftMigrationDriver.java | 4 ---- 1 file changed, 4 deletions(-) diff --git a/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java b/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java index a95b2c3c1c05..b73033615455 100644 --- a/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java +++ b/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java @@ -689,7 +689,6 @@ public void run() throws Exception { } // Updateuser scram and quota data for each user with changes in either. users.forEach(userName -> { - System.out.println("Applying changes for user : " + userName); Map userscramMap = getScramCredentialStringsForUser(userName); ClientQuotaEntity clientQuotaEntity = new ClientQuotaEntity(Collections.singletonMap(ClientQuotaEntity.USER, userName)); @@ -764,9 +763,6 @@ private Map getScramCredentialStringsForUser(String userName) { userScramCredentialStrings.put(scramMechanism.mechanismName(), ScramCredentialUtils.credentialToString( scramCredentialData.toCredential(scramMechanism))); - System.out.println(" Changes are: " + - ScramCredentialUtils.credentialToString( - scramCredentialData.toCredential(scramMechanism))); } }); } From 8b5da1ceb261d9e2b0b8b8915dd9d8b033708165 Mon Sep 17 00:00:00 2001 From: Proven Provenzano Date: Thu, 27 Apr 2023 14:15:44 -0400 Subject: [PATCH 11/12] Fixup some PR nits --- .../kafka/metadata/migration/KRaftMigrationDriver.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java b/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java index b73033615455..cdde5ab6a7ad 100644 --- a/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java +++ b/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java @@ -687,19 +687,19 @@ public void run() throws Exception { } }); } - // Updateuser scram and quota data for each user with changes in either. + // Update user scram and quota data for each user with changes in either. users.forEach(userName -> { - Map userscramMap = getScramCredentialStringsForUser(userName); + Map userScramMap = getScramCredentialStringsForUser(userName); ClientQuotaEntity clientQuotaEntity = new ClientQuotaEntity(Collections.singletonMap(ClientQuotaEntity.USER, userName)); if (image.clientQuotas() != null) { Map quotaMap = new HashMap(); apply("Updating client quota " + clientQuotaEntity, migrationState -> - zkMigrationClient.writeClientQuotas(clientQuotaEntity.entries(), quotaMap, userscramMap, migrationState)); + zkMigrationClient.writeClientQuotas(clientQuotaEntity.entries(), quotaMap, userScramMap, migrationState)); } else { Map quotaMap = image.clientQuotas().entities().get(clientQuotaEntity).quotaMap(); apply("Updating client quota " + clientQuotaEntity, migrationState -> - zkMigrationClient.writeClientQuotas(clientQuotaEntity.entries(), quotaMap, userscramMap, migrationState)); + zkMigrationClient.writeClientQuotas(clientQuotaEntity.entries(), quotaMap, userScramMap, migrationState)); } }); } From cfb64063ebc972bc1ecd1e91c09c2861e9622cb4 Mon Sep 17 00:00:00 2001 From: Proven Provenzano Date: Thu, 27 Apr 2023 21:57:51 -0400 Subject: [PATCH 12/12] Got logic backwards --- .../apache/kafka/metadata/migration/KRaftMigrationDriver.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java b/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java index cdde5ab6a7ad..5a02bbf6c0de 100644 --- a/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java +++ b/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java @@ -658,7 +658,6 @@ public void run() throws Exception { } if ((delta.clientQuotasDelta() != null) || (delta.scramDelta() != null)) { - // A list of users with scram or quota changes HashSet users = new HashSet(); @@ -692,7 +691,7 @@ public void run() throws Exception { Map userScramMap = getScramCredentialStringsForUser(userName); ClientQuotaEntity clientQuotaEntity = new ClientQuotaEntity(Collections.singletonMap(ClientQuotaEntity.USER, userName)); - if (image.clientQuotas() != null) { + if (image.clientQuotas() == null) { Map quotaMap = new HashMap(); apply("Updating client quota " + clientQuotaEntity, migrationState -> zkMigrationClient.writeClientQuotas(clientQuotaEntity.entries(), quotaMap, userScramMap, migrationState));