From 68116e594cea5f81f5366484dd8c120cd9cc5c30 Mon Sep 17 00:00:00 2001 From: Robert Yokota Date: Fri, 6 Oct 2023 16:39:18 -0700 Subject: [PATCH 1/5] DGS-8708 Add CSFLE config to preserve source message --- .../encryption/FieldEncryptionExecutor.java | 30 +++++ .../FieldEncryptionExecutorTest.java | 103 +++++++++++++++++- .../kafka/schemaregistry/ParsedSchema.java | 4 + .../kafka/schemaregistry/avro/AvroSchema.java | 6 + .../kafka/schemaregistry/rules/DlqAction.java | 5 +- .../rules/FieldRuleExecutor.java | 6 + .../kafka/schemaregistry/json/JsonSchema.java | 11 +- .../protobuf/ProtobufSchema.java | 6 + 8 files changed, 165 insertions(+), 6 deletions(-) diff --git a/client-encryption/src/main/java/io/confluent/kafka/schemaregistry/encryption/FieldEncryptionExecutor.java b/client-encryption/src/main/java/io/confluent/kafka/schemaregistry/encryption/FieldEncryptionExecutor.java index 1d5a06ab4e8..a130da543a6 100644 --- a/client-encryption/src/main/java/io/confluent/kafka/schemaregistry/encryption/FieldEncryptionExecutor.java +++ b/client-encryption/src/main/java/io/confluent/kafka/schemaregistry/encryption/FieldEncryptionExecutor.java @@ -66,6 +66,7 @@ public class FieldEncryptionExecutor implements FieldRuleExecutor { public static final String ENCRYPT_KMS_KEY_ID = "encrypt.kms.key.id"; public static final String ENCRYPT_KMS_TYPE = "encrypt.kms.type"; public static final String ENCRYPT_DEK_ALGORITHM = "encrypt.dek.algorithm"; + public static final String ENCRYPT_PRESERVE_SOURCE = "encrypt.preserve.source"; public static final String KMS_TYPE_SUFFIX = "://"; public static final byte[] EMPTY_AAD = new byte[0]; @@ -74,6 +75,7 @@ public class FieldEncryptionExecutor implements FieldRuleExecutor { private Map cryptors; private Map configs; + private Boolean preserveSource; private int cacheExpirySecs = -1; private int cacheSize = 10000; private DekRegistryClient client; @@ -81,6 +83,10 @@ public class FieldEncryptionExecutor implements FieldRuleExecutor { public FieldEncryptionExecutor() { } + public boolean isPreserveSource() { + return Boolean.TRUE.equals(preserveSource); + } + @Override public boolean addOriginalConfigs() { return true; @@ -89,6 +95,10 @@ public boolean addOriginalConfigs() { @Override public void configure(Map configs) { this.configs = configs; + Object preserveSourceConfig = configs.get(ENCRYPT_PRESERVE_SOURCE); + if (preserveSourceConfig != null) { + this.preserveSource = Boolean.parseBoolean(preserveSourceConfig.toString()); + } Object cacheExpirySecsConfig = configs.get(CACHE_EXPIRY_SECS); if (cacheExpirySecsConfig != null) { try { @@ -127,6 +137,20 @@ public FieldTransform newTransform(RuleContext ctx) throws RuleException { return transform; } + @Override + public Object preTransformMessage(RuleContext ctx, FieldTransform transform, Object message) + throws RuleException { + if (isPreserveSource()) { + try { + // We use the target schema + message = ctx.target().copyMessage(message); + } catch (IOException e) { + throw new RuleException("Could copy source message", e); + } + } + return message; + } + private Cryptor getCryptor(RuleContext ctx) { String algorithm = ctx.getParameter(ENCRYPT_DEK_ALGORITHM); DekFormat dekFormat = algorithm != null @@ -201,6 +225,12 @@ public void init(RuleContext ctx) throws RuleException { cryptor = getCryptor(ctx); kekName = getKekName(ctx); kek = getKek(ctx, kekName); + if (FieldEncryptionExecutor.this.preserveSource == null) { + String preserveValueConfig = ctx.getParameter(ENCRYPT_PRESERVE_SOURCE); + if (preserveValueConfig != null) { + 
FieldEncryptionExecutor.this.preserveSource = Boolean.parseBoolean(preserveValueConfig); + } + } } protected String getKekName(RuleContext ctx) throws RuleException { diff --git a/client-encryption/src/test/java/io/confluent/kafka/schemaregistry/encryption/FieldEncryptionExecutorTest.java b/client-encryption/src/test/java/io/confluent/kafka/schemaregistry/encryption/FieldEncryptionExecutorTest.java index dfd79cc3c98..17f5b75ad75 100644 --- a/client-encryption/src/test/java/io/confluent/kafka/schemaregistry/encryption/FieldEncryptionExecutorTest.java +++ b/client-encryption/src/test/java/io/confluent/kafka/schemaregistry/encryption/FieldEncryptionExecutorTest.java @@ -265,7 +265,7 @@ private Schema createUserSchema() { return schema; } - private IndexedRecord createUserRecord() { + private GenericRecord createUserRecord() { Schema schema = createUserSchema(); GenericRecord avroRecord = new GenericData.Record(schema); avroRecord.put("name", "testUser"); @@ -358,6 +358,31 @@ public void testKafkaAvroSerializer() throws Exception { assertEquals("testUser", record.get("name")); } + @Test + public void testKafkaAvroSerializerPreserveSource() throws Exception { + GenericRecord avroRecord = createUserRecord(); + AvroSchema avroSchema = new AvroSchema(createUserSchema()); + Rule rule = new Rule("rule1", null, null, null, + FieldEncryptionExecutor.TYPE, ImmutableSortedSet.of("PII"), + ImmutableMap.of("encrypt.preserve.source", "true"), null, null, null, false); + RuleSet ruleSet = new RuleSet(Collections.emptyList(), ImmutableList.of(rule)); + Metadata metadata = getMetadata("kek1"); + avroSchema = avroSchema.copy(metadata, ruleSet); + schemaRegistry.register(topic + "-value", avroSchema); + + int expectedEncryptions = 1; + RecordHeaders headers = new RecordHeaders(); + Cryptor cryptor = addSpyToCryptor(avroSerializer); + byte[] bytes = avroSerializer.serialize(topic, headers, avroRecord); + verify(cryptor, times(expectedEncryptions)).encrypt(any(), any(), any()); + cryptor = addSpyToCryptor(avroDeserializer); + GenericRecord record = (GenericRecord) avroDeserializer.deserialize(topic, headers, bytes); + verify(cryptor, times(expectedEncryptions)).decrypt(any(), any(), any()); + assertEquals("testUser", record.get("name")); + // Old value is preserved + assertEquals("testUser", avroRecord.get("name")); + } + @Test public void testKafkaAvroSerializer2() throws Exception { IndexedRecord avroRecord = createUserRecord(); @@ -455,6 +480,40 @@ public void testKafkaAvroSerializerReflection() throws Exception { assertEquals("678", ((OldWidget)obj).getPiiMap().get("key2").getPii()); } + @Test + public void testKafkaAvroSerializerReflectionPreserveSource() throws Exception { + OldWidget widget = new OldWidget("alice"); + widget.setSsn(ImmutableList.of("123", "456")); + widget.setPiiArray(ImmutableList.of(new OldPii("789"), new OldPii("012"))); + widget.setPiiMap(ImmutableMap.of("key1", new OldPii("345"), "key2", new OldPii("678"))); + Schema schema = createWidgetSchema(); + AvroSchema avroSchema = new AvroSchema(schema); + Rule rule = new Rule("rule1", null, null, null, + FieldEncryptionExecutor.TYPE, ImmutableSortedSet.of("PII"), + ImmutableMap.of("encrypt.preserve.source", "true"), null, null, null, false); + RuleSet ruleSet = new RuleSet(Collections.emptyList(), ImmutableList.of(rule)); + Metadata metadata = getMetadata("kek1"); + avroSchema = avroSchema.copy(metadata, ruleSet); + schemaRegistry.register(topic + "-value", avroSchema); + + int expectedEncryptions = 7; + RecordHeaders headers = new 
RecordHeaders(); + Cryptor cryptor = addSpyToCryptor(reflectionAvroSerializer); + byte[] bytes = reflectionAvroSerializer.serialize(topic, headers, widget); + verify(cryptor, times(expectedEncryptions)).encrypt(any(), any(), any()); + cryptor = addSpyToCryptor(reflectionAvroDeserializer); + Object obj = reflectionAvroDeserializer.deserialize(topic, headers, bytes); + verify(cryptor, times(expectedEncryptions)).decrypt(any(), any(), any()); + + assertTrue( + "Returned object should be a Widget", + OldWidget.class.isInstance(obj) + ); + assertEquals("alice", ((OldWidget)obj).getName()); + // Old value is preserved + assertEquals("alice", widget.getName()); + } + @Test public void testKafkaAvroSerializerMultipleRules() throws Exception { IndexedRecord avroRecord = createUserRecord(); @@ -809,6 +868,48 @@ public void testKafkaJsonSchemaSerializer() throws Exception { ); } + @Test + public void testKafkaJsonSchemaSerializerPreserveSource() throws Exception { + OldWidget widget = new OldWidget("alice"); + widget.setSize(123); + widget.setSsn(ImmutableList.of("123", "456")); + widget.setPiiArray(ImmutableList.of(new OldPii("789"), new OldPii("012"))); + String schemaStr = "{\"$schema\":\"http://json-schema.org/draft-07/schema#\",\"title\":\"Old Widget\",\"type\":\"object\",\"additionalProperties\":false,\"properties\":{\n" + + "\"name\":{\"oneOf\":[{\"type\":\"null\",\"title\":\"Not included\"},{\"type\":\"string\"}]," + + "\"confluent:tags\": [ \"PII\" ]}," + + "\"ssn\":{\"oneOf\":[{\"type\":\"null\",\"title\":\"Not included\"},{\"type\":\"array\",\"items\":{\"type\":\"string\"}}]," + + "\"confluent:tags\": [ \"PII\" ]}," + + "\"piiArray\":{\"oneOf\":[{\"type\":\"null\",\"title\":\"Not included\"},{\"type\":\"array\",\"items\":{\"$ref\":\"#/definitions/OldPii\"}}]}," + + "\"piiMap\":{\"oneOf\":[{\"type\":\"null\",\"title\":\"Not included\"},{\"type\":\"object\",\"additionalProperties\":{\"$ref\":\"#/definitions/OldPii\"}}]}," + + "\"size\":{\"type\":\"integer\"}," + + "\"version\":{\"type\":\"integer\"}}," + + "\"required\":[\"size\",\"version\"]," + + "\"definitions\":{\"OldPii\":{\"type\":\"object\",\"additionalProperties\":false,\"properties\":{" + + "\"pii\":{\"oneOf\":[{\"type\":\"null\",\"title\":\"Not included\"},{\"type\":\"string\"}]," + + "\"confluent:tags\": [ \"PII\" ]}}}}}"; + JsonSchema jsonSchema = new JsonSchema(schemaStr); + Rule rule = new Rule("rule1", null, null, null, + FieldEncryptionExecutor.TYPE, ImmutableSortedSet.of("PII"), + ImmutableMap.of("encrypt.preserve.source", "true"), null, null, null, false); + RuleSet ruleSet = new RuleSet(Collections.emptyList(), Collections.singletonList(rule)); + Metadata metadata = getMetadata("kek1"); + jsonSchema = jsonSchema.copy(metadata, ruleSet); + schemaRegistry.register(topic + "-value", jsonSchema); + + int expectedEncryptions = 5; + RecordHeaders headers = new RecordHeaders(); + Cryptor cryptor = addSpyToCryptor(jsonSchemaSerializer); + byte[] bytes = jsonSchemaSerializer.serialize(topic, headers, widget); + verify(cryptor, times(expectedEncryptions)).encrypt(any(), any(), any()); + cryptor = addSpyToCryptor(jsonSchemaDeserializer); + Object obj = jsonSchemaDeserializer.deserialize(topic, headers, bytes); + verify(cryptor, times(expectedEncryptions)).decrypt(any(), any(), any()); + + assertEquals("alice", ((JsonNode)obj).get("name").textValue()); + // Old value is preserved + assertEquals("alice", widget.getName()); + } + @Test public void testKafkaJsonSchemaSerializerAnnotated() throws Exception { AnnotatedOldWidget widget = 
new AnnotatedOldWidget("alice"); diff --git a/client/src/main/java/io/confluent/kafka/schemaregistry/ParsedSchema.java b/client/src/main/java/io/confluent/kafka/schemaregistry/ParsedSchema.java index 96d5eab8c9b..0464734f720 100644 --- a/client/src/main/java/io/confluent/kafka/schemaregistry/ParsedSchema.java +++ b/client/src/main/java/io/confluent/kafka/schemaregistry/ParsedSchema.java @@ -235,6 +235,10 @@ default JsonNode toJson(Object object) throws IOException { throw new UnsupportedOperationException(); } + default Object copyMessage(Object message) throws IOException { + throw new UnsupportedOperationException(); + } + default Object transformMessage(RuleContext ctx, FieldTransform transform, Object message) throws RuleException { throw new UnsupportedOperationException(); diff --git a/client/src/main/java/io/confluent/kafka/schemaregistry/avro/AvroSchema.java b/client/src/main/java/io/confluent/kafka/schemaregistry/avro/AvroSchema.java index 8f71f326ed0..aa7c82c13cc 100644 --- a/client/src/main/java/io/confluent/kafka/schemaregistry/avro/AvroSchema.java +++ b/client/src/main/java/io/confluent/kafka/schemaregistry/avro/AvroSchema.java @@ -509,6 +509,12 @@ public JsonNode toJson(Object message) throws IOException { return JacksonMapper.INSTANCE.readTree(AvroSchemaUtils.toJson(message)); } + @Override + public Object copyMessage(Object message) throws IOException { + GenericData data = getData(message); + return data.deepCopy(rawSchema(), message); + } + @Override public Object transformMessage(RuleContext ctx, FieldTransform transform, Object message) throws RuleException { diff --git a/client/src/main/java/io/confluent/kafka/schemaregistry/rules/DlqAction.java b/client/src/main/java/io/confluent/kafka/schemaregistry/rules/DlqAction.java index d2b34c59448..804d109cc07 100644 --- a/client/src/main/java/io/confluent/kafka/schemaregistry/rules/DlqAction.java +++ b/client/src/main/java/io/confluent/kafka/schemaregistry/rules/DlqAction.java @@ -45,6 +45,8 @@ public class DlqAction implements RuleAction { public static final String TYPE = "DLQ"; public static final String DLQ_TOPIC = "dlq.topic"; + public static final String DLQ_AUTO_FLUSH = "dlq.auto.flush"; + public static final String PRODUCER = "producer"; // for testing public static final String HEADER_PREFIX = "__rule."; public static final String RULE_NAME = HEADER_PREFIX + "name"; @@ -53,9 +55,6 @@ public class DlqAction implements RuleAction { public static final String RULE_TOPIC = HEADER_PREFIX + "topic"; public static final String RULE_EXCEPTION = HEADER_PREFIX + "exception"; - public static final String DLQ_AUTO_FLUSH = "dlq.auto.flush"; - public static final String PRODUCER = "producer"; // for testing - private static final LongSerializer LONG_SERIALIZER = new LongSerializer(); private static final IntegerSerializer INT_SERIALIZER = new IntegerSerializer(); private static final ShortSerializer SHORT_SERIALIZER = new ShortSerializer(); diff --git a/client/src/main/java/io/confluent/kafka/schemaregistry/rules/FieldRuleExecutor.java b/client/src/main/java/io/confluent/kafka/schemaregistry/rules/FieldRuleExecutor.java index 34c246ff733..7a52fe240ff 100644 --- a/client/src/main/java/io/confluent/kafka/schemaregistry/rules/FieldRuleExecutor.java +++ b/client/src/main/java/io/confluent/kafka/schemaregistry/rules/FieldRuleExecutor.java @@ -63,6 +63,7 @@ default Object transform(RuleContext ctx, Object message) throws RuleException { try (FieldTransform transform = newTransform(ctx)) { if (transform != null) { + message = 
preTransformMessage(ctx, transform, message); return ctx.target().transformMessage(ctx, transform, message); } else { return message; @@ -70,6 +71,11 @@ default Object transform(RuleContext ctx, Object message) throws RuleException { } } + default Object preTransformMessage(RuleContext ctx, FieldTransform transform, Object message) + throws RuleException { + return message; + } + static boolean areTransformsWithSameTags(Rule rule1, Rule rule2) { return rule1.getTags().size() > 0 && rule1.getKind() == RuleKind.TRANSFORM diff --git a/json-schema-provider/src/main/java/io/confluent/kafka/schemaregistry/json/JsonSchema.java b/json-schema-provider/src/main/java/io/confluent/kafka/schemaregistry/json/JsonSchema.java index cddcd6fa6b7..8345a26e220 100644 --- a/json-schema-provider/src/main/java/io/confluent/kafka/schemaregistry/json/JsonSchema.java +++ b/json-schema-provider/src/main/java/io/confluent/kafka/schemaregistry/json/JsonSchema.java @@ -45,7 +45,6 @@ import io.confluent.kafka.schemaregistry.client.rest.entities.Metadata; import io.confluent.kafka.schemaregistry.client.rest.entities.RuleSet; import io.confluent.kafka.schemaregistry.utils.BoundedConcurrentHashMap; -import io.confluent.kafka.schemaregistry.utils.JacksonMapper; import java.math.BigDecimal; import java.math.BigInteger; import java.util.HashMap; @@ -511,7 +510,15 @@ public JsonNode toJson(Object message) throws IOException { if (message instanceof JsonNode) { return (JsonNode) message; } - return JacksonMapper.INSTANCE.readTree(JsonSchemaUtils.toJson(message)); + return objectMapper.readTree(JsonSchemaUtils.toJson(message)); + } + + @Override + public Object copyMessage(Object message) throws IOException { + if (message instanceof JsonNode) { + return ((JsonNode) message).deepCopy(); + } + return toJson(message); } @Override diff --git a/protobuf-provider/src/main/java/io/confluent/kafka/schemaregistry/protobuf/ProtobufSchema.java b/protobuf-provider/src/main/java/io/confluent/kafka/schemaregistry/protobuf/ProtobufSchema.java index c0412fed686..97c53f1222e 100644 --- a/protobuf-provider/src/main/java/io/confluent/kafka/schemaregistry/protobuf/ProtobufSchema.java +++ b/protobuf-provider/src/main/java/io/confluent/kafka/schemaregistry/protobuf/ProtobufSchema.java @@ -2274,6 +2274,12 @@ public JsonNode toJson(Object message) throws IOException { return JacksonMapper.INSTANCE.readTree(ProtobufSchemaUtils.toJson((Message) message)); } + @Override + public Object copyMessage(Object message) throws IOException { + // Protobuf messages are already immutable + return message; + } + @Override public Object transformMessage(RuleContext ctx, FieldTransform transform, Object message) throws RuleException { From 116325dc34b00a67bf8d95033f60034f4a83b971 Mon Sep 17 00:00:00 2001 From: Robert Yokota Date: Fri, 6 Oct 2023 16:41:25 -0700 Subject: [PATCH 2/5] Fix err message --- .../schemaregistry/encryption/FieldEncryptionExecutor.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client-encryption/src/main/java/io/confluent/kafka/schemaregistry/encryption/FieldEncryptionExecutor.java b/client-encryption/src/main/java/io/confluent/kafka/schemaregistry/encryption/FieldEncryptionExecutor.java index a130da543a6..04e42dc5c6d 100644 --- a/client-encryption/src/main/java/io/confluent/kafka/schemaregistry/encryption/FieldEncryptionExecutor.java +++ b/client-encryption/src/main/java/io/confluent/kafka/schemaregistry/encryption/FieldEncryptionExecutor.java @@ -145,7 +145,7 @@ public Object preTransformMessage(RuleContext ctx, 
FieldTransform transform, Obj // We use the target schema message = ctx.target().copyMessage(message); } catch (IOException e) { - throw new RuleException("Could copy source message", e); + throw new RuleException("Could not copy source message", e); } } return message; From bdfdf27b2964f24d25ffb3142db7de77e2bb4ae4 Mon Sep 17 00:00:00 2001 From: Robert Yokota Date: Fri, 6 Oct 2023 17:49:38 -0700 Subject: [PATCH 3/5] Minor refactor --- .../encryption/FieldEncryptionExecutor.java | 32 +------------ .../FieldEncryptionExecutorTest.java | 6 +-- .../rules/FieldRuleExecutor.java | 46 +++++++++++++++---- .../schemaregistry/rules/cel/CelExecutor.java | 4 +- .../rules/cel/CelFieldExecutor.java | 2 +- 5 files changed, 45 insertions(+), 45 deletions(-) diff --git a/client-encryption/src/main/java/io/confluent/kafka/schemaregistry/encryption/FieldEncryptionExecutor.java b/client-encryption/src/main/java/io/confluent/kafka/schemaregistry/encryption/FieldEncryptionExecutor.java index 04e42dc5c6d..c4de1a2ea1d 100644 --- a/client-encryption/src/main/java/io/confluent/kafka/schemaregistry/encryption/FieldEncryptionExecutor.java +++ b/client-encryption/src/main/java/io/confluent/kafka/schemaregistry/encryption/FieldEncryptionExecutor.java @@ -58,7 +58,7 @@ * encrypted data, use the KEK from KMS to decrypt the DEK, and use the decrypted DEK to decrypt * the data. */ -public class FieldEncryptionExecutor implements FieldRuleExecutor { +public class FieldEncryptionExecutor extends FieldRuleExecutor { public static final String TYPE = "ENCRYPT"; @@ -75,7 +75,6 @@ public class FieldEncryptionExecutor implements FieldRuleExecutor { private Map cryptors; private Map configs; - private Boolean preserveSource; private int cacheExpirySecs = -1; private int cacheSize = 10000; private DekRegistryClient client; @@ -83,10 +82,6 @@ public class FieldEncryptionExecutor implements FieldRuleExecutor { public FieldEncryptionExecutor() { } - public boolean isPreserveSource() { - return Boolean.TRUE.equals(preserveSource); - } - @Override public boolean addOriginalConfigs() { return true; @@ -94,11 +89,8 @@ public boolean addOriginalConfigs() { @Override public void configure(Map configs) { + super.configure(configs); this.configs = configs; - Object preserveSourceConfig = configs.get(ENCRYPT_PRESERVE_SOURCE); - if (preserveSourceConfig != null) { - this.preserveSource = Boolean.parseBoolean(preserveSourceConfig.toString()); - } Object cacheExpirySecsConfig = configs.get(CACHE_EXPIRY_SECS); if (cacheExpirySecsConfig != null) { try { @@ -137,20 +129,6 @@ public FieldTransform newTransform(RuleContext ctx) throws RuleException { return transform; } - @Override - public Object preTransformMessage(RuleContext ctx, FieldTransform transform, Object message) - throws RuleException { - if (isPreserveSource()) { - try { - // We use the target schema - message = ctx.target().copyMessage(message); - } catch (IOException e) { - throw new RuleException("Could not copy source message", e); - } - } - return message; - } - private Cryptor getCryptor(RuleContext ctx) { String algorithm = ctx.getParameter(ENCRYPT_DEK_ALGORITHM); DekFormat dekFormat = algorithm != null @@ -225,12 +203,6 @@ public void init(RuleContext ctx) throws RuleException { cryptor = getCryptor(ctx); kekName = getKekName(ctx); kek = getKek(ctx, kekName); - if (FieldEncryptionExecutor.this.preserveSource == null) { - String preserveValueConfig = ctx.getParameter(ENCRYPT_PRESERVE_SOURCE); - if (preserveValueConfig != null) { - FieldEncryptionExecutor.this.preserveSource = 
Boolean.parseBoolean(preserveValueConfig); - } - } } protected String getKekName(RuleContext ctx) throws RuleException { diff --git a/client-encryption/src/test/java/io/confluent/kafka/schemaregistry/encryption/FieldEncryptionExecutorTest.java b/client-encryption/src/test/java/io/confluent/kafka/schemaregistry/encryption/FieldEncryptionExecutorTest.java index 17f5b75ad75..948de68fa60 100644 --- a/client-encryption/src/test/java/io/confluent/kafka/schemaregistry/encryption/FieldEncryptionExecutorTest.java +++ b/client-encryption/src/test/java/io/confluent/kafka/schemaregistry/encryption/FieldEncryptionExecutorTest.java @@ -364,7 +364,7 @@ public void testKafkaAvroSerializerPreserveSource() throws Exception { AvroSchema avroSchema = new AvroSchema(createUserSchema()); Rule rule = new Rule("rule1", null, null, null, FieldEncryptionExecutor.TYPE, ImmutableSortedSet.of("PII"), - ImmutableMap.of("encrypt.preserve.source", "true"), null, null, null, false); + ImmutableMap.of("rule.preserve.source", "true"), null, null, null, false); RuleSet ruleSet = new RuleSet(Collections.emptyList(), ImmutableList.of(rule)); Metadata metadata = getMetadata("kek1"); avroSchema = avroSchema.copy(metadata, ruleSet); @@ -490,7 +490,7 @@ public void testKafkaAvroSerializerReflectionPreserveSource() throws Exception { AvroSchema avroSchema = new AvroSchema(schema); Rule rule = new Rule("rule1", null, null, null, FieldEncryptionExecutor.TYPE, ImmutableSortedSet.of("PII"), - ImmutableMap.of("encrypt.preserve.source", "true"), null, null, null, false); + ImmutableMap.of("rule.preserve.source", "true"), null, null, null, false); RuleSet ruleSet = new RuleSet(Collections.emptyList(), ImmutableList.of(rule)); Metadata metadata = getMetadata("kek1"); avroSchema = avroSchema.copy(metadata, ruleSet); @@ -890,7 +890,7 @@ public void testKafkaJsonSchemaSerializerPreserveSource() throws Exception { JsonSchema jsonSchema = new JsonSchema(schemaStr); Rule rule = new Rule("rule1", null, null, null, FieldEncryptionExecutor.TYPE, ImmutableSortedSet.of("PII"), - ImmutableMap.of("encrypt.preserve.source", "true"), null, null, null, false); + ImmutableMap.of("rule.preserve.source", "true"), null, null, null, false); RuleSet ruleSet = new RuleSet(Collections.emptyList(), Collections.singletonList(rule)); Metadata metadata = getMetadata("kek1"); jsonSchema = jsonSchema.copy(metadata, ruleSet); diff --git a/client/src/main/java/io/confluent/kafka/schemaregistry/rules/FieldRuleExecutor.java b/client/src/main/java/io/confluent/kafka/schemaregistry/rules/FieldRuleExecutor.java index 7a52fe240ff..58107adee0a 100644 --- a/client/src/main/java/io/confluent/kafka/schemaregistry/rules/FieldRuleExecutor.java +++ b/client/src/main/java/io/confluent/kafka/schemaregistry/rules/FieldRuleExecutor.java @@ -18,6 +18,9 @@ import io.confluent.kafka.schemaregistry.client.rest.entities.Rule; import io.confluent.kafka.schemaregistry.client.rest.entities.RuleKind; +import io.confluent.kafka.schemaregistry.client.rest.entities.RuleMode; +import java.io.IOException; +import java.util.Map; import java.util.Objects; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -25,13 +28,36 @@ /** * A field-level rule executor. 
*/ -public interface FieldRuleExecutor extends RuleExecutor { +public abstract class FieldRuleExecutor implements RuleExecutor { Logger log = LoggerFactory.getLogger(FieldRuleExecutor.class); - FieldTransform newTransform(RuleContext ctx) throws RuleException; + public static final String RULE_PRESERVE_SOURCE = "rule.preserve.source"; - default Object transform(RuleContext ctx, Object message) throws RuleException { + private Boolean preserveSource; + + @Override + public void configure(Map configs) { + Object preserveSourceConfig = configs.get(RULE_PRESERVE_SOURCE); + if (preserveSourceConfig != null) { + this.preserveSource = Boolean.parseBoolean(preserveSourceConfig.toString()); + } + } + + public boolean isPreserveSource() { + return Boolean.TRUE.equals(preserveSource); + } + + public abstract FieldTransform newTransform(RuleContext ctx) throws RuleException; + + @Override + public Object transform(RuleContext ctx, Object message) throws RuleException { + if (this.preserveSource == null) { + String preserveValueConfig = ctx.getParameter(RULE_PRESERVE_SOURCE); + if (preserveValueConfig != null) { + this.preserveSource = Boolean.parseBoolean(preserveValueConfig); + } + } switch (ctx.ruleMode()) { case WRITE: case UPGRADE: @@ -63,7 +89,14 @@ default Object transform(RuleContext ctx, Object message) throws RuleException { try (FieldTransform transform = newTransform(ctx)) { if (transform != null) { - message = preTransformMessage(ctx, transform, message); + if (ctx.ruleMode() == RuleMode.WRITE && isPreserveSource()) { + try { + // We use the target schema + message = ctx.target().copyMessage(message); + } catch (IOException e) { + throw new RuleException("Could not copy source message", e); + } + } return ctx.target().transformMessage(ctx, transform, message); } else { return message; @@ -71,11 +104,6 @@ default Object transform(RuleContext ctx, Object message) throws RuleException { } } - default Object preTransformMessage(RuleContext ctx, FieldTransform transform, Object message) - throws RuleException { - return message; - } - static boolean areTransformsWithSameTags(Rule rule1, Rule rule2) { return rule1.getTags().size() > 0 && rule1.getKind() == RuleKind.TRANSFORM diff --git a/schema-rules/src/main/java/io/confluent/kafka/schemaregistry/rules/cel/CelExecutor.java b/schema-rules/src/main/java/io/confluent/kafka/schemaregistry/rules/cel/CelExecutor.java index 0e2529dc60b..7faa84a2d06 100644 --- a/schema-rules/src/main/java/io/confluent/kafka/schemaregistry/rules/cel/CelExecutor.java +++ b/schema-rules/src/main/java/io/confluent/kafka/schemaregistry/rules/cel/CelExecutor.java @@ -61,7 +61,7 @@ public class CelExecutor implements RuleExecutor { public static final String TYPE = "CEL"; - public static final String IGNORE_GUARD_SEPARATOR = "ignore.guard.separator"; + public static final String CEL_IGNORE_GUARD_SEPARATOR = "cel.ignore.guard.separator"; private static final ObjectMapper mapper = new ObjectMapper(); @@ -151,7 +151,7 @@ protected Object execute( RuleContext ctx, Object obj, Map args) throws RuleException { String expr = ctx.rule().getExpr(); - String ignoreGuardStr = ctx.getParameter(IGNORE_GUARD_SEPARATOR); + String ignoreGuardStr = ctx.getParameter(CEL_IGNORE_GUARD_SEPARATOR); boolean ignoreGuard = Boolean.parseBoolean(ignoreGuardStr); if (!ignoreGuard) { // An optional guard (followed by semicolon) can precede the expr diff --git a/schema-rules/src/main/java/io/confluent/kafka/schemaregistry/rules/cel/CelFieldExecutor.java 
b/schema-rules/src/main/java/io/confluent/kafka/schemaregistry/rules/cel/CelFieldExecutor.java index 9963a5c93a5..0a128aae537 100644 --- a/schema-rules/src/main/java/io/confluent/kafka/schemaregistry/rules/cel/CelFieldExecutor.java +++ b/schema-rules/src/main/java/io/confluent/kafka/schemaregistry/rules/cel/CelFieldExecutor.java @@ -29,7 +29,7 @@ import java.util.HashMap; import java.util.Map; -public class CelFieldExecutor implements FieldRuleExecutor { +public class CelFieldExecutor extends FieldRuleExecutor { public static final String TYPE = "CEL_FIELD"; From cc9128eb5793f31362f32c4c3c0dac4e7aa7595b Mon Sep 17 00:00:00 2001 From: Robert Yokota Date: Fri, 6 Oct 2023 17:52:04 -0700 Subject: [PATCH 4/5] Minor cleanup --- .../kafka/schemaregistry/encryption/FieldEncryptionExecutor.java | 1 - 1 file changed, 1 deletion(-) diff --git a/client-encryption/src/main/java/io/confluent/kafka/schemaregistry/encryption/FieldEncryptionExecutor.java b/client-encryption/src/main/java/io/confluent/kafka/schemaregistry/encryption/FieldEncryptionExecutor.java index c4de1a2ea1d..c24ae259aa3 100644 --- a/client-encryption/src/main/java/io/confluent/kafka/schemaregistry/encryption/FieldEncryptionExecutor.java +++ b/client-encryption/src/main/java/io/confluent/kafka/schemaregistry/encryption/FieldEncryptionExecutor.java @@ -66,7 +66,6 @@ public class FieldEncryptionExecutor extends FieldRuleExecutor { public static final String ENCRYPT_KMS_KEY_ID = "encrypt.kms.key.id"; public static final String ENCRYPT_KMS_TYPE = "encrypt.kms.type"; public static final String ENCRYPT_DEK_ALGORITHM = "encrypt.dek.algorithm"; - public static final String ENCRYPT_PRESERVE_SOURCE = "encrypt.preserve.source"; public static final String KMS_TYPE_SUFFIX = "://"; public static final byte[] EMPTY_AAD = new byte[0]; From ee2321eb76e6d8353e3e7efabf6e032bdfd73b6e Mon Sep 17 00:00:00 2001 From: Robert Yokota Date: Fri, 6 Oct 2023 18:01:20 -0700 Subject: [PATCH 5/5] Minor cleanup --- .../encryption/FieldEncryptionExecutorTest.java | 6 +++--- .../kafka/schemaregistry/rules/FieldRuleExecutor.java | 10 ++++++---- .../schemaregistry/rules/cel/CelExecutorTest.java | 2 +- 3 files changed, 10 insertions(+), 8 deletions(-) diff --git a/client-encryption/src/test/java/io/confluent/kafka/schemaregistry/encryption/FieldEncryptionExecutorTest.java b/client-encryption/src/test/java/io/confluent/kafka/schemaregistry/encryption/FieldEncryptionExecutorTest.java index 948de68fa60..e3532ebacde 100644 --- a/client-encryption/src/test/java/io/confluent/kafka/schemaregistry/encryption/FieldEncryptionExecutorTest.java +++ b/client-encryption/src/test/java/io/confluent/kafka/schemaregistry/encryption/FieldEncryptionExecutorTest.java @@ -364,7 +364,7 @@ public void testKafkaAvroSerializerPreserveSource() throws Exception { AvroSchema avroSchema = new AvroSchema(createUserSchema()); Rule rule = new Rule("rule1", null, null, null, FieldEncryptionExecutor.TYPE, ImmutableSortedSet.of("PII"), - ImmutableMap.of("rule.preserve.source", "true"), null, null, null, false); + ImmutableMap.of("preserve.source.fields", "true"), null, null, null, false); RuleSet ruleSet = new RuleSet(Collections.emptyList(), ImmutableList.of(rule)); Metadata metadata = getMetadata("kek1"); avroSchema = avroSchema.copy(metadata, ruleSet); @@ -490,7 +490,7 @@ public void testKafkaAvroSerializerReflectionPreserveSource() throws Exception { AvroSchema avroSchema = new AvroSchema(schema); Rule rule = new Rule("rule1", null, null, null, FieldEncryptionExecutor.TYPE, ImmutableSortedSet.of("PII"), - 
ImmutableMap.of("rule.preserve.source", "true"), null, null, null, false); + ImmutableMap.of("preserve.source.fields", "true"), null, null, null, false); RuleSet ruleSet = new RuleSet(Collections.emptyList(), ImmutableList.of(rule)); Metadata metadata = getMetadata("kek1"); avroSchema = avroSchema.copy(metadata, ruleSet); @@ -890,7 +890,7 @@ public void testKafkaJsonSchemaSerializerPreserveSource() throws Exception { JsonSchema jsonSchema = new JsonSchema(schemaStr); Rule rule = new Rule("rule1", null, null, null, FieldEncryptionExecutor.TYPE, ImmutableSortedSet.of("PII"), - ImmutableMap.of("rule.preserve.source", "true"), null, null, null, false); + ImmutableMap.of("preserve.source.fields", "true"), null, null, null, false); RuleSet ruleSet = new RuleSet(Collections.emptyList(), Collections.singletonList(rule)); Metadata metadata = getMetadata("kek1"); jsonSchema = jsonSchema.copy(metadata, ruleSet); diff --git a/client/src/main/java/io/confluent/kafka/schemaregistry/rules/FieldRuleExecutor.java b/client/src/main/java/io/confluent/kafka/schemaregistry/rules/FieldRuleExecutor.java index 58107adee0a..26765643107 100644 --- a/client/src/main/java/io/confluent/kafka/schemaregistry/rules/FieldRuleExecutor.java +++ b/client/src/main/java/io/confluent/kafka/schemaregistry/rules/FieldRuleExecutor.java @@ -32,13 +32,13 @@ public abstract class FieldRuleExecutor implements RuleExecutor { Logger log = LoggerFactory.getLogger(FieldRuleExecutor.class); - public static final String RULE_PRESERVE_SOURCE = "rule.preserve.source"; + public static final String PRESERVE_SOURCE_FIELDS = "preserve.source.fields"; private Boolean preserveSource; @Override public void configure(Map configs) { - Object preserveSourceConfig = configs.get(RULE_PRESERVE_SOURCE); + Object preserveSourceConfig = configs.get(PRESERVE_SOURCE_FIELDS); if (preserveSourceConfig != null) { this.preserveSource = Boolean.parseBoolean(preserveSourceConfig.toString()); } @@ -53,7 +53,7 @@ public boolean isPreserveSource() { @Override public Object transform(RuleContext ctx, Object message) throws RuleException { if (this.preserveSource == null) { - String preserveValueConfig = ctx.getParameter(RULE_PRESERVE_SOURCE); + String preserveValueConfig = ctx.getParameter(PRESERVE_SOURCE_FIELDS); if (preserveValueConfig != null) { this.preserveSource = Boolean.parseBoolean(preserveValueConfig); } @@ -89,7 +89,9 @@ public Object transform(RuleContext ctx, Object message) throws RuleException { try (FieldTransform transform = newTransform(ctx)) { if (transform != null) { - if (ctx.ruleMode() == RuleMode.WRITE && isPreserveSource()) { + if (ctx.ruleMode() == RuleMode.WRITE + && ctx.rule().getKind() == RuleKind.TRANSFORM + && isPreserveSource()) { try { // We use the target schema message = ctx.target().copyMessage(message); diff --git a/schema-rules/src/test/java/io/confluent/kafka/schemaregistry/rules/cel/CelExecutorTest.java b/schema-rules/src/test/java/io/confluent/kafka/schemaregistry/rules/cel/CelExecutorTest.java index 1c0fc57e424..3061f6526b2 100644 --- a/schema-rules/src/test/java/io/confluent/kafka/schemaregistry/rules/cel/CelExecutorTest.java +++ b/schema-rules/src/test/java/io/confluent/kafka/schemaregistry/rules/cel/CelExecutorTest.java @@ -728,7 +728,7 @@ public void testKafkaAvroSerializerReflectionFieldTransformIgnoreGuardSeparator( AvroSchema avroSchema = new AvroSchema(schema); Rule rule = new Rule("myRule", null, RuleKind.TRANSFORM, RuleMode.WRITE, CelFieldExecutor.TYPE, ImmutableSortedSet.of("PII"), - 
ImmutableMap.of("ignore.guard.separator", "true"), "value + \"-suffix;\"", + ImmutableMap.of("cel.ignore.guard.separator", "true"), "value + \"-suffix;\"", null, null, false); RuleSet ruleSet = new RuleSet(Collections.emptyList(), Collections.singletonList(rule)); avroSchema = avroSchema.copy(null, ruleSet);