Merge branch '7.5.x'
rayokota committed Oct 7, 2023
2 parents 0d8ae16 + baa71ac commit da3ba87
Showing 11 changed files with 174 additions and 14 deletions.
----------------------------------------------------------------------
@@ -58,7 +58,7 @@
  * encrypted data, use the KEK from KMS to decrypt the DEK, and use the decrypted DEK to decrypt
  * the data.
  */
-public class FieldEncryptionExecutor implements FieldRuleExecutor {
+public class FieldEncryptionExecutor extends FieldRuleExecutor {

   public static final String TYPE = "ENCRYPT";

@@ -88,6 +88,7 @@ public boolean addOriginalConfigs() {

   @Override
   public void configure(Map<String, ?> configs) {
+    super.configure(configs);
     this.configs = configs;
     Object cacheExpirySecsConfig = configs.get(CACHE_EXPIRY_SECS);
     if (cacheExpirySecsConfig != null) {
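The truncated Javadoc above compresses the envelope-encryption scheme into one sentence: a per-message data-encryption key (DEK) protects the payload, and a KMS-held key-encryption key (KEK) protects the DEK. A minimal, self-contained sketch of that flow, using a locally generated KEK and the JCA AESWrap cipher as stand-ins for the KMS-managed key and its wrap/unwrap call (every name here is illustrative, not the executor's actual API):

import java.nio.charset.StandardCharsets;
import javax.crypto.Cipher;
import javax.crypto.KeyGenerator;
import javax.crypto.SecretKey;
import javax.crypto.spec.GCMParameterSpec;

public class EnvelopeSketch {
  public static void main(String[] args) throws Exception {
    SecretKey kek = KeyGenerator.getInstance("AES").generateKey(); // KMS-managed in practice
    SecretKey dek = KeyGenerator.getInstance("AES").generateKey(); // per-message data key

    // Wrap (encrypt) the DEK with the KEK; only the wrapped DEK is stored with the data.
    Cipher wrap = Cipher.getInstance("AESWrap");
    wrap.init(Cipher.WRAP_MODE, kek);
    byte[] wrappedDek = wrap.wrap(dek);

    // Encrypt the field value with the DEK.
    Cipher enc = Cipher.getInstance("AES/GCM/NoPadding");
    enc.init(Cipher.ENCRYPT_MODE, dek);
    byte[] iv = enc.getIV();
    byte[] ciphertext = enc.doFinal("testUser".getBytes(StandardCharsets.UTF_8));

    // To decrypt: unwrap the DEK with the KEK, then decrypt the data with the DEK.
    Cipher unwrap = Cipher.getInstance("AESWrap");
    unwrap.init(Cipher.UNWRAP_MODE, kek);
    SecretKey recoveredDek = (SecretKey) unwrap.unwrap(wrappedDek, "AES", Cipher.SECRET_KEY);

    Cipher dec = Cipher.getInstance("AES/GCM/NoPadding");
    dec.init(Cipher.DECRYPT_MODE, recoveredDek, new GCMParameterSpec(128, iv));
    System.out.println(new String(dec.doFinal(ciphertext), StandardCharsets.UTF_8)); // testUser
  }
}

Only the wrapped DEK travels with the ciphertext, so key rotation and revocation happen at the KEK in the KMS rather than by touching every encrypted message.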
----------------------------------------------------------------------
@@ -265,7 +265,7 @@ private Schema createUserSchema() {
     return schema;
   }

-  private IndexedRecord createUserRecord() {
+  private GenericRecord createUserRecord() {
     Schema schema = createUserSchema();
     GenericRecord avroRecord = new GenericData.Record(schema);
     avroRecord.put("name", "testUser");
@@ -358,6 +358,31 @@ public void testKafkaAvroSerializer() throws Exception {
     assertEquals("testUser", record.get("name"));
   }

+  @Test
+  public void testKafkaAvroSerializerPreserveSource() throws Exception {
+    GenericRecord avroRecord = createUserRecord();
+    AvroSchema avroSchema = new AvroSchema(createUserSchema());
+    Rule rule = new Rule("rule1", null, null, null,
+        FieldEncryptionExecutor.TYPE, ImmutableSortedSet.of("PII"),
+        ImmutableMap.of("preserve.source.fields", "true"), null, null, null, false);
+    RuleSet ruleSet = new RuleSet(Collections.emptyList(), ImmutableList.of(rule));
+    Metadata metadata = getMetadata("kek1");
+    avroSchema = avroSchema.copy(metadata, ruleSet);
+    schemaRegistry.register(topic + "-value", avroSchema);
+
+    int expectedEncryptions = 1;
+    RecordHeaders headers = new RecordHeaders();
+    Cryptor cryptor = addSpyToCryptor(avroSerializer);
+    byte[] bytes = avroSerializer.serialize(topic, headers, avroRecord);
+    verify(cryptor, times(expectedEncryptions)).encrypt(any(), any(), any());
+    cryptor = addSpyToCryptor(avroDeserializer);
+    GenericRecord record = (GenericRecord) avroDeserializer.deserialize(topic, headers, bytes);
+    verify(cryptor, times(expectedEncryptions)).decrypt(any(), any(), any());
+    assertEquals("testUser", record.get("name"));
+    // Old value is preserved
+    assertEquals("testUser", avroRecord.get("name"));
+  }
+
   @Test
   public void testKafkaAvroSerializer2() throws Exception {
     IndexedRecord avroRecord = createUserRecord();
@@ -455,6 +480,40 @@ public void testKafkaAvroSerializerReflection() throws Exception {
     assertEquals("678", ((OldWidget)obj).getPiiMap().get("key2").getPii());
   }

+  @Test
+  public void testKafkaAvroSerializerReflectionPreserveSource() throws Exception {
+    OldWidget widget = new OldWidget("alice");
+    widget.setSsn(ImmutableList.of("123", "456"));
+    widget.setPiiArray(ImmutableList.of(new OldPii("789"), new OldPii("012")));
+    widget.setPiiMap(ImmutableMap.of("key1", new OldPii("345"), "key2", new OldPii("678")));
+    Schema schema = createWidgetSchema();
+    AvroSchema avroSchema = new AvroSchema(schema);
+    Rule rule = new Rule("rule1", null, null, null,
+        FieldEncryptionExecutor.TYPE, ImmutableSortedSet.of("PII"),
+        ImmutableMap.of("preserve.source.fields", "true"), null, null, null, false);
+    RuleSet ruleSet = new RuleSet(Collections.emptyList(), ImmutableList.of(rule));
+    Metadata metadata = getMetadata("kek1");
+    avroSchema = avroSchema.copy(metadata, ruleSet);
+    schemaRegistry.register(topic + "-value", avroSchema);
+
+    int expectedEncryptions = 7;
+    RecordHeaders headers = new RecordHeaders();
+    Cryptor cryptor = addSpyToCryptor(reflectionAvroSerializer);
+    byte[] bytes = reflectionAvroSerializer.serialize(topic, headers, widget);
+    verify(cryptor, times(expectedEncryptions)).encrypt(any(), any(), any());
+    cryptor = addSpyToCryptor(reflectionAvroDeserializer);
+    Object obj = reflectionAvroDeserializer.deserialize(topic, headers, bytes);
+    verify(cryptor, times(expectedEncryptions)).decrypt(any(), any(), any());
+
+    assertTrue(
+        "Returned object should be a Widget",
+        OldWidget.class.isInstance(obj)
+    );
+    assertEquals("alice", ((OldWidget)obj).getName());
+    // Old value is preserved
+    assertEquals("alice", widget.getName());
+  }
+
   @Test
   public void testKafkaAvroSerializerMultipleRules() throws Exception {
     IndexedRecord avroRecord = createUserRecord();
@@ -809,6 +868,48 @@ public void testKafkaJsonSchemaSerializer() throws Exception {
     );
   }

+  @Test
+  public void testKafkaJsonSchemaSerializerPreserveSource() throws Exception {
+    OldWidget widget = new OldWidget("alice");
+    widget.setSize(123);
+    widget.setSsn(ImmutableList.of("123", "456"));
+    widget.setPiiArray(ImmutableList.of(new OldPii("789"), new OldPii("012")));
+    String schemaStr = "{\"$schema\":\"http://json-schema.org/draft-07/schema#\",\"title\":\"Old Widget\",\"type\":\"object\",\"additionalProperties\":false,\"properties\":{\n"
+        + "\"name\":{\"oneOf\":[{\"type\":\"null\",\"title\":\"Not included\"},{\"type\":\"string\"}],"
+        + "\"confluent:tags\": [ \"PII\" ]},"
+        + "\"ssn\":{\"oneOf\":[{\"type\":\"null\",\"title\":\"Not included\"},{\"type\":\"array\",\"items\":{\"type\":\"string\"}}],"
+        + "\"confluent:tags\": [ \"PII\" ]},"
+        + "\"piiArray\":{\"oneOf\":[{\"type\":\"null\",\"title\":\"Not included\"},{\"type\":\"array\",\"items\":{\"$ref\":\"#/definitions/OldPii\"}}]},"
+        + "\"piiMap\":{\"oneOf\":[{\"type\":\"null\",\"title\":\"Not included\"},{\"type\":\"object\",\"additionalProperties\":{\"$ref\":\"#/definitions/OldPii\"}}]},"
+        + "\"size\":{\"type\":\"integer\"},"
+        + "\"version\":{\"type\":\"integer\"}},"
+        + "\"required\":[\"size\",\"version\"],"
+        + "\"definitions\":{\"OldPii\":{\"type\":\"object\",\"additionalProperties\":false,\"properties\":{"
+        + "\"pii\":{\"oneOf\":[{\"type\":\"null\",\"title\":\"Not included\"},{\"type\":\"string\"}],"
+        + "\"confluent:tags\": [ \"PII\" ]}}}}}";
+    JsonSchema jsonSchema = new JsonSchema(schemaStr);
+    Rule rule = new Rule("rule1", null, null, null,
+        FieldEncryptionExecutor.TYPE, ImmutableSortedSet.of("PII"),
+        ImmutableMap.of("preserve.source.fields", "true"), null, null, null, false);
+    RuleSet ruleSet = new RuleSet(Collections.emptyList(), Collections.singletonList(rule));
+    Metadata metadata = getMetadata("kek1");
+    jsonSchema = jsonSchema.copy(metadata, ruleSet);
+    schemaRegistry.register(topic + "-value", jsonSchema);
+
+    int expectedEncryptions = 5;
+    RecordHeaders headers = new RecordHeaders();
+    Cryptor cryptor = addSpyToCryptor(jsonSchemaSerializer);
+    byte[] bytes = jsonSchemaSerializer.serialize(topic, headers, widget);
+    verify(cryptor, times(expectedEncryptions)).encrypt(any(), any(), any());
+    cryptor = addSpyToCryptor(jsonSchemaDeserializer);
+    Object obj = jsonSchemaDeserializer.deserialize(topic, headers, bytes);
+    verify(cryptor, times(expectedEncryptions)).decrypt(any(), any(), any());
+
+    assertEquals("alice", ((JsonNode)obj).get("name").textValue());
+    // Old value is preserved
+    assertEquals("alice", widget.getName());
+  }
+
   @Test
   public void testKafkaJsonSchemaSerializerAnnotated() throws Exception {
     AnnotatedOldWidget widget = new AnnotatedOldWidget("alice");
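Why 1, 7, and 5? One encryption per PII-tagged field value in the record being serialized. A plain-Java restatement of that arithmetic, outside the test harness:

public class EncryptionCountSketch {
  public static void main(String[] args) {
    int userRecord = 1;              // "name" is the only PII-tagged field
    int avroWidget = 1 + 2 + 2 + 2;  // name, two ssn entries, two piiArray pii values,
                                     // two piiMap pii values => 7
    int jsonWidget = 1 + 2 + 2;      // name, two ssn entries, two piiArray pii values;
                                     // no piiMap is set in the JSON test => 5
    System.out.println(userRecord + " " + avroWidget + " " + jsonWidget);
  }
}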
----------------------------------------------------------------------
@@ -237,6 +237,10 @@ default JsonNode toJson(Object object) throws IOException {
     throw new UnsupportedOperationException();
   }

+  default Object copyMessage(Object message) throws IOException {
+    throw new UnsupportedOperationException();
+  }
+
   default Object transformMessage(RuleContext ctx, FieldTransform transform, Object message)
       throws RuleException {
     throw new UnsupportedOperationException();
----------------------------------------------------------------------
@@ -554,6 +554,12 @@ public JsonNode toJson(Object message) throws IOException {
     return JacksonMapper.INSTANCE.readTree(AvroSchemaUtils.toJson(message));
   }

+  @Override
+  public Object copyMessage(Object message) throws IOException {
+    GenericData data = getData(message);
+    return data.deepCopy(rawSchema(), message);
+  }
+
   @Override
   public Object transformMessage(RuleContext ctx, FieldTransform transform, Object message)
       throws RuleException {
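The Avro implementation above delegates to Avro's GenericData.deepCopy, which is what makes preserve.source.fields work: the transform runs against the copy while the caller keeps the original record. A self-contained sketch of that semantics (the schema and the "<ciphertext>" stand-in are illustrative, not the executor's output):

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;

public class PreserveSourceSketch {
  public static void main(String[] args) {
    Schema schema = SchemaBuilder.record("User").fields()
        .requiredString("name").endRecord();
    GenericRecord original = new GenericData.Record(schema);
    original.put("name", "testUser");

    // With preserve.source.fields=true the executor transforms a deep copy,
    // so the caller's record keeps its plaintext value (see the tests above).
    GenericRecord copy = GenericData.get().deepCopy(schema, original);
    copy.put("name", "<ciphertext>"); // stand-in for the field-encryption transform

    System.out.println(original.get("name")); // testUser -- source preserved
    System.out.println(copy.get("name"));     // <ciphertext>
  }
}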
----------------------------------------------------------------------
@@ -45,6 +45,8 @@ public class DlqAction implements RuleAction {
   public static final String TYPE = "DLQ";

   public static final String DLQ_TOPIC = "dlq.topic";
+  public static final String DLQ_AUTO_FLUSH = "dlq.auto.flush";
+  public static final String PRODUCER = "producer"; // for testing

   public static final String HEADER_PREFIX = "__rule.";
   public static final String RULE_NAME = HEADER_PREFIX + "name";
@@ -53,9 +55,6 @@ public class DlqAction implements RuleAction {
   public static final String RULE_TOPIC = HEADER_PREFIX + "topic";
   public static final String RULE_EXCEPTION = HEADER_PREFIX + "exception";

-  public static final String DLQ_AUTO_FLUSH = "dlq.auto.flush";
-  public static final String PRODUCER = "producer"; // for testing
-
   private static final LongSerializer LONG_SERIALIZER = new LongSerializer();
   private static final IntegerSerializer INT_SERIALIZER = new IntegerSerializer();
   private static final ShortSerializer SHORT_SERIALIZER = new ShortSerializer();
----------------------------------------------------------------------
@@ -18,20 +18,46 @@

 import io.confluent.kafka.schemaregistry.client.rest.entities.Rule;
 import io.confluent.kafka.schemaregistry.client.rest.entities.RuleKind;
+import io.confluent.kafka.schemaregistry.client.rest.entities.RuleMode;
+import java.io.IOException;
+import java.util.Map;
+import java.util.Objects;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

 /**
  * A field-level rule executor.
  */
-public interface FieldRuleExecutor extends RuleExecutor {
+public abstract class FieldRuleExecutor implements RuleExecutor {

   Logger log = LoggerFactory.getLogger(FieldRuleExecutor.class);

-  FieldTransform newTransform(RuleContext ctx) throws RuleException;
+  public static final String PRESERVE_SOURCE_FIELDS = "preserve.source.fields";

-  default Object transform(RuleContext ctx, Object message) throws RuleException {
+  private Boolean preserveSource;
+
+  @Override
+  public void configure(Map<String, ?> configs) {
+    Object preserveSourceConfig = configs.get(PRESERVE_SOURCE_FIELDS);
+    if (preserveSourceConfig != null) {
+      this.preserveSource = Boolean.parseBoolean(preserveSourceConfig.toString());
+    }
+  }
+
+  public boolean isPreserveSource() {
+    return Boolean.TRUE.equals(preserveSource);
+  }
+
+  public abstract FieldTransform newTransform(RuleContext ctx) throws RuleException;
+
+  @Override
+  public Object transform(RuleContext ctx, Object message) throws RuleException {
+    if (this.preserveSource == null) {
+      String preserveValueConfig = ctx.getParameter(PRESERVE_SOURCE_FIELDS);
+      if (preserveValueConfig != null) {
+        this.preserveSource = Boolean.parseBoolean(preserveValueConfig);
+      }
+    }
     switch (ctx.ruleMode()) {
       case WRITE:
       case UPGRADE:
@@ -63,6 +89,16 @@ default Object transform(RuleContext ctx, Object message) throws RuleException {

     try (FieldTransform transform = newTransform(ctx)) {
       if (transform != null) {
+        if (ctx.ruleMode() == RuleMode.WRITE
+            && ctx.rule().getKind() == RuleKind.TRANSFORM
+            && isPreserveSource()) {
+          try {
+            // We use the target schema
+            message = ctx.target().copyMessage(message);
+          } catch (IOException e) {
+            throw new RuleException("Could not copy source message", e);
+          }
+        }
         return ctx.target().transformMessage(ctx, transform, message);
       } else {
         return message;
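With FieldRuleExecutor now an abstract class, a concrete executor extends it instead of implementing an interface, and — as the FieldEncryptionExecutor change at the top of this commit shows — should call super.configure so the base class can pick up preserve.source.fields. A hypothetical minimal subclass, not part of this commit, shown without the rules-API imports (whose package the diff doesn't display) and assuming FieldTransform's single abstract method takes (RuleContext, FieldContext, Object), as the CEL and encryption executors suggest:

import java.util.Map;

// Illustrative executor: upper-cases tagged string fields just to show the contract.
public class UpperCaseFieldExecutor extends FieldRuleExecutor {

  public static final String TYPE = "UPPER_FIELD"; // hypothetical rule type

  @Override
  public void configure(Map<String, ?> configs) {
    super.configure(configs); // lets the base class read preserve.source.fields
  }

  @Override
  public String type() {
    return TYPE;
  }

  @Override
  public FieldTransform newTransform(RuleContext ctx) {
    // Assumes FieldTransform is a functional interface over
    // (RuleContext, FieldContext, Object) -> Object.
    return (ruleCtx, fieldCtx, fieldValue) ->
        fieldValue instanceof String ? ((String) fieldValue).toUpperCase() : fieldValue;
  }
}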
----------------------------------------------------------------------
@@ -45,7 +45,6 @@
 import io.confluent.kafka.schemaregistry.client.rest.entities.Metadata;
 import io.confluent.kafka.schemaregistry.client.rest.entities.RuleSet;
 import io.confluent.kafka.schemaregistry.utils.BoundedConcurrentHashMap;
-import io.confluent.kafka.schemaregistry.utils.JacksonMapper;
 import java.math.BigDecimal;
 import java.math.BigInteger;
 import java.util.HashMap;
@@ -533,7 +532,15 @@ public JsonNode toJson(Object message) throws IOException {
     if (message instanceof JsonNode) {
       return (JsonNode) message;
     }
-    return JacksonMapper.INSTANCE.readTree(JsonSchemaUtils.toJson(message));
+    return objectMapper.readTree(JsonSchemaUtils.toJson(message));
+  }
+
+  @Override
+  public Object copyMessage(Object message) throws IOException {
+    if (message instanceof JsonNode) {
+      return ((JsonNode) message).deepCopy();
+    }
+    return toJson(message);
   }

   @Override
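For JSON the copy is Jackson's JsonNode.deepCopy(), and a non-JsonNode input is converted via toJson, which already yields a fresh tree. A small sketch of the JsonNode branch (the "<ciphertext>" value is a stand-in for the field transform):

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;

public class JsonCopySketch {
  public static void main(String[] args) {
    ObjectNode original = new ObjectMapper().createObjectNode().put("name", "alice");
    ObjectNode copy = original.deepCopy(); // independent tree, as in copyMessage above
    copy.put("name", "<ciphertext>");
    System.out.println(original.get("name")); // "alice" -- source preserved
  }
}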
----------------------------------------------------------------------
@@ -2274,6 +2274,12 @@ public JsonNode toJson(Object message) throws IOException {
     return JacksonMapper.INSTANCE.readTree(ProtobufSchemaUtils.toJson((Message) message));
   }

+  @Override
+  public Object copyMessage(Object message) throws IOException {
+    // Protobuf messages are already immutable
+    return message;
+  }
+
   @Override
   public Object transformMessage(RuleContext ctx, FieldTransform transform, Object message)
       throws RuleException {
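Protobuf gets the trivial implementation because generated messages are immutable: there is no setter to mutate through, so returning the same instance preserves the source by construction. A sketch with a protobuf well-known type illustrating why:

import com.google.protobuf.Value;

public class ProtoImmutabilitySketch {
  public static void main(String[] args) {
    Value v = Value.newBuilder().setStringValue("alice").build();
    // There is no setter on a built message; any "change" goes through a new builder:
    Value changed = v.toBuilder().setStringValue("<ciphertext>").build();
    System.out.println(v.getStringValue());       // alice -- original untouched
    System.out.println(changed.getStringValue()); // <ciphertext>
  }
}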
----------------------------------------------------------------------
@@ -61,7 +61,7 @@ public class CelExecutor implements RuleExecutor {

   public static final String TYPE = "CEL";

-  public static final String IGNORE_GUARD_SEPARATOR = "ignore.guard.separator";
+  public static final String CEL_IGNORE_GUARD_SEPARATOR = "cel.ignore.guard.separator";

   private static final ObjectMapper mapper = new ObjectMapper();

@@ -151,7 +151,7 @@ protected Object execute(
       RuleContext ctx, Object obj, Map<String, Object> args)
       throws RuleException {
     String expr = ctx.rule().getExpr();
-    String ignoreGuardStr = ctx.getParameter(IGNORE_GUARD_SEPARATOR);
+    String ignoreGuardStr = ctx.getParameter(CEL_IGNORE_GUARD_SEPARATOR);
     boolean ignoreGuard = Boolean.parseBoolean(ignoreGuardStr);
     if (!ignoreGuard) {
       // An optional guard (followed by semicolon) can precede the expr
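The renamed parameter matters because, per the comment above, the rule expression language treats ';' as a guard separator: an optional guard expression before the semicolon gates whether the transform after it runs. When the expression needs a literal ';', the rule sets cel.ignore.guard.separator=true, as the test at the end of this commit does. Two illustrative expression strings (the guard itself is hypothetical; only `value` is confirmed by the tests here):

public class CelGuardSketch {
  public static void main(String[] args) {
    // guard ; transform -- the part before ';' decides whether the transform applies
    String guarded = "value != null ; value + \"-suffix\"";
    // here ';' is data inside the string literal, so guard parsing must be
    // disabled via cel.ignore.guard.separator=true on the rule
    String withLiteralSemicolon = "value + \"-suffix;\"";
    System.out.println(guarded);
    System.out.println(withLiteralSemicolon);
  }
}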
----------------------------------------------------------------------
@@ -29,7 +29,7 @@
 import java.util.HashMap;
 import java.util.Map;

-public class CelFieldExecutor implements FieldRuleExecutor {
+public class CelFieldExecutor extends FieldRuleExecutor {

   public static final String TYPE = "CEL_FIELD";

----------------------------------------------------------------------
@@ -728,7 +728,7 @@ public void testKafkaAvroSerializerReflectionFieldTransformIgnoreGuardSeparator(
     AvroSchema avroSchema = new AvroSchema(schema);
     Rule rule = new Rule("myRule", null, RuleKind.TRANSFORM, RuleMode.WRITE,
         CelFieldExecutor.TYPE, ImmutableSortedSet.of("PII"),
-        ImmutableMap.of("ignore.guard.separator", "true"), "value + \"-suffix;\"",
+        ImmutableMap.of("cel.ignore.guard.separator", "true"), "value + \"-suffix;\"",
         null, null, false);
     RuleSet ruleSet = new RuleSet(Collections.emptyList(), Collections.singletonList(rule));
     avroSchema = avroSchema.copy(null, ruleSet);
