Skip to content

Commit

Permalink
Add unit tests
Browse files Browse the repository at this point in the history
  • Loading branch information
rayokota committed Dec 14, 2023
1 parent d11ef76 commit d6d92d6
Show file tree
Hide file tree
Showing 3 changed files with 193 additions and 9 deletions.
Expand Up @@ -16,6 +16,8 @@

package io.confluent.kafka.schemaregistry.encryption.tools;

import static io.confluent.kafka.schemaregistry.encryption.FieldEncryptionExecutor.CLOCK;

import com.google.common.collect.ImmutableList;
import io.confluent.kafka.schemaregistry.ParsedSchema;
import io.confluent.kafka.schemaregistry.SchemaProvider;
Expand All @@ -32,6 +34,7 @@
import io.confluent.kafka.schemaregistry.rules.RuleException;
import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
import java.security.GeneralSecurityException;
import java.time.Clock;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -67,15 +70,28 @@ public class RegisterDeks implements Callable<Integer> {
description = "Set configuration property.", paramLabel = "<prop=val>")
private Map<String, String> configs;

private Clock clock;

public RegisterDeks() {
}

public Clock getClock() {
return clock;
}

public void setClock(Clock clock) {
this.clock = clock;
}

@Override
public Integer call() throws Exception {
Map<String, String> configs = this.configs != null
Map<String, Object> configs = this.configs != null
? new HashMap<>(this.configs)
: new HashMap<>();
configs.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, baseUrl);
if (clock != null) {
configs.put(CLOCK, clock);
}

try (SchemaRegistryClient client = SchemaRegistryClientFactory.newClient(
Collections.singletonList(baseUrl),
Expand All @@ -84,7 +100,9 @@ public Integer call() throws Exception {
configs,
Collections.emptyMap()
)) {
SchemaMetadata schemaMetadata = client.getSchemaMetadata(subject, version);
SchemaMetadata schemaMetadata = version >= 0
? client.getSchemaMetadata(subject, version)
: client.getLatestSchemaMetadata(subject);
Optional<ParsedSchema> schema = parseSchema(schemaMetadata);
if (!schema.isPresent()) {
LOG.error("No schema found");
Expand All @@ -107,10 +125,10 @@ public Integer call() throws Exception {
}
}

private void processRule(Map<String, String> configs, ParsedSchema parsedSchema, List<Rule> rules,
private void processRule(Map<String, Object> configs, ParsedSchema parsedSchema, List<Rule> rules,
int i, Rule rule) throws RuleException, GeneralSecurityException {
try (FieldEncryptionExecutor executor = new FieldEncryptionExecutor()) {
Map<String, String> ruleConfigs = configsWithoutPrefix(rule, configs);
Map<String, Object> ruleConfigs = configsWithoutPrefix(rule, configs);
executor.configure(ruleConfigs);
RuleContext ctx = new RuleContext(configs, null, parsedSchema,
subject, null, null, null, null, false, RuleMode.WRITE, rule, i, rules);
Expand Down Expand Up @@ -144,17 +162,17 @@ private Optional<ParsedSchema> parseSchema(SchemaMetadata schemaMetadata) throws
return provider.parseSchema(new Schema(null, schemaMetadata), false, false);
}

private Map<String, String> configsWithoutPrefix(Rule rule, Map<String, String> configs) {
Map<String, String> ruleConfigs = new HashMap<>(configs);
for (Map.Entry<String, String> entry: configs.entrySet()) {
private Map<String, Object> configsWithoutPrefix(Rule rule, Map<String, Object> configs) {
Map<String, Object> ruleConfigs = new HashMap<>(configs);
for (Map.Entry<String, Object> entry: configs.entrySet()) {
String name = entry.getKey();
if (name.startsWith(DEFAULT_RULE_PARAM_PREFIX)) {
ruleConfigs.put(name.substring(DEFAULT_RULE_PARAM_PREFIX.length()), entry.getValue());
}
}
// Specific params override default params
String prefix = "rule.executors." + rule.getName() + ".param.";
for (Map.Entry<String, String> entry: configs.entrySet()) {
for (Map.Entry<String, Object> entry: configs.entrySet()) {
String name = entry.getKey();
if (name.startsWith(prefix)) {
ruleConfigs.put(name.substring(prefix.length()), entry.getValue());
Expand Down
Expand Up @@ -74,7 +74,6 @@
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Arrays;
Expand Down
@@ -0,0 +1,167 @@
/*
* Copyright 2022 Confluent Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.confluent.kafka.schemaregistry.encryption.tools;

import static org.junit.Assert.assertEquals;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSortedSet;
import io.confluent.dekregistry.client.DekRegistryClient;
import io.confluent.dekregistry.client.DekRegistryClientFactory;
import io.confluent.dekregistry.client.rest.entities.Dek;
import io.confluent.kafka.schemaregistry.avro.AvroSchema;
import io.confluent.kafka.schemaregistry.avro.AvroSchemaProvider;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClientFactory;
import io.confluent.kafka.schemaregistry.client.rest.entities.Metadata;
import io.confluent.kafka.schemaregistry.client.rest.entities.Rule;
import io.confluent.kafka.schemaregistry.client.rest.entities.RuleSet;
import io.confluent.kafka.schemaregistry.encryption.FieldEncryptionExecutor;
import io.confluent.kafka.schemaregistry.encryption.FieldEncryptionProperties;
import io.confluent.kafka.schemaregistry.encryption.local.LocalFieldEncryptionProperties;
import io.confluent.kafka.schemaregistry.testutil.FakeClock;
import java.time.temporal.ChronoUnit;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.apache.avro.Schema;
import org.junit.Test;
import picocli.CommandLine;

public class RegisterDeksTest {

private final FieldEncryptionProperties fieldEncryptionProps;
private final SchemaRegistryClient schemaRegistry;
private final DekRegistryClient dekRegistry;
private final String topic;
private final FakeClock fakeClock = new FakeClock();

public RegisterDeksTest() throws Exception {
topic = "test";
fieldEncryptionProps = new LocalFieldEncryptionProperties(ImmutableList.of("rule1"));
schemaRegistry = SchemaRegistryClientFactory.newClient(Collections.singletonList(
"mock://"),
1000,
ImmutableList.of(new AvroSchemaProvider()),
null,
null
);
dekRegistry = DekRegistryClientFactory.newClient(Collections.singletonList(
"mock://"),
1000,
100000,
Collections.emptyMap(),
null
);
}

private Schema createUserSchema() {
String userSchema = "{\"namespace\": \"example.avro\", \"type\": \"record\", "
+ "\"name\": \"User\","
+ "\"fields\": ["
+ "{\"name\": \"name\", \"type\": [\"null\", \"string\"], \"confluent:tags\": [\"PII\", \"PII3\"]},"
+ "{\"name\": \"name2\", \"type\": [\"null\", \"string\"], \"confluent:tags\": [\"PII2\"]},"
+ "{\"name\": \"age\", \"type\": [\"null\", \"int\"]}"
+ "]}";
Schema.Parser parser = new Schema.Parser();
Schema schema = parser.parse(userSchema);
return schema;
}

@Test
public void testRegisterDek() throws Exception {
String subject = topic + "-value";
String kekName = "kek1";
AvroSchema avroSchema = new AvroSchema(createUserSchema());
Rule rule = new Rule("rule1", null, null, null,
FieldEncryptionExecutor.TYPE, ImmutableSortedSet.of("PII"), null, null, null, null, false);
RuleSet ruleSet = new RuleSet(Collections.emptyList(), ImmutableList.of(rule));
Metadata metadata = getMetadata(kekName);
avroSchema = avroSchema.copy(metadata, ruleSet);
schemaRegistry.register(subject, avroSchema);

RegisterDeks app = new RegisterDeks();
CommandLine cmd = new CommandLine(app);

int exitCode = cmd.execute(
"mock://", subject, "--property", "rule.executors._default_.param.secret=mysecret");
assertEquals(0, exitCode);

Dek dek = dekRegistry.getDekVersion(kekName, subject, -1, null, false);
assertEquals(kekName, dek.getKekName());
}

@Test
public void testRotateDek() throws Exception {
String subject = topic + "rotate-value";
String kekName = "kek2";
AvroSchema avroSchema = new AvroSchema(createUserSchema());
Rule rule = new Rule("rule1", null, null, null,
FieldEncryptionExecutor.TYPE, ImmutableSortedSet.of("PII"),
ImmutableMap.of("encrypt.dek.expiry.days", "1"),
null, null, null, false);
RuleSet ruleSet = new RuleSet(Collections.emptyList(), ImmutableList.of(rule));
Metadata metadata = getMetadata(kekName);
avroSchema = avroSchema.copy(metadata, ruleSet);
schemaRegistry.register(subject, avroSchema);

RegisterDeks app = new RegisterDeks();
app.setClock(fakeClock);
CommandLine cmd = new CommandLine(app);

int exitCode = cmd.execute(
"mock://", subject, "--property", "rule.executors._default_.param.secret=mysecret");
assertEquals(0, exitCode);

Dek dek = dekRegistry.getDekVersion(kekName, subject, -1, null, false);
assertEquals(kekName, dek.getKekName());
assertEquals(1, dek.getVersion());

exitCode = cmd.execute(
"mock://", subject, "--property", "rule.executors._default_.param.secret=mysecret");
assertEquals(0, exitCode);

dek = dekRegistry.getDekVersion(kekName, subject, -1, null, false);
assertEquals(kekName, dek.getKekName());
assertEquals(1, dek.getVersion());

// Advance 2 days
fakeClock.advance(2, ChronoUnit.DAYS);

exitCode = cmd.execute(
"mock://", subject, "--property", "rule.executors._default_.param.secret=mysecret");
assertEquals(0, exitCode);

dek = dekRegistry.getDekVersion(kekName, subject, -1, null, false);
assertEquals(kekName, dek.getKekName());
assertEquals(2, dek.getVersion());
}

protected Metadata getMetadata(String kekName) {
Map<String, String> properties = new HashMap<>();
properties.put(FieldEncryptionExecutor.ENCRYPT_KEK_NAME, kekName);
properties.put(FieldEncryptionExecutor.ENCRYPT_KMS_TYPE, fieldEncryptionProps.getKmsType());
properties.put(FieldEncryptionExecutor.ENCRYPT_KMS_KEY_ID, fieldEncryptionProps.getKmsKeyId());
return getMetadata(properties);
}

protected Metadata getMetadata(Map<String, String> properties) {
return new Metadata(Collections.emptyMap(), properties, Collections.emptySet());
}
}

0 comments on commit d6d92d6

Please sign in to comment.