diff --git a/pom.xml b/pom.xml index b32fcbec7ec..6112c74884b 100644 --- a/pom.xml +++ b/pom.xml @@ -107,6 +107,7 @@ 7.4.3-0 1.21 3.12.0 + 1.7 @@ -138,6 +139,11 @@ gson ${gson.version} + + commons-validator + commons-validator + ${commons.validator.version} + org.apache.commons commons-compress diff --git a/schema-rules/pom.xml b/schema-rules/pom.xml index 58f0c3290db..145ed82da0c 100644 --- a/schema-rules/pom.xml +++ b/schema-rules/pom.xml @@ -45,6 +45,10 @@ org.projectnessie.cel cel-tools + + commons-validator + commons-validator + com.ibm.jsonata4java JSONata4Java diff --git a/schema-rules/src/main/java/io/confluent/kafka/schemaregistry/rules/cel/CelExecutor.java b/schema-rules/src/main/java/io/confluent/kafka/schemaregistry/rules/cel/CelExecutor.java index 7faa84a2d06..3ffbebf99cb 100644 --- a/schema-rules/src/main/java/io/confluent/kafka/schemaregistry/rules/cel/CelExecutor.java +++ b/schema-rules/src/main/java/io/confluent/kafka/schemaregistry/rules/cel/CelExecutor.java @@ -39,6 +39,7 @@ import io.confluent.kafka.schemaregistry.rules.RuleException; import io.confluent.kafka.schemaregistry.rules.RuleExecutor; import io.confluent.kafka.schemaregistry.rules.cel.avro.AvroRegistry; +import io.confluent.kafka.schemaregistry.rules.cel.builtin.BuiltinLibrary; import java.io.IOException; import java.time.Instant; import java.time.ZonedDateTime; @@ -116,6 +117,7 @@ public Script load(RuleWithArgs ruleWithArgs) throws Exception { default: throw new IllegalArgumentException("Unsupported type " + ruleWithArgs.getType()); } + scriptBuilder = scriptBuilder.withLibraries(new BuiltinLibrary()); return scriptBuilder.build(); } }); diff --git a/schema-rules/src/main/java/io/confluent/kafka/schemaregistry/rules/cel/builtin/BuiltinDeclarations.java b/schema-rules/src/main/java/io/confluent/kafka/schemaregistry/rules/cel/builtin/BuiltinDeclarations.java new file mode 100644 index 00000000000..bf7065726cd --- /dev/null +++ b/schema-rules/src/main/java/io/confluent/kafka/schemaregistry/rules/cel/builtin/BuiltinDeclarations.java @@ -0,0 +1,74 @@ +/* + * Copyright 2023 Confluent Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.confluent.kafka.schemaregistry.rules.cel.builtin; + +import com.google.api.expr.v1alpha1.Decl; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import org.projectnessie.cel.checker.Decls; + +final class BuiltinDeclarations { + + static List create() { + List decls = new ArrayList<>(); + + decls.add( + Decls.newFunction( + "isEmail", + Decls.newInstanceOverload( + "is_email", Collections.singletonList(Decls.String), Decls.Bool))); + + decls.add( + Decls.newFunction( + "isHostname", + Decls.newInstanceOverload( + "is_hostname", Collections.singletonList(Decls.String), Decls.Bool))); + + decls.add( + Decls.newFunction( + "isIpv4", + Decls.newInstanceOverload( + "is_ipv4", Collections.singletonList(Decls.String), Decls.Bool))); + + decls.add( + Decls.newFunction( + "isIpv6", + Decls.newInstanceOverload( + "is_ipv6", Collections.singletonList(Decls.String), Decls.Bool))); + + decls.add( + Decls.newFunction( + "isUriRef", + Decls.newInstanceOverload( + "is_uri_ref", Collections.singletonList(Decls.String), Decls.Bool))); + + decls.add( + Decls.newFunction( + "isUri", + Decls.newInstanceOverload( + "is_uri", Collections.singletonList(Decls.String), Decls.Bool))); + + decls.add( + Decls.newFunction( + "isUuid", + Decls.newInstanceOverload( + "is_uuid", Collections.singletonList(Decls.String), Decls.Bool))); + + return Collections.unmodifiableList(decls); + } +} diff --git a/schema-rules/src/main/java/io/confluent/kafka/schemaregistry/rules/cel/builtin/BuiltinLibrary.java b/schema-rules/src/main/java/io/confluent/kafka/schemaregistry/rules/cel/builtin/BuiltinLibrary.java new file mode 100644 index 00000000000..eb3ab77b30b --- /dev/null +++ b/schema-rules/src/main/java/io/confluent/kafka/schemaregistry/rules/cel/builtin/BuiltinLibrary.java @@ -0,0 +1,40 @@ +/* + * Copyright 2023 Confluent Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.confluent.kafka.schemaregistry.rules.cel.builtin; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import org.projectnessie.cel.EnvOption; +import org.projectnessie.cel.EvalOption; +import org.projectnessie.cel.Library; +import org.projectnessie.cel.ProgramOption; + +public class BuiltinLibrary implements Library { + + @Override + public List getCompileOptions() { + return Collections.singletonList(EnvOption.declarations(BuiltinDeclarations.create())); + } + + @Override + public List getProgramOptions() { + return Arrays.asList( + ProgramOption.evalOptions(EvalOption.OptOptimize), + ProgramOption.functions(BuiltinOverload.create())); + } +} diff --git a/schema-rules/src/main/java/io/confluent/kafka/schemaregistry/rules/cel/builtin/BuiltinOverload.java b/schema-rules/src/main/java/io/confluent/kafka/schemaregistry/rules/cel/builtin/BuiltinOverload.java new file mode 100644 index 00000000000..a462cfbae65 --- /dev/null +++ b/schema-rules/src/main/java/io/confluent/kafka/schemaregistry/rules/cel/builtin/BuiltinOverload.java @@ -0,0 +1,179 @@ +/* + * Copyright 2023 Confluent Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.confluent.kafka.schemaregistry.rules.cel.builtin; + +import java.net.URI; +import java.net.URISyntaxException; +import java.util.UUID; +import org.apache.commons.validator.routines.DomainValidator; +import org.apache.commons.validator.routines.EmailValidator; +import org.apache.commons.validator.routines.InetAddressValidator; +import org.projectnessie.cel.common.types.BoolT; +import org.projectnessie.cel.common.types.Err; +import org.projectnessie.cel.common.types.Types; +import org.projectnessie.cel.common.types.ref.TypeEnum; +import org.projectnessie.cel.interpreter.functions.Overload; + +final class BuiltinOverload { + + private static final String OVERLOAD_IS_EMAIL = "isEmail"; + private static final String OVERLOAD_IS_HOSTNAME = "isHostname"; + private static final String OVERLOAD_IS_IPV4 = "isIpv4"; + private static final String OVERLOAD_IS_IPV6 = "isIpv6"; + private static final String OVERLOAD_IS_URI = "isUri"; + private static final String OVERLOAD_IS_URI_REF = "isUriRef"; + private static final String OVERLOAD_IS_UUID = "isUuid"; + + static Overload[] create() { + return new Overload[] { + isEmail(), + isHostname(), + isIpv4(), + isIpv6(), + isUri(), + isUriRef(), + isUuid(), + }; + } + + private static Overload isEmail() { + return Overload.unary( + OVERLOAD_IS_EMAIL, + value -> { + if (value.type().typeEnum() != TypeEnum.String) { + return Err.noSuchOverload(value, OVERLOAD_IS_EMAIL, null); + } + String input = (String) value.value(); + return input.isEmpty() ? BoolT.False : Types.boolOf(validateEmail(input)); + }); + } + + private static Overload isHostname() { + return Overload.unary( + OVERLOAD_IS_HOSTNAME, + value -> { + if (value.type().typeEnum() != TypeEnum.String) { + return Err.noSuchOverload(value, OVERLOAD_IS_HOSTNAME, null); + } + String input = (String) value.value(); + return input.isEmpty() ? BoolT.False : Types.boolOf(validateHostname(input)); + }); + } + + private static Overload isIpv4() { + return Overload.unary( + OVERLOAD_IS_IPV4, + value -> { + if (value.type().typeEnum() != TypeEnum.String) { + return Err.noSuchOverload(value, OVERLOAD_IS_IPV4, null); + } + String input = (String) value.value(); + return input.isEmpty() ? BoolT.False : Types.boolOf(validateIpv4(input)); + }); + } + + private static Overload isIpv6() { + return Overload.unary( + OVERLOAD_IS_IPV6, + value -> { + if (value.type().typeEnum() != TypeEnum.String) { + return Err.noSuchOverload(value, OVERLOAD_IS_IPV6, null); + } + String input = (String) value.value(); + return input.isEmpty() ? BoolT.False : Types.boolOf(validateIpv6(input)); + }); + } + + private static Overload isUri() { + return Overload.unary( + OVERLOAD_IS_URI, + value -> { + if (value.type().typeEnum() != TypeEnum.String) { + return Err.noSuchOverload(value, OVERLOAD_IS_URI, null); + } + String input = (String) value.value(); + return input.isEmpty() ? BoolT.False : Types.boolOf(validateUri(input)); + }); + } + + private static Overload isUriRef() { + return Overload.unary( + OVERLOAD_IS_URI_REF, + value -> { + if (value.type().typeEnum() != TypeEnum.String) { + return Err.noSuchOverload(value, OVERLOAD_IS_URI_REF, null); + } + String input = (String) value.value(); + return input.isEmpty() ? BoolT.False : Types.boolOf(validateUriRef(input)); + }); + } + + private static Overload isUuid() { + return Overload.unary( + OVERLOAD_IS_UUID, + value -> { + if (value.type().typeEnum() != TypeEnum.String) { + return Err.noSuchOverload(value, OVERLOAD_IS_UUID, null); + } + String input = (String) value.value(); + return input.isEmpty() ? BoolT.False : Types.boolOf(validateUuid(input)); + }); + } + + protected static boolean validateEmail(String input) { + return EmailValidator.getInstance(false, true).isValid(input); + } + + protected static boolean validateHostname(String input) { + return DomainValidator.getInstance(true).isValid(input) && !input.contains("_"); + } + + protected static boolean validateIpv4(String input) { + return InetAddressValidator.getInstance().isValidInet4Address(input); + } + + protected static boolean validateIpv6(String input) { + return InetAddressValidator.getInstance().isValidInet6Address(input); + } + + protected static boolean validateUri(String input) { + try { + URI uri = new URI(input); + return uri.isAbsolute(); + } catch (URISyntaxException e) { + return false; + } + } + + protected static boolean validateUriRef(String input) { + try { + new URI(input); + return true; + } catch (URISyntaxException e) { + return false; + } + } + + protected static boolean validateUuid(String input) { + try { + UUID.fromString(input); + return true; + } catch (IllegalArgumentException e) { + return false; + } + } +} \ No newline at end of file diff --git a/schema-rules/src/test/java/io/confluent/kafka/schemaregistry/rules/cel/builtin/BuiltinLibraryTest.java b/schema-rules/src/test/java/io/confluent/kafka/schemaregistry/rules/cel/builtin/BuiltinLibraryTest.java new file mode 100644 index 00000000000..3e39111652c --- /dev/null +++ b/schema-rules/src/test/java/io/confluent/kafka/schemaregistry/rules/cel/builtin/BuiltinLibraryTest.java @@ -0,0 +1,150 @@ +/* + * Copyright 2023 Confluent Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.confluent.kafka.schemaregistry.rules.cel.builtin; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.util.UUID; +import java.util.function.Predicate; +import org.junit.Test; + +public class BuiltinLibraryTest { + + private static final String THERE_IS_NO_PLACE_LIKE = "127.0.0.1"; + + private static final String IPV6_ADDR = "2001:db8:85a3:0:0:8a2e:370:7334"; + + @Test + public void emailFailure() { + assertFailure("a.@b.com", BuiltinOverload::validateEmail); + } + + @Test + public void emailSuccess() { + assertSuccess("a@b.com", BuiltinOverload::validateEmail); + } + + @Test + public void hostnameLengthFailure() { + StringBuilder sb = new StringBuilder(); + for (int i = 0; i < 256; ++i) { + sb.append('a'); + } + String subject = sb.toString(); + assertFailure(subject, BuiltinOverload::validateHostname); + } + + @Test + public void hostnameSuccess() { + assertSuccess("localhost", BuiltinOverload::validateHostname); + } + + @Test + public void hostnameWithUnderscoresFailure() { + assertFailure("not_a_valid_host_name", BuiltinOverload::validateHostname); + } + + @Test + public void ipv4Failure() { + assertFailure("asd", BuiltinOverload::validateIpv4); + } + + @Test + public void ipv4LengthFailure() { + assertFailure(IPV6_ADDR, BuiltinOverload::validateIpv4); + } + + @Test + public void ipv4Success() { + assertSuccess(THERE_IS_NO_PLACE_LIKE, BuiltinOverload::validateIpv4); + } + + @Test + public void ipv6Failure() { + assertFailure("asd", BuiltinOverload::validateIpv6); + } + + @Test + public void ipv6LengthFailure() { + assertFailure(THERE_IS_NO_PLACE_LIKE, BuiltinOverload::validateIpv6); + } + + @Test + public void ipv6Success() { + assertSuccess(IPV6_ADDR, BuiltinOverload::validateIpv6); + } + + @Test + public void uriFailure() { + assertFailure("12 34", BuiltinOverload::validateUri); + } + + @Test + public void relativeUriFails() { + assertFailure("//example.com", BuiltinOverload::validateUri); + } + + @Test + public void relativeUriRefFails() { + assertFailure("abc", BuiltinOverload::validateUri); + } + + @Test + public void uriSuccess() { + assertSuccess("http://example.org:8080/example.html", BuiltinOverload::validateUri); + } + + @Test + public void uriRefSuccess() { + assertSuccess("http://foo.bar/?baz=qux#quux", BuiltinOverload::validateUriRef); + } + + @Test + public void relativeUriRefSuccess() { + assertSuccess("//foo.bar/?baz=qux#quux", BuiltinOverload::validateUriRef); + } + + @Test + public void pathSuccess() { + assertSuccess("/abc", BuiltinOverload::validateUriRef); + } + + @Test + public void uuidSuccess() { + System.out.println(UUID.randomUUID()); + assertSuccess("fa02a430-892f-4160-97cd-6e3d1bc14494", BuiltinOverload::validateUuid); + } + + @Test + public void uuidFailure() { + assertFailure("97cd-6e3d1bc14494", BuiltinOverload::validateUuid); + } + + @Test + public void illegalCharFailure() { + assertFailure("\\\\WINDOWS\\fileshare", BuiltinOverload::validateUriRef); + } + + static void assertSuccess(String input, Predicate format) { + assertTrue(format.test(input)); + } + + static void assertFailure(String input, Predicate format) { + assertFalse(format.test(input)); + } +}