Skip to content

Commit

Permalink
Add check for FieldRuleExecutor
Browse files Browse the repository at this point in the history
  • Loading branch information
rayokota committed Mar 1, 2024
1 parent bc19445 commit 1063bb4
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 0 deletions.
Expand Up @@ -52,6 +52,10 @@ public boolean isPreserveSource() {

@Override
public Object transform(RuleContext ctx, Object message) throws RuleException {
if (ctx.rule().getKind() == RuleKind.CONDITION) {
throw new RuleException(
"Rules of type " + ctx.rule().getType() + " can only have kind TRANSFORM");
}
if (this.preserveSource == null) {
String preserveValueConfig = ctx.getParameter(PRESERVE_SOURCE_FIELDS);
if (preserveValueConfig != null) {
Expand Down
Expand Up @@ -58,6 +58,7 @@
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchemaProvider;
import io.confluent.kafka.schemaregistry.rules.DlqAction;
import io.confluent.kafka.schemaregistry.rules.PiiProto;
import io.confluent.kafka.schemaregistry.rules.RuleException;
import io.confluent.kafka.schemaregistry.rules.WidgetProto.Kind;
import io.confluent.kafka.schemaregistry.rules.WidgetProto.Pii;
import io.confluent.kafka.schemaregistry.rules.WidgetProto.Widget;
Expand Down Expand Up @@ -932,6 +933,34 @@ public void testKafkaAvroSerializerReflectionFieldTransformWithSameTag() throws
assertEquals("678-suffix2", ((OldWidget)obj).getPiiMap().get("key2").getPii());
}

@Test(expected = SerializationException.class)
public void testKafkaAvroSerializerReflectionFieldTransformWithBadKind() throws Exception {
byte[] bytes;
Object obj;

OldWidget widget = new OldWidget("alice");
widget.setLastName("");
widget.setFullName("");
widget.setMyint(1);
widget.setMylong(2L);
widget.setMyfloat(3.0f);
widget.setMydouble(4.0d);
widget.setMyboolean(true);
widget.setSsn(ImmutableList.of("123", "456"));
widget.setPiiArray(ImmutableList.of(new OldPii("789"), new OldPii("012")));
widget.setPiiMap(ImmutableMap.of("key1", new OldPii("345"), "key2", new OldPii("678")));
Schema schema = createWidgetSchema();
AvroSchema avroSchema = new AvroSchema(schema);
Rule rule = new Rule("myRule", null, RuleKind.CONDITION, RuleMode.WRITE,
CelFieldExecutor.TYPE, ImmutableSortedSet.of("PII"), null, "value + \"-suffix2\"",
null, null, false);
RuleSet ruleSet = new RuleSet(Collections.emptyList(), ImmutableList.of(rule));
avroSchema = avroSchema.copy(null, ruleSet);
schemaRegistry.register(topic + "-value", avroSchema);

bytes = reflectionAvroSerializer.serialize(topic, widget);
}

@Test
public void testKafkaAvroSerializerNewMapTransform() throws Exception {
IndexedRecord avroRecord = createUserRecord();
Expand Down

0 comments on commit 1063bb4

Please sign in to comment.