Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-14859: SCRAM ZK to KRaft migration with dual write #13628

Merged
merged 12 commits into from May 1, 2023
1 change: 1 addition & 0 deletions checkstyle/import-control-metadata.xml
Expand Up @@ -131,6 +131,7 @@
<allow pkg="org.apache.kafka.common.message" />
<allow pkg="org.apache.kafka.common.metadata" />
<allow pkg="org.apache.kafka.common.protocol" />
<allow pkg="org.apache.kafka.common.quota" />
<allow pkg="org.apache.kafka.common.record" />
<allow pkg="org.apache.kafka.common.resource" />
<allow pkg="org.apache.kafka.common.requests" />
Expand Down
18 changes: 18 additions & 0 deletions core/src/main/scala/kafka/zk/ZkMigrationClient.scala
Expand Up @@ -24,6 +24,7 @@ import kafka.server.{ConfigEntityName, ConfigType, DynamicBrokerConfig, ZkAdminM
import kafka.utils.{Logging, PasswordEncoder}
import kafka.zk.TopicZNode.TopicIdReplicaAssignment
import kafka.zookeeper._
import org.apache.kafka.clients.admin.ScramMechanism
import org.apache.kafka.common.acl.AccessControlEntry
import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.errors.ControllerMovedException
Expand All @@ -32,6 +33,7 @@ import org.apache.kafka.common.metadata._
import org.apache.kafka.common.quota.ClientQuotaEntity
import org.apache.kafka.common.resource.ResourcePattern
import org.apache.kafka.common.{TopicPartition, Uuid}
import org.apache.kafka.common.security.scram.internals.ScramCredentialUtils
import org.apache.kafka.metadata.{LeaderRecoveryState, PartitionRegistration}
import org.apache.kafka.metadata.migration.{MigrationClient, MigrationClientAuthException, MigrationClientException, ZkMigrationLeadershipState}
import org.apache.kafka.server.common.{ApiMessageAndVersion, ProducerIdsBlock}
Expand Down Expand Up @@ -211,6 +213,20 @@ class ZkMigrationClient(
adminZkClient.fetchAllEntityConfigs(entityType).foreach { case (name, props) =>
val entity = new EntityData().setEntityType(entityType).setEntityName(name)
val batch = new util.ArrayList[ApiMessageAndVersion]()
ScramMechanism.values().filter(_ != ScramMechanism.UNKNOWN).foreach { mechanism =>
val propertyValue = props.getProperty(mechanism.mechanismName)
if (propertyValue != null) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we log something if we get a null value here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No. It just means that there are no SCRAM records for the user.

val scramCredentials = ScramCredentialUtils.credentialFromString(propertyValue)
batch.add(new ApiMessageAndVersion(new UserScramCredentialRecord()
.setName(name)
.setMechanism(mechanism.`type`)
.setSalt(scramCredentials.salt)
.setStoredKey(scramCredentials.storedKey)
.setServerKey(scramCredentials.serverKey)
.setIterations(scramCredentials.iterations), 0.toShort))
props.remove(mechanism.mechanismName)
}
}
ZkAdminManager.clientQuotaPropsToDoubleMap(props.asScala).foreach { case (key: String, value: Double) =>
batch.add(new ApiMessageAndVersion(new ClientQuotaRecord()
.setEntity(List(entity).asJava)
Expand Down Expand Up @@ -470,6 +486,7 @@ class ZkMigrationClient(
override def writeClientQuotas(
entity: util.Map[String, String],
quotas: util.Map[String, java.lang.Double],
scram: util.Map[String, String],
state: ZkMigrationLeadershipState
): ZkMigrationLeadershipState = wrapZkException {
val entityMap = entity.asScala
Expand All @@ -478,6 +495,7 @@ class ZkMigrationClient(
val hasIp = entityMap.contains(ClientQuotaEntity.IP)
val props = new Properties()
// We store client quota values as strings in the ZK JSON
scram.forEach { case (key, value) => props.put(key, value.toString) }
quotas.forEach { case (key, value) => props.put(key, value.toString) }
val (configType, path) = if (hasUser && !hasClient) {
(Some(ConfigType.User), Some(entityMap(ClientQuotaEntity.USER)))
Expand Down
38 changes: 38 additions & 0 deletions core/src/test/scala/unit/kafka/zk/ZkMigrationClientTest.scala
Expand Up @@ -32,11 +32,14 @@ import org.apache.kafka.common.errors.ControllerMovedException
import org.apache.kafka.common.metadata.{AccessControlEntryRecord, ConfigRecord, MetadataRecordType, ProducerIdsRecord}
import org.apache.kafka.common.quota.ClientQuotaEntity
import org.apache.kafka.common.resource.{PatternType, ResourcePattern, ResourcePatternFilter, ResourceType}
import org.apache.kafka.common.security.scram.internals.ScramCredentialUtils
import org.apache.kafka.common.security.scram.ScramCredential
import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.common.utils.{SecurityUtils, Time}
import org.apache.kafka.metadata.{LeaderRecoveryState, PartitionRegistration}
import org.apache.kafka.metadata.migration.ZkMigrationLeadershipState
import org.apache.kafka.server.common.ApiMessageAndVersion
import org.apache.kafka.server.util.MockRandom
import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows, assertTrue, fail}
import org.junit.jupiter.api.{BeforeEach, Test, TestInfo}

Expand Down Expand Up @@ -217,11 +220,13 @@ class ZkMigrationClientTest extends QuorumTestHarness {
migrationState: ZkMigrationLeadershipState,
entity: Map[String, String],
quotas: Map[String, java.lang.Double],
scram: Map[String, String],
zkEntityType: String,
zkEntityName: String): ZkMigrationLeadershipState = {
val nextMigrationState = migrationClient.writeClientQuotas(
entity.asJava,
quotas.asJava,
scram.asJava,
migrationState)
val newProps = ZkAdminManager.clientQuotaPropsToDoubleMap(
adminZkClient.fetchEntityConfig(zkEntityType, zkEntityName).asScala)
Expand All @@ -241,24 +246,28 @@ class ZkMigrationClientTest extends QuorumTestHarness {
migrationState = writeClientQuotaAndVerify(migrationClient, adminZkClient, migrationState,
Map(ClientQuotaEntity.USER -> "user1"),
Map(QuotaConfigs.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG -> 20000.0),
Map.empty,
ConfigType.User, "user1")
assertEquals(1, migrationState.migrationZkVersion())

migrationState = writeClientQuotaAndVerify(migrationClient, adminZkClient, migrationState,
Map(ClientQuotaEntity.USER -> "user1"),
Map(QuotaConfigs.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG -> 10000.0),
Map.empty,
ConfigType.User, "user1")
assertEquals(2, migrationState.migrationZkVersion())

migrationState = writeClientQuotaAndVerify(migrationClient, adminZkClient, migrationState,
Map(ClientQuotaEntity.USER -> "user1"),
Map.empty,
Map.empty,
ConfigType.User, "user1")
assertEquals(3, migrationState.migrationZkVersion())

migrationState = writeClientQuotaAndVerify(migrationClient, adminZkClient, migrationState,
Map(ClientQuotaEntity.USER -> "user1"),
Map(QuotaConfigs.CONSUMER_BYTE_RATE_OVERRIDE_CONFIG -> 100.0),
Map.empty,
ConfigType.User, "user1")
assertEquals(4, migrationState.migrationZkVersion())
}
Expand All @@ -269,18 +278,47 @@ class ZkMigrationClientTest extends QuorumTestHarness {
migrationState = writeClientQuotaAndVerify(migrationClient, adminZkClient, migrationState,
Map(ClientQuotaEntity.USER -> "user2"),
Map(QuotaConfigs.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG -> 20000.0, QuotaConfigs.CONSUMER_BYTE_RATE_OVERRIDE_CONFIG -> 100.0),
Map.empty,
ConfigType.User, "user2")

assertEquals(1, migrationState.migrationZkVersion())

migrationState = writeClientQuotaAndVerify(migrationClient, adminZkClient, migrationState,
Map(ClientQuotaEntity.USER -> "user2", ClientQuotaEntity.CLIENT_ID -> "clientA"),
Map(QuotaConfigs.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG -> 10000.0, QuotaConfigs.CONSUMER_BYTE_RATE_OVERRIDE_CONFIG -> 200.0),
Map.empty,
ConfigType.User, "user2/clients/clientA")

assertEquals(2, migrationState.migrationZkVersion())
}

@Test
def testScram(): Unit = {
  // Deterministic pseudo-random bytes of the requested length.
  def nextBytes(rng: MockRandom, len: Int): Array[Byte] = {
    val buf = new Array[Byte](len)
    rng.nextBytes(buf)
    buf
  }

  val rng = new MockRandom()
  // A SCRAM credential: salt, stored key, server key, and iteration count.
  val credential = new ScramCredential(
    nextBytes(rng, 1024),
    nextBytes(rng, 1024),
    nextBytes(rng, 1024),
    4096)

  // Persist the serialized credential for user "alice" under SCRAM-SHA-256 in ZK.
  val userProps = new Properties()
  userProps.put("SCRAM-SHA-256", ScramCredentialUtils.credentialToString(credential))
  adminZkClient.changeConfigs(ConfigType.User, "alice", userProps)

  val seenBrokers = new java.util.ArrayList[Integer]()
  val seenBatches = new java.util.ArrayList[java.util.List[ApiMessageAndVersion]]()

  // Reading all metadata back should yield no brokers and exactly one batch
  // holding the single UserScramCredentialRecord for "alice".
  migrationClient.readAllMetadata(batch => seenBatches.add(batch), brokerId => seenBrokers.add(brokerId))
  assertEquals(0, seenBrokers.size())
  assertEquals(1, seenBatches.size())
  assertEquals(1, seenBatches.get(0).size)
}

@Test
def testClaimAbsentController(): Unit = {
assertEquals(0, migrationState.migrationZkVersion())
Expand Down
Expand Up @@ -19,9 +19,11 @@
import org.apache.kafka.common.acl.AccessControlEntry;
import org.apache.kafka.common.metadata.ConfigRecord;
import org.apache.kafka.common.metadata.MetadataRecordType;
import org.apache.kafka.common.quota.ClientQuotaEntity;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.security.scram.internals.ScramCredentialUtils;
import org.apache.kafka.controller.QuorumFeatures;
import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
Expand All @@ -31,6 +33,7 @@
import org.apache.kafka.image.publisher.MetadataPublisher;
import org.apache.kafka.metadata.BrokerRegistration;
import org.apache.kafka.metadata.authorizer.StandardAcl;
import org.apache.kafka.metadata.ScramCredentialData;
import org.apache.kafka.queue.EventQueue;
import org.apache.kafka.queue.KafkaEventQueue;
import org.apache.kafka.raft.LeaderAndEpoch;
Expand Down Expand Up @@ -646,19 +649,57 @@ public void run() throws Exception {
});
}

// For configs and client quotas, we need to send all of the data to the ZK client since we persist
// everything for a given entity in a single ZK node.
// For configs and client quotas, we need to send all of the data to the ZK
// client since we persist everything for a given entity in a single ZK node.
if (delta.configsDelta() != null) {
delta.configsDelta().changes().forEach((configResource, configDelta) ->
apply("Updating config resource " + configResource, migrationState ->
zkMigrationClient.writeConfigs(configResource, image.configs().configMapForResource(configResource), migrationState)));
}

if (delta.clientQuotasDelta() != null) {
delta.clientQuotasDelta().changes().forEach((clientQuotaEntity, clientQuotaDelta) -> {
Map<String, Double> quotaMap = image.clientQuotas().entities().get(clientQuotaEntity).quotaMap();
apply("Updating client quota " + clientQuotaEntity, migrationState ->
zkMigrationClient.writeClientQuotas(clientQuotaEntity.entries(), quotaMap, migrationState));
if ((delta.clientQuotasDelta() != null) || (delta.scramDelta() != null)) {
// A list of users with scram or quota changes
HashSet<String> users = new HashSet<String>();

// Populate list with users with scram changes
if (delta.scramDelta() != null) {
delta.scramDelta().changes().forEach((scramMechanism, changes) -> {
changes.forEach((userName, changeOpt) -> users.add(userName));
});
}

// Populate list with users with quota changes
// and apply quota changes to all non user quota changes
if (delta.clientQuotasDelta() != null) {
Map<String, String> scramMap = new HashMap<String, String>();
delta.clientQuotasDelta().changes().forEach((clientQuotaEntity, clientQuotaDelta) -> {

if ((clientQuotaEntity.entries().containsKey(ClientQuotaEntity.USER)) &&
(!clientQuotaEntity.entries().containsKey(ClientQuotaEntity.CLIENT_ID))) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just so I understand -- we exclude entities like /user/userA/client/clientX because we don't store SCRAM credentials for user+client entities? We only store SCRAM for plain user entities?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That is my understanding. SCRAM records are only applied to principals and are not client-specific.

String userName = clientQuotaEntity.entries().get(ClientQuotaEntity.USER);
// Add clientQuotaEntity to list to process at the end
users.add(userName);
} else {
Map<String, Double> quotaMap = image.clientQuotas().entities().get(clientQuotaEntity).quotaMap();
apply("Updating client quota " + clientQuotaEntity, migrationState ->
zkMigrationClient.writeClientQuotas(clientQuotaEntity.entries(), quotaMap, scramMap, migrationState));
}
});
}
// Update user scram and quota data for each user with changes in either.
users.forEach(userName -> {
Map<String, String> userScramMap = getScramCredentialStringsForUser(userName);
ClientQuotaEntity clientQuotaEntity = new
ClientQuotaEntity(Collections.singletonMap(ClientQuotaEntity.USER, userName));
if (image.clientQuotas() == null) {
Map<String, Double> quotaMap = new HashMap<String, Double>();
apply("Updating client quota " + clientQuotaEntity, migrationState ->
zkMigrationClient.writeClientQuotas(clientQuotaEntity.entries(), quotaMap, userScramMap, migrationState));
} else {
Map<String, Double> quotaMap = image.clientQuotas().entities().get(clientQuotaEntity).quotaMap();
apply("Updating client quota " + clientQuotaEntity, migrationState ->
zkMigrationClient.writeClientQuotas(clientQuotaEntity.entries(), quotaMap, userScramMap, migrationState));
}
});
}

Expand Down Expand Up @@ -712,6 +753,21 @@ public void run() throws Exception {
completionHandler.accept(null);
}

/**
 * Collects the SCRAM credentials stored in the metadata image for the given user,
 * serialized to their ZK string form and keyed by mechanism name. Returns an
 * empty map when the image carries no SCRAM data or the user has no credentials.
 */
private Map<String, String> getScramCredentialStringsForUser(String userName) {
    Map<String, String> credentialsByMechanism = new HashMap<>();
    if (image.scram() != null) {
        image.scram().mechanisms().forEach((mechanism, credentialsByUser) -> {
            ScramCredentialData data = credentialsByUser.get(userName);
            if (data == null) {
                return; // this user has no credential for this mechanism
            }
            String serialized =
                ScramCredentialUtils.credentialToString(data.toCredential(mechanism));
            credentialsByMechanism.put(mechanism.mechanismName(), serialized);
        });
    }
    return credentialsByMechanism;
}

private void addStandardAclToMap(Map<ResourcePattern, List<AccessControlEntry>> aclMap, StandardAcl acl) {
ResourcePattern resource = new ResourcePattern(acl.resourceType(), acl.resourceName(), acl.patternType());
aclMap.computeIfAbsent(resource, __ -> new ArrayList<>()).add(
Expand Down
Expand Up @@ -96,6 +96,7 @@ ZkMigrationLeadershipState writeConfigs(
ZkMigrationLeadershipState writeClientQuotas(
Map<String, String> clientQuotaEntity,
Map<String, Double> quotas,
Map<String, String> scram,
ZkMigrationLeadershipState state
);

Expand Down
Expand Up @@ -183,6 +183,7 @@ public ZkMigrationLeadershipState writeConfigs(
public ZkMigrationLeadershipState writeClientQuotas(
Map<String, String> clientQuotaEntity,
Map<String, Double> quotas,
Map<String, String> scram,
ZkMigrationLeadershipState state
) {
this.state = state;
Expand Down Expand Up @@ -533,4 +534,5 @@ public void testSkipWaitForBrokersInDualWrite() throws Exception {
"Waiting for KRaftMigrationDriver to enter ZK_MIGRATION state");
}
}
}
}