Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ensure all SR client props are passed to FLE #2748

Merged
merged 3 commits into from Sep 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -26,7 +26,6 @@
import com.google.crypto.tink.integration.awskms.AwsKmsClient;
import io.confluent.kafka.schemaregistry.encryption.tink.KmsDriver;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.lang.reflect.Field;
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
Expand Down Expand Up @@ -58,7 +57,7 @@ private AWSCredentialsProvider getCredentials(Map<String, ?> configs)
} else {
return new DefaultAWSCredentialsProviderChain();
}
} catch (IOException e) {
} catch (Exception e) {
throw new GeneralSecurityException("cannot load credentials", e);
}
}
Expand Down
Expand Up @@ -23,7 +23,6 @@
import com.google.crypto.tink.integration.gcpkms.GcpKmsClient;
import io.confluent.kafka.schemaregistry.encryption.tink.KmsDriver;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.lang.reflect.Field;
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
Expand Down Expand Up @@ -68,7 +67,7 @@ private GoogleCredentials getCredentials(Map<String, ?> configs)
} else {
return GoogleCredentials.getApplicationDefault();
}
} catch (IOException e) {
} catch (Exception e) {
throw new GeneralSecurityException("cannot load credentials", e);
}
}
Expand Down
Expand Up @@ -81,6 +81,11 @@ public class FieldEncryptionExecutor implements FieldRuleExecutor {
public FieldEncryptionExecutor() {
}

@Override
public boolean addOriginalConfigs() {
return true;
}

@Override
public void configure(Map<String, ?> configs) {
this.configs = configs;
Expand Down Expand Up @@ -406,7 +411,7 @@ public Object transform(RuleContext ctx, FieldContext fieldCtx, Object fieldValu
default:
throw new IllegalArgumentException("Unsupported rule mode " + ctx.ruleMode());
}
} catch (GeneralSecurityException e) {
} catch (Exception e) {
throw new RuleException(e);
}
}
Expand Down
Expand Up @@ -148,14 +148,26 @@ public Kek createKek(
String doc,
boolean shared)
throws IOException, RestClientException {
return createKek(DEFAULT_REQUEST_PROPERTIES, name, kmsType, kmsKeyId, kmsProps, doc, shared);
}

public Kek createKek(
Map<String, String> requestProperties,
String name,
String kmsType,
String kmsKeyId,
Map<String, String> kmsProps,
String doc,
boolean shared)
throws IOException, RestClientException {
CreateKekRequest request = new CreateKekRequest();
request.setName(name);
request.setKmsType(kmsType);
request.setKmsKeyId(kmsKeyId);
request.setKmsProps(kmsProps);
request.setDoc(doc);
request.setShared(shared);
Kek kek = restService.createKek(request);
Kek kek = restService.createKek(requestProperties, request);
kekCache.put(new KekId(name, false), kek);
return kek;
}
Expand All @@ -167,11 +179,21 @@ public Dek createDek(
DekFormat algorithm,
String encryptedKeyMaterial)
throws IOException, RestClientException {
return createDek(DEFAULT_REQUEST_PROPERTIES, kekName, subject, algorithm, encryptedKeyMaterial);
}

public Dek createDek(
Map<String, String> requestProperties,
String kekName,
String subject,
DekFormat algorithm,
String encryptedKeyMaterial)
throws IOException, RestClientException {
CreateDekRequest request = new CreateDekRequest();
request.setSubject(subject);
request.setAlgorithm(algorithm);
request.setEncryptedKeyMaterial(encryptedKeyMaterial);
Dek dek = restService.createDek(kekName, request);
Dek dek = restService.createDek(requestProperties, kekName, request);
dekCache.put(new DekId(kekName, subject, algorithm, false), dek);
return dek;
}
Expand All @@ -183,33 +205,63 @@ public Kek updateKek(
String doc,
Boolean shared)
throws IOException, RestClientException {
return updateKek(DEFAULT_REQUEST_PROPERTIES, name, kmsProps, doc, shared);
}

public Kek updateKek(
Map<String, String> requestProperties,
String name,
Map<String, String> kmsProps,
String doc,
Boolean shared)
throws IOException, RestClientException {
UpdateKekRequest request = new UpdateKekRequest();
request.setKmsProps(kmsProps);
request.setDoc(doc);
request.setShared(shared);
Kek kek = restService.updateKek(name, request);
Kek kek = restService.updateKek(requestProperties, name, request);
kekCache.put(new KekId(name, false), kek);
return kek;
}

@Override
public void deleteKek(String kekName, boolean permanentDelete)
throws IOException, RestClientException {
restService.deleteKek(kekName, permanentDelete);
deleteKek(DEFAULT_REQUEST_PROPERTIES, kekName, permanentDelete);
}

public void deleteKek(
Map<String, String> requestProperties, String kekName, boolean permanentDelete)
throws IOException, RestClientException {
restService.deleteKek(requestProperties, kekName, permanentDelete);
kekCache.invalidate(new KekId(kekName, permanentDelete));
}

@Override
public void deleteDek(String kekName, String subject, boolean permanentDelete)
throws IOException, RestClientException {
deleteDek(kekName, subject, null, permanentDelete);
deleteDek(DEFAULT_REQUEST_PROPERTIES, kekName, subject, permanentDelete);
}

public void deleteDek(
Map<String, String> requestProperties, String kekName,
String subject, boolean permanentDelete)
throws IOException, RestClientException {
deleteDek(requestProperties, kekName, subject, null, permanentDelete);
}

@Override
public void deleteDek(
String kekName, String subject, DekFormat algorithm, boolean permanentDelete)
throws IOException, RestClientException {
restService.deleteDek(kekName, subject, algorithm, permanentDelete);
deleteDek(DEFAULT_REQUEST_PROPERTIES, kekName, subject, algorithm, permanentDelete);
}

public void deleteDek(
Map<String, String> requestProperties, String kekName, String subject, DekFormat algorithm,
boolean permanentDelete)
throws IOException, RestClientException {
restService.deleteDek(requestProperties, kekName, subject, algorithm, permanentDelete);
dekCache.invalidate(new DekId(kekName, subject, algorithm, permanentDelete));
}

Expand Down
Expand Up @@ -23,6 +23,8 @@
import io.confluent.dekregistry.client.rest.DekRegistryRestService;
import io.confluent.dekregistry.client.rest.entities.CreateDekRequest;
import io.confluent.dekregistry.client.rest.entities.CreateKekRequest;
import io.confluent.dekregistry.client.rest.entities.Dek;
import io.confluent.dekregistry.client.rest.entities.Kek;
import io.confluent.dekregistry.storage.exceptions.DekGenerationException;
import io.confluent.dekregistry.storage.exceptions.InvalidKeyException;
import io.confluent.dekregistry.storage.utils.CompositeCacheUpdateHandler;
Expand Down Expand Up @@ -98,11 +100,11 @@ public class DekRegistry implements Closeable {
public static final String AZURE_KMS = "azure-kms";
public static final String GCP_KMS = "gcp-kms";

private static final TypeReference<KeyEncryptionKey> KEY_ENCRYPTION_KEY_TYPE =
new TypeReference<KeyEncryptionKey>() {
private static final TypeReference<Kek> KEK_TYPE =
new TypeReference<Kek>() {
};
private static final TypeReference<DataEncryptionKey> DATA_ENCRYPTION_KEY_TYPE =
new TypeReference<DataEncryptionKey>() {
private static final TypeReference<Dek> DEK_TYPE =
new TypeReference<Dek>() {
};
private static final TypeReference<Void> VOID_TYPE =
new TypeReference<Void>() {
Expand Down Expand Up @@ -360,13 +362,13 @@ public DataEncryptionKey getDek(String kekName, String subject, DekFormat algori
}
}

public KeyEncryptionKey createKekOrForward(CreateKekRequest request,
public Kek createKekOrForward(CreateKekRequest request,
Map<String, String> headerProperties) throws SchemaRegistryException {
String tenant = schemaRegistry.tenant();
lock(tenant, headerProperties);
try {
if (isLeader(headerProperties)) {
return createKek(request);
return createKek(request).toKekEntity();
} else {
// forward registering request to the leader
if (schemaRegistry.leaderIdentity() != null) {
Expand All @@ -380,7 +382,7 @@ public KeyEncryptionKey createKekOrForward(CreateKekRequest request,
}
}

private KeyEncryptionKey forwardCreateKekRequestToLeader(CreateKekRequest request,
private Kek forwardCreateKekRequestToLeader(CreateKekRequest request,
Map<String, String> headerProperties)
throws SchemaRegistryRequestForwardingException {
RestService leaderRestService = schemaRegistry.leaderRestService();
Expand All @@ -392,7 +394,7 @@ private KeyEncryptionKey forwardCreateKekRequestToLeader(CreateKekRequest reques
log.debug(String.format("Forwarding create key request to %s", baseUrl));
try {
return leaderRestService.httpRequest(
path, "POST", toJson(request), headerProperties, KEY_ENCRYPTION_KEY_TYPE);
path, "POST", toJson(request), headerProperties, KEK_TYPE);
} catch (IOException e) {
throw new SchemaRegistryRequestForwardingException(
String.format("Unexpected error while forwarding the create key request to %s",
Expand Down Expand Up @@ -441,13 +443,13 @@ private String normalizeKmsType(String kmsType) {
}
}

public DataEncryptionKey createDekOrForward(String kekName, CreateDekRequest request,
public Dek createDekOrForward(String kekName, CreateDekRequest request,
Map<String, String> headerProperties) throws SchemaRegistryException {
String tenant = schemaRegistry.tenant();
lock(tenant, headerProperties);
try {
if (isLeader(headerProperties)) {
return createDek(kekName, request);
return createDek(kekName, request).toDekEntity();
} else {
// forward registering request to the leader
if (schemaRegistry.leaderIdentity() != null) {
Expand All @@ -461,7 +463,7 @@ public DataEncryptionKey createDekOrForward(String kekName, CreateDekRequest req
}
}

private DataEncryptionKey forwardCreateDekRequestToLeader(String kekName,
private Dek forwardCreateDekRequestToLeader(String kekName,
CreateDekRequest request, Map<String, String> headerProperties)
throws SchemaRegistryRequestForwardingException {
RestService leaderRestService = schemaRegistry.leaderRestService();
Expand All @@ -473,7 +475,7 @@ private DataEncryptionKey forwardCreateDekRequestToLeader(String kekName,
log.debug(String.format("Forwarding create key request to %s", baseUrl));
try {
return leaderRestService.httpRequest(
path, "POST", toJson(request), headerProperties, DATA_ENCRYPTION_KEY_TYPE);
path, "POST", toJson(request), headerProperties, DEK_TYPE);
} catch (IOException e) {
throw new SchemaRegistryRequestForwardingException(
String.format("Unexpected error while forwarding the create key request to %s",
Expand Down Expand Up @@ -559,13 +561,14 @@ protected DataEncryptionKey generateRawDek(KeyEncryptionKey kek, DataEncryptionK
}
}

public KeyEncryptionKey putKekOrForward(String name, UpdateKekRequest request,
public Kek putKekOrForward(String name, UpdateKekRequest request,
Map<String, String> headerProperties) throws SchemaRegistryException {
String tenant = schemaRegistry.tenant();
lock(tenant, headerProperties);
try {
if (isLeader(headerProperties)) {
return putKek(name, request);
KeyEncryptionKey kek = putKek(name, request);
return kek != null ? kek.toKekEntity() : null;
} else {
// forward registering request to the leader
if (schemaRegistry.leaderIdentity() != null) {
Expand All @@ -579,7 +582,7 @@ public KeyEncryptionKey putKekOrForward(String name, UpdateKekRequest request,
}
}

private KeyEncryptionKey forwardPutKekRequestToLeader(String name,
private Kek forwardPutKekRequestToLeader(String name,
UpdateKekRequest request, Map<String, String> headerProperties)
throws SchemaRegistryRequestForwardingException {
RestService leaderRestService = schemaRegistry.leaderRestService();
Expand All @@ -591,7 +594,7 @@ private KeyEncryptionKey forwardPutKekRequestToLeader(String name,
log.debug(String.format("Forwarding put key request to %s", baseUrl));
try {
return leaderRestService.httpRequest(
path, "PUT", toJson(request), headerProperties, KEY_ENCRYPTION_KEY_TYPE);
path, "PUT", toJson(request), headerProperties, KEK_TYPE);
} catch (IOException e) {
throw new SchemaRegistryRequestForwardingException(
String.format("Unexpected error while forwarding the put key request to %s",
Expand Down
Expand Up @@ -203,8 +203,7 @@ public void createKek(
headers, getSchemaRegistry().config().whitelistHeaders());

try {
KeyEncryptionKey key = dekRegistry.createKekOrForward(request, headerProperties);
Kek kek = key.toKekEntity();
Kek kek = dekRegistry.createKekOrForward(request, headerProperties);
asyncResponse.resume(kek);
} catch (AlreadyExistsException e) {
throw DekRegistryErrors.alreadyExistsException(e.getMessage());
Expand Down Expand Up @@ -245,8 +244,7 @@ public void createDek(
headers, getSchemaRegistry().config().whitelistHeaders());

try {
DataEncryptionKey key = dekRegistry.createDekOrForward(kekName, request, headerProperties);
Dek dek = key.toDekEntity();
Dek dek = dekRegistry.createDekOrForward(kekName, request, headerProperties);
asyncResponse.resume(dek);
} catch (AlreadyExistsException e) {
throw DekRegistryErrors.alreadyExistsException(e.getMessage());
Expand Down Expand Up @@ -286,11 +284,10 @@ public void putKek(
headers, getSchemaRegistry().config().whitelistHeaders());

try {
KeyEncryptionKey key = dekRegistry.putKekOrForward(name, request, headerProperties);
if (key == null) {
Kek kek = dekRegistry.putKekOrForward(name, request, headerProperties);
if (kek == null) {
throw DekRegistryErrors.keyNotFoundException(name);
}
Kek kek = key.toKekEntity();
asyncResponse.resume(kek);
} catch (SchemaRegistryException e) {
throw Errors.schemaRegistryException("Error while creating key", e);
Expand Down