-
Notifications
You must be signed in to change notification settings - Fork 1.1k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
DGS-9450 Add command line tool to register/auto-rotate DEKs
- Loading branch information
Showing
6 changed files
with
186 additions
and
8 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,28 @@ | ||
#!/bin/bash
#
# Copyright 2023 Confluent Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# Launcher for the DEK registration/rotation CLI tool.
# Quote $0 so the script works when installed under a path containing spaces.
base_dir=$(dirname "$0")/..

# Production jars. The trailing '*' is a JVM classpath wildcard and must NOT
# be glob-expanded by the shell, so it stays inside the (unquoted-at-use,
# quoted-here) assignment.
export CLASSPATH="$CLASSPATH:$base_dir/share/java/kafka-serde-tools/*"

# Development jars. `mvn package` should collect all the required dependency jars here.
for dir in "$base_dir"/package-kafka-serde-tools/target/kafka-serde-tools-package-*-development; do
  export CLASSPATH="$CLASSPATH:$dir/share/java/kafka-serde-tools/*"
done

# Delegate to the shared run-class wrapper, forwarding all CLI arguments.
exec "$(dirname "$0")/schema-registry-run-class" io.confluent.kafka.schemaregistry.encryption.tools.RegisterDeks "$@"
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
140 changes: 140 additions & 0 deletions in
...yption/src/main/java/io/confluent/kafka/schemaregistry/encryption/tools/RegisterDeks.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,140 @@ | ||
/* | ||
* Copyright 2023 Confluent Inc. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package io.confluent.kafka.schemaregistry.encryption.tools; | ||
|
||
import com.google.common.collect.ImmutableList; | ||
import io.confluent.kafka.schemaregistry.ParsedSchema; | ||
import io.confluent.kafka.schemaregistry.SchemaProvider; | ||
import io.confluent.kafka.schemaregistry.avro.AvroSchemaProvider; | ||
import io.confluent.kafka.schemaregistry.client.SchemaMetadata; | ||
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; | ||
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClientFactory; | ||
import io.confluent.kafka.schemaregistry.client.rest.entities.Rule; | ||
import io.confluent.kafka.schemaregistry.client.rest.entities.RuleMode; | ||
import io.confluent.kafka.schemaregistry.client.rest.entities.Schema; | ||
import io.confluent.kafka.schemaregistry.encryption.FieldEncryptionExecutor; | ||
import io.confluent.kafka.schemaregistry.encryption.FieldEncryptionExecutor.FieldEncryptionExecutorTransform; | ||
import io.confluent.kafka.schemaregistry.rules.RuleContext; | ||
import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig; | ||
import java.util.Collections; | ||
import java.util.HashMap; | ||
import java.util.List; | ||
import java.util.Map; | ||
import java.util.Optional; | ||
import java.util.concurrent.Callable; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
import picocli.CommandLine; | ||
import picocli.CommandLine.Command; | ||
import picocli.CommandLine.Option; | ||
import picocli.CommandLine.Parameters; | ||
|
||
@Command(name = "register-deks", mixinStandardHelpOptions = true, | ||
description = "Register and/or auto-rotate DEKs according to a specified data contract.", | ||
sortOptions = false, sortSynopsis = false) | ||
public class RegisterDeks implements Callable<Integer> { | ||
|
||
private static final Logger LOG = LoggerFactory.getLogger(RegisterDeks.class); | ||
|
||
@Parameters(index = "0") | ||
private String baseUrl; | ||
@Parameters(index = "1") | ||
private String subject; | ||
@Parameters(index = "2", arity = "0..1", defaultValue = "-1") | ||
private int version; | ||
@Option(names = {"-p", "--property"}) | ||
private Map<String, String> configs; | ||
|
||
public RegisterDeks() { | ||
} | ||
|
||
@Override | ||
public Integer call() throws Exception { | ||
Map<String, String> configs = this.configs != null | ||
? new HashMap<>(this.configs) | ||
: new HashMap<>(); | ||
configs.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, baseUrl); | ||
|
||
try (SchemaRegistryClient client = SchemaRegistryClientFactory.newClient( | ||
Collections.singletonList(baseUrl), | ||
1000, | ||
ImmutableList.of(new AvroSchemaProvider()), | ||
configs, | ||
Collections.emptyMap() | ||
)) { | ||
SchemaMetadata schemaMetadata = client.getSchemaMetadata(subject, version); | ||
Optional<ParsedSchema> schema = parseSchema(schemaMetadata); | ||
if (!schema.isPresent()) { | ||
LOG.error("No schema found"); | ||
return 1; | ||
} | ||
ParsedSchema parsedSchema = schema.get(); | ||
if (parsedSchema.ruleSet() == null || parsedSchema.ruleSet().getDomainRules() == null) { | ||
LOG.info("No rules found"); | ||
return 0; | ||
} | ||
try (FieldEncryptionExecutor executor = new FieldEncryptionExecutor()) { | ||
executor.configure(configs); | ||
List<Rule> rules = parsedSchema.ruleSet().getDomainRules(); | ||
for (int i = 0; i < rules.size(); i++) { | ||
Rule rule = rules.get(i); | ||
if (rule.isDisabled() || !FieldEncryptionExecutor.TYPE.equals(rule.getType())) { | ||
continue; | ||
} | ||
RuleContext ctx = new RuleContext(Collections.emptyMap(), null, parsedSchema, | ||
subject, null, null, null, null, false, RuleMode.WRITE, rule, i, rules); | ||
FieldEncryptionExecutorTransform transform = executor.newTransform(ctx); | ||
transform.getOrCreateDek(ctx, transform.isDekRotated() ? -1 : null); | ||
} | ||
} | ||
return 0; | ||
} | ||
} | ||
|
||
private Optional<ParsedSchema> parseSchema(SchemaMetadata schemaMetadata) throws Exception { | ||
SchemaProvider provider; | ||
switch (schemaMetadata.getSchemaType()) { | ||
case "AVRO": | ||
provider = new AvroSchemaProvider(); | ||
break; | ||
case "JSON": | ||
provider = (SchemaProvider) | ||
Class.forName("io.confluent.kafka.schemaregistry.protobuf.ProtobufSchemaProvider") | ||
.getDeclaredConstructor() | ||
.newInstance(); | ||
break; | ||
case "PROTOBUF": | ||
provider = (SchemaProvider) | ||
Class.forName("io.confluent.kafka.schemaregistry.json.JsonSchemaProvider") | ||
.getDeclaredConstructor() | ||
.newInstance(); | ||
break; | ||
default: | ||
throw new IllegalArgumentException("Unsupported schema type " | ||
+ schemaMetadata.getSchemaType()); | ||
} | ||
return provider.parseSchema(new Schema(null, schemaMetadata), false, false); | ||
} | ||
|
||
public static void main(String[] args) { | ||
CommandLine commandLine = new CommandLine(new RegisterDeks()); | ||
commandLine.setUsageHelpLongOptionsMaxWidth(30); | ||
int exitCode = commandLine.execute(args); | ||
System.exit(exitCode); | ||
} | ||
} | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters