Skip to content

Commit

Permalink
KAFKA-15017 Fix snapshot load in dual write mode for ClientQuotas and…
Browse files Browse the repository at this point in the history
… SCRAM (#13757)

This patch fixes the case where a ClientQuota or SCRAM credential was added in KRaft, but not written back to ZK. This missed write only occurred when handling a KRaft snapshot. If the changed quota was processed in a metadata delta (which is the typical case), it would be written to ZK.

Reviewers: David Arthur <mumrah@gmail.com>
  • Loading branch information
pprovenzano committed May 31, 2023
1 parent 9b3db6d commit 731c8c9
Show file tree
Hide file tree
Showing 2 changed files with 96 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,21 @@ package kafka.zk.migration

import kafka.server.{ConfigType, KafkaConfig, ZkAdminManager}
import kafka.zk.{AdminZkClient, ZkMigrationClient}
import org.apache.kafka.clients.admin.ScramMechanism
import org.apache.kafka.common.config.internals.QuotaConfigs
import org.apache.kafka.common.config.types.Password
import org.apache.kafka.common.config.{ConfigResource, TopicConfig}
import org.apache.kafka.common.metadata.ClientQuotaRecord
import org.apache.kafka.common.metadata.ClientQuotaRecord.EntityData
import org.apache.kafka.common.metadata.ConfigRecord
import org.apache.kafka.common.metadata.UserScramCredentialRecord
import org.apache.kafka.common.quota.ClientQuotaEntity
import org.apache.kafka.common.security.scram.ScramCredential
import org.apache.kafka.common.security.scram.internals.ScramCredentialUtils
import org.apache.kafka.image.{ClientQuotasDelta, ClientQuotasImage}
import org.apache.kafka.image.{MetadataDelta, MetadataImage, MetadataProvenance}
import org.apache.kafka.metadata.RecordTestUtils
import org.apache.kafka.metadata.migration.KRaftMigrationZkWriter
import org.apache.kafka.metadata.migration.ZkMigrationLeadershipState
import org.apache.kafka.server.common.ApiMessageAndVersion
import org.apache.kafka.server.util.MockRandom
Expand All @@ -39,6 +45,12 @@ import scala.collection.Map
import scala.jdk.CollectionConverters._

class ZkConfigMigrationClientTest extends ZkMigrationTestHarness {
/** Returns a byte array of `length` random bytes drawn from the given [[MockRandom]]. */
def randomBuffer(random: MockRandom, length: Int): Array[Byte] = {
  val bytes = Array.ofDim[Byte](length)
  random.nextBytes(bytes)
  bytes
}

@Test
def testMigrationBrokerConfigs(): Unit = {
val brokers = new java.util.ArrayList[Integer]()
Expand Down Expand Up @@ -235,12 +247,6 @@ class ZkConfigMigrationClientTest extends ZkMigrationTestHarness {
def testScram(): Unit = {
val random = new MockRandom()

def randomBuffer(random: MockRandom, length: Int): Array[Byte] = {
val buf = new Array[Byte](length)
random.nextBytes(buf)
buf
}

val scramCredential = new ScramCredential(
randomBuffer(random, 1024),
randomBuffer(random, 1024),
Expand All @@ -259,4 +265,64 @@ class ZkConfigMigrationClientTest extends ZkMigrationTestHarness {
assertEquals(1, batches.size())
assertEquals(1, batches.get(0).size)
}

@Test
// Verifies that handleLoadSnapshot writes KRaft-only ClientQuota and SCRAM changes
// back to ZooKeeper, and deletes ZK entries absent from the snapshot (KAFKA-15017).
def testScramAndQuotaChangesInSnapshot(): Unit = {
val random = new MockRandom()

// Seed ZooKeeper with a quota for user1. It is NOT in the KRaft snapshot below,
// so loading the snapshot should delete it from ZK.
val props = new Properties()
props.put(QuotaConfigs.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG, "100000")
adminZkClient.changeConfigs(ConfigType.User, "user1", props)

// Create SCRAM records in Zookeeper for alice. Like user1, alice is absent from
// the snapshot and should be removed when the snapshot is loaded.
val aliceScramCredential = new ScramCredential(
randomBuffer(random, 1024),
randomBuffer(random, 1024),
randomBuffer(random, 1024),
4096)

val alicePropsInit = new Properties()
alicePropsInit.put("SCRAM-SHA-256", ScramCredentialUtils.credentialToString(aliceScramCredential))
adminZkClient.changeConfigs(ConfigType.User, "alice", alicePropsInit)

// Build the KRaft image from an empty base, so the resulting image contains
// only the records replayed below.
val delta = new MetadataDelta(MetadataImage.EMPTY)

// Create a new Quota for user2 (exists only in KRaft, not yet in ZK).
val entityData = new EntityData().setEntityType("user").setEntityName("user2")
val clientQuotaRecord = new ClientQuotaRecord()
.setEntity(List(entityData).asJava)
.setKey("request_percentage")
.setValue(58.58)
.setRemove(false)
delta.replay(clientQuotaRecord)

// Create a new SCRAM credential for george (exists only in KRaft, not yet in ZK).
val scramCredentialRecord = new UserScramCredentialRecord()
.setName("george")
.setMechanism(ScramMechanism.SCRAM_SHA_256.`type`)
.setSalt(randomBuffer(random, 1024))
.setStoredKey(randomBuffer(random, 1024))
.setServerKey(randomBuffer(random, 1024))
.setIterations(8192)
delta.replay(scramCredentialRecord)

// Add Quota record for user2 but not user1 to delete user1
// Add SCRAM record for george but not for alice to delete alice
val image = delta.apply(MetadataProvenance.EMPTY)

// Load the snapshot into ZooKeeper via the dual-write path under test.
val kraftMigrationZkWriter = new KRaftMigrationZkWriter(migrationClient,
(_, operation) => { migrationState = operation.apply(migrationState) })
kraftMigrationZkWriter.handleLoadSnapshot(image)

// user1 was only in ZK: snapshot load must have deleted its quota config.
val user1Props = zkClient.getEntityConfigs(ConfigType.User, "user1")
assertEquals(0, user1Props.size())
// user2 was only in KRaft: snapshot load must have written its quota to ZK.
val user2Props = zkClient.getEntityConfigs(ConfigType.User, "user2")
assertEquals(1, user2Props.size())

// george was only in KRaft: snapshot load must have written his SCRAM credential to ZK.
val georgeProps = zkClient.getEntityConfigs(ConfigType.User, "george")
assertEquals(1, georgeProps.size())
// alice was only in ZK: snapshot load must have deleted her SCRAM credential.
val aliceProps = zkClient.getEntityConfigs(ConfigType.User, "alice")
assertEquals(0, aliceProps.size())
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Set;
import java.util.function.BiConsumer;
Expand Down Expand Up @@ -258,6 +259,27 @@ private Map<String, String> getScramCredentialStringsForUser(ScramImage image, S
void handleClientQuotasSnapshot(ClientQuotasImage clientQuotasImage, ScramImage scramImage) {
Set<ClientQuotaEntity> changedNonUserEntities = new HashSet<>();
Set<String> changedUsers = new HashSet<>();

if (clientQuotasImage != null) {
for (Entry<ClientQuotaEntity, ClientQuotaImage> entry : clientQuotasImage.entities().entrySet()) {
ClientQuotaEntity entity = entry.getKey();
if (entity.entries().containsKey(ClientQuotaEntity.USER) &&
!entity.entries().containsKey(ClientQuotaEntity.CLIENT_ID)) {
// Track regular user entities separately
// There should only be 1 entry in the list of type ClientQuotaEntity.USER
changedUsers.add(entity.entries().get(ClientQuotaEntity.USER));
} else {
changedNonUserEntities.add(entity);
}
}
}
if (scramImage != null) {
for (Entry<ScramMechanism, Map<String, ScramCredentialData>> mechanismEntry : scramImage.mechanisms().entrySet()) {
for (Entry<String, ScramCredentialData> userEntry : mechanismEntry.getValue().entrySet()) {
changedUsers.add(userEntry.getKey());
}
}
}
migrationClient.configClient().iterateClientQuotas(new ConfigMigrationClient.ClientQuotaVisitor() {
@Override
public void visitClientQuota(List<ClientQuotaRecord.EntityData> entityDataList, Map<String, Double> quotas) {
Expand Down Expand Up @@ -293,13 +315,13 @@ public void visitScramCredential(String userName, ScramMechanism scramMechanism,

changedUsers.forEach(userName -> {
ClientQuotaEntity entity = new ClientQuotaEntity(Collections.singletonMap(ClientQuotaEntity.USER, userName));
Map<String, Double> quotaMap = clientQuotasImage.entities().get(entity).quotaMap();
Map<String, Double> quotaMap = clientQuotasImage.entities().
getOrDefault(entity, ClientQuotaImage.EMPTY).quotaMap();
Map<String, String> scramMap = getScramCredentialStringsForUser(scramImage, userName);
operationConsumer.accept("Update scram credentials for " + userName, migrationState ->
migrationClient.configClient().writeClientQuotas(entity.entries(), quotaMap, scramMap, migrationState));
});


}

void handleConfigsDelta(ConfigurationsImage configsImage, ConfigurationsDelta configsDelta) {
Expand Down

0 comments on commit 731c8c9

Please sign in to comment.