Skip to content

Commit

Permalink
Add tests
Browse files Browse the repository at this point in the history
  • Loading branch information
rayokota committed Nov 11, 2023
1 parent a990ce0 commit 3e5ecba
Show file tree
Hide file tree
Showing 2 changed files with 121 additions and 7 deletions.
Expand Up @@ -21,6 +21,7 @@
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.any;
Expand Down Expand Up @@ -129,6 +130,7 @@ public CelExecutorTest() {
defaultConfig.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "bogus");
defaultConfig.put(AbstractKafkaSchemaSerDeConfig.AUTO_REGISTER_SCHEMAS, "false");
defaultConfig.put(AbstractKafkaSchemaSerDeConfig.USE_LATEST_VERSION, "true");
defaultConfig.put(AbstractKafkaSchemaSerDeConfig.LATEST_CACHE_SIZE, "0");
defaultConfig.put(AbstractKafkaSchemaSerDeConfig.RULE_EXECUTORS, "cel,cel-field");
defaultConfig.put(AbstractKafkaSchemaSerDeConfig.RULE_EXECUTORS + ".cel.class",
CelExecutor.class.getName());
Expand Down Expand Up @@ -212,10 +214,14 @@ private Schema createUserSchema() {
}

private IndexedRecord createUserRecord() {
return createUserRecord("testUser");
}

private IndexedRecord createUserRecord(String name) {
Schema enumSchema = createEnumSchema();
Schema schema = createUserSchema();
GenericRecord avroRecord = new GenericData.Record(schema);
avroRecord.put("name", "testUser");
avroRecord.put("name", name);
avroRecord.put("mybytes", ByteBuffer.wrap(new byte[] { 0 }));
avroRecord.put("myint", 1);
avroRecord.put("mylong", 2L);
Expand Down Expand Up @@ -938,6 +944,115 @@ public void testKafkaAvroSerializerNullTransform() throws Exception {
avroSerializer.serialize(topic, avroRecord);
}

@Test
public void testKafkaAvroSerializerBuiltinFunctions() throws Exception {
IndexedRecord avroRecord = createUserRecord("bob@confluent.com");
AvroSchema avroSchema = new AvroSchema(avroRecord.getSchema());
Rule rule = new Rule("myRule", null, RuleKind.CONDITION, RuleMode.WRITEREAD,
CelExecutor.TYPE, null, null,
"message.name.isEmail()",
null, null, false);
RuleSet ruleSet = new RuleSet(Collections.emptyList(), Collections.singletonList(rule));
avroSchema = avroSchema.copy(null, ruleSet);
schemaRegistry.register(topic + "-value", avroSchema);

byte[] bytes = avroSerializer.serialize(topic, avroRecord);
assertEquals(avroRecord, avroDeserializer.deserialize(topic, bytes));

IndexedRecord avroRecord2 = createUserRecord("bob.@confluent.com");
assertThrows(SerializationException.class, () -> avroSerializer.serialize(topic, avroRecord2));

avroRecord = createUserRecord("localhost");
rule = new Rule("myRule", null, RuleKind.CONDITION, RuleMode.WRITEREAD,
CelExecutor.TYPE, null, null,
"message.name.isHostname()",
null, null, false);
ruleSet = new RuleSet(Collections.emptyList(), Collections.singletonList(rule));
avroSchema = avroSchema.copy(null, ruleSet);
schemaRegistry.register(topic + "-value", avroSchema);

bytes = avroSerializer.serialize(topic, avroRecord);
assertEquals(avroRecord, avroDeserializer.deserialize(topic, bytes));

IndexedRecord avroRecord3 = createUserRecord("local_host");
assertThrows(SerializationException.class, () -> avroSerializer.serialize(topic, avroRecord3));

avroRecord = createUserRecord("127.0.0.1");
rule = new Rule("myRule", null, RuleKind.CONDITION, RuleMode.WRITEREAD,
CelExecutor.TYPE, null, null,
"message.name.isIpv4()",
null, null, false);
ruleSet = new RuleSet(Collections.emptyList(), Collections.singletonList(rule));
avroSchema = avroSchema.copy(null, ruleSet);
schemaRegistry.register(topic + "-value", avroSchema);

bytes = avroSerializer.serialize(topic, avroRecord);
assertEquals(avroRecord, avroDeserializer.deserialize(topic, bytes));

IndexedRecord avroRecord4 = createUserRecord("foo");
assertThrows(SerializationException.class, () -> avroSerializer.serialize(topic, avroRecord4));

avroRecord = createUserRecord("2001:db8:85a3:0:0:8a2e:370:7334");
rule = new Rule("myRule", null, RuleKind.CONDITION, RuleMode.WRITEREAD,
CelExecutor.TYPE, null, null,
"message.name.isIpv6()",
null, null, false);
ruleSet = new RuleSet(Collections.emptyList(), Collections.singletonList(rule));
avroSchema = avroSchema.copy(null, ruleSet);
schemaRegistry.register(topic + "-value", avroSchema);

bytes = avroSerializer.serialize(topic, avroRecord);
assertEquals(avroRecord, avroDeserializer.deserialize(topic, bytes));

IndexedRecord avroRecord5 = createUserRecord("foo");
assertThrows(SerializationException.class, () -> avroSerializer.serialize(topic, avroRecord5));

avroRecord = createUserRecord("http://confluent.com/index.html");
rule = new Rule("myRule", null, RuleKind.CONDITION, RuleMode.WRITEREAD,
CelExecutor.TYPE, null, null,
"message.name.isUri()",
null, null, false);
ruleSet = new RuleSet(Collections.emptyList(), Collections.singletonList(rule));
avroSchema = avroSchema.copy(null, ruleSet);
schemaRegistry.register(topic + "-value", avroSchema);

bytes = avroSerializer.serialize(topic, avroRecord);
assertEquals(avroRecord, avroDeserializer.deserialize(topic, bytes));

IndexedRecord avroRecord6 = createUserRecord("foo");
assertThrows(SerializationException.class, () -> avroSerializer.serialize(topic, avroRecord6));

avroRecord = createUserRecord("/index.html");
rule = new Rule("myRule", null, RuleKind.CONDITION, RuleMode.WRITEREAD,
CelExecutor.TYPE, null, null,
"message.name.isUriRef()",
null, null, false);
ruleSet = new RuleSet(Collections.emptyList(), Collections.singletonList(rule));
avroSchema = avroSchema.copy(null, ruleSet);
schemaRegistry.register(topic + "-value", avroSchema);

bytes = avroSerializer.serialize(topic, avroRecord);
assertEquals(avroRecord, avroDeserializer.deserialize(topic, bytes));

IndexedRecord avroRecord7 = createUserRecord("::foo");
assertThrows(SerializationException.class, () -> avroSerializer.serialize(topic, avroRecord7));

avroRecord = createUserRecord("fa02a430-892f-4160-97cd-6e3d1bc14494");
rule = new Rule("myRule", null, RuleKind.CONDITION, RuleMode.WRITEREAD,
CelExecutor.TYPE, null, null,
"message.name.isUuid()",
null, null, false);
ruleSet = new RuleSet(Collections.emptyList(), Collections.singletonList(rule));
avroSchema = avroSchema.copy(null, ruleSet);
schemaRegistry.register(topic + "-value", avroSchema);

bytes = avroSerializer.serialize(topic, avroRecord);
assertEquals(avroRecord, avroDeserializer.deserialize(topic, bytes));

IndexedRecord avroRecord8 = createUserRecord("::foo");
assertThrows(SerializationException.class, () -> avroSerializer.serialize(topic, avroRecord8));
}

@Test
public void testKafkaProtobufSerializer() throws Exception {
byte[] bytes;
Expand Down
Expand Up @@ -125,9 +125,8 @@ public void pathSuccess() {
}

@Test
public void uuidSuccess() {
System.out.println(UUID.randomUUID());
assertSuccess("fa02a430-892f-4160-97cd-6e3d1bc14494", BuiltinOverload::validateUuid);
public void illegalCharFailure() {
assertFailure("\\\\WINDOWS\\fileshare", BuiltinOverload::validateUriRef);
}

@Test
Expand All @@ -136,10 +135,10 @@ public void uuidFailure() {
}

@Test
public void illegalCharFailure() {
assertFailure("\\\\WINDOWS\\fileshare", BuiltinOverload::validateUriRef);
public void uuidSuccess() {
System.out.println(UUID.randomUUID());
assertSuccess("fa02a430-892f-4160-97cd-6e3d1bc14494", BuiltinOverload::validateUuid);
}

static void assertSuccess(String input, Predicate<String> format) {
assertTrue(format.test(input));
}
Expand Down

0 comments on commit 3e5ecba

Please sign in to comment.