From 44cb423078ffbbd0f5767029bbb09f1689ccd2b3 Mon Sep 17 00:00:00 2001 From: Shanthoosh Venkataraman Date: Mon, 14 Oct 2019 16:52:31 -0700 Subject: [PATCH 1/3] Improve UDF discovery in the samza-sql. Replace ConfigBasedUDFResolver with the UDF resolver based on the reflections. --- build.gradle | 1 + .../samza/sql/client/impl/SamzaExecutor.java | 6 - .../sql/impl/ConfigBasedUdfResolver.java | 119 ------------------ .../samza/sql/interfaces/UdfMetadata.java | 17 ++- .../sql/runner/SamzaSqlApplicationConfig.java | 4 +- .../sql/udf/ReflectionBasedUdfResolver.java | 97 ++++++++++++++ .../runner/TestSamzaSqlApplicationConfig.java | 4 - .../impl/TestReflectionBasedUdfResolver.java | 60 +++++++++ .../samza/sql/udf/impl/TestSamzaSqlUdf.java | 40 ++++++ .../samza/sql/util/SamzaSqlTestConfig.java | 8 -- .../apache/samza/tools/SamzaSqlConsole.java | 6 - 11 files changed, 216 insertions(+), 146 deletions(-) delete mode 100644 samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedUdfResolver.java create mode 100644 samza-sql/src/main/java/org/apache/samza/sql/udf/ReflectionBasedUdfResolver.java create mode 100644 samza-sql/src/test/java/org/apache/samza/sql/udf/impl/TestReflectionBasedUdfResolver.java create mode 100644 samza-sql/src/test/java/org/apache/samza/sql/udf/impl/TestSamzaSqlUdf.java diff --git a/build.gradle b/build.gradle index 2c35918380..845b3160d0 100644 --- a/build.gradle +++ b/build.gradle @@ -334,6 +334,7 @@ project(":samza-sql_$scalaSuffix") { compile "org.apache.avro:avro:$avroVersion" compile "org.apache.calcite:calcite-core:$calciteVersion" compile "org.slf4j:slf4j-api:$slf4jVersion" + compile "org.reflections:reflections:0.9.10" testCompile "junit:junit:$junitVersion" testCompile "org.mockito:mockito-core:$mockitoVersion" diff --git a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/impl/SamzaExecutor.java b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/impl/SamzaExecutor.java index 5939adb0ee..e97a327392 100755 --- 
a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/impl/SamzaExecutor.java +++ b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/impl/SamzaExecutor.java @@ -41,7 +41,6 @@ import org.apache.samza.sql.fn.FlattenUdf; import org.apache.samza.sql.fn.RegexMatchUdf; import org.apache.samza.sql.impl.ConfigBasedIOResolverFactory; -import org.apache.samza.sql.impl.ConfigBasedUdfResolver; import org.apache.samza.sql.interfaces.RelSchemaProvider; import org.apache.samza.sql.interfaces.RelSchemaProviderFactory; import org.apache.samza.sql.interfaces.SqlIOConfig; @@ -328,11 +327,6 @@ Map fetchSamzaSqlConfig(int execId) { ConfigBasedIOResolverFactory.class.getName()); staticConfigs.put(SamzaSqlApplicationConfig.CFG_UDF_RESOLVER, "config"); - String configUdfResolverDomain = String.format(SamzaSqlApplicationConfig.CFG_FMT_UDF_RESOLVER_DOMAIN, "config"); - staticConfigs.put(configUdfResolverDomain + SamzaSqlApplicationConfig.CFG_FACTORY, - ConfigBasedUdfResolver.class.getName()); - staticConfigs.put(configUdfResolverDomain + ConfigBasedUdfResolver.CFG_UDF_CLASSES, - Joiner.on(",").join(RegexMatchUdf.class.getName(), FlattenUdf.class.getName())); staticConfigs.put("serializers.registry.string.class", StringSerdeFactory.class.getName()); staticConfigs.put("serializers.registry.avro.class", AvroSerDeFactory.class.getName()); diff --git a/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedUdfResolver.java b/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedUdfResolver.java deleted file mode 100644 index 2b83b60888..0000000000 --- a/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedUdfResolver.java +++ /dev/null @@ -1,119 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.samza.sql.impl; - -import java.lang.reflect.Method; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Properties; -import java.util.stream.Collectors; -import org.apache.commons.lang.StringUtils; -import org.apache.samza.SamzaException; -import org.apache.samza.config.Config; -import org.apache.samza.sql.interfaces.UdfMetadata; -import org.apache.samza.sql.interfaces.UdfResolver; -import org.apache.samza.sql.schema.SamzaSqlFieldType; -import org.apache.samza.sql.udfs.SamzaSqlUdf; -import org.apache.samza.sql.udfs.SamzaSqlUdfMethod; -import org.apache.samza.sql.udfs.ScalarUdf; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - - -/** - * Udf resolver that uses static config to return the UDFs present in the Samza SQL application - * All the UDF classes are provided to this factory as a comma separated list of values for the config named - * "udfClasses". 
- * This factory loads all the udf classes that are configured, performs the validation to ensure that they extend - * {@link ScalarUdf} and implement the method named "execute" - */ -public class ConfigBasedUdfResolver implements UdfResolver { - - private static final Logger LOG = LoggerFactory.getLogger(ConfigBasedUdfResolver.class); - public static final String CFG_UDF_CLASSES = "udfClasses"; - - private final ArrayList udfs; - - public ConfigBasedUdfResolver(Properties config, Config udfConfig) { - List udfClasses = Arrays.stream(config.getProperty(CFG_UDF_CLASSES, "").split(",")) - .filter(StringUtils::isNotBlank) - .collect(Collectors.toList()); - udfs = new ArrayList<>(); - Class udfClass; - for (String udfClassName : udfClasses) { - try { - udfClass = Class.forName(udfClassName); - } catch (ClassNotFoundException e) { - String msg = String.format("Couldn't load the udf class %s", udfClassName); - LOG.error(msg, e); - throw new SamzaException(msg, e); - } - - if (!ScalarUdf.class.isAssignableFrom(udfClass)) { - String msg = String.format("Udf class %s is not extended from %s", udfClassName, ScalarUdf.class.getName()); - LOG.error(msg); - throw new SamzaException(msg); - } - - SamzaSqlUdf sqlUdf; - Map udfMethods = new HashMap<>(); - SamzaSqlUdfMethod sqlUdfMethod = null; - - sqlUdf = udfClass.getAnnotation(SamzaSqlUdf.class); - Method[] methods = udfClass.getMethods(); - for (Method method : methods) { - sqlUdfMethod = method.getAnnotation(SamzaSqlUdfMethod.class); - if (sqlUdfMethod != null) { - udfMethods.put(sqlUdfMethod, method); - } - } - - if (sqlUdf == null) { - String msg = String.format("UdfClass %s is not annotated with SamzaSqlUdf", udfClass); - LOG.error(msg); - throw new SamzaException(msg); - } - - if (udfMethods.isEmpty()) { - String msg = String.format("UdfClass %s doesn't have any methods annotated with SamzaSqlUdfMethod", udfClass); - LOG.error(msg); - throw new SamzaException(msg); - } - - if (sqlUdf.enabled()) { - String udfName = 
sqlUdf.name(); - for (Map.Entry udfMethod : udfMethods.entrySet()) { - List params = Arrays.asList(udfMethod.getKey().params()); - udfs.add(new UdfMetadata(udfName, sqlUdf.description(), udfMethod.getValue(), udfConfig.subset(udfName + "."), params, - udfMethod.getKey().returns(), udfMethod.getKey().disableArgumentCheck())); - } - } - } - } - - @Override - public Collection getUdfs() { - return udfs; - } -} diff --git a/samza-sql/src/main/java/org/apache/samza/sql/interfaces/UdfMetadata.java b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/UdfMetadata.java index c06f60e5d1..c590453437 100644 --- a/samza-sql/src/main/java/org/apache/samza/sql/interfaces/UdfMetadata.java +++ b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/UdfMetadata.java @@ -20,8 +20,8 @@ package org.apache.samza.sql.interfaces; import java.lang.reflect.Method; - import java.util.List; +import com.google.common.base.Objects; import org.apache.samza.config.Config; import org.apache.samza.sql.schema.SamzaSqlFieldType; @@ -99,4 +99,19 @@ public boolean isDisableArgCheck() { return disableArgCheck; } + @Override + public int hashCode() { + return Objects.hashCode(name, udfMethod, arguments, returnType); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (!(o instanceof UdfMetadata)) return false; + UdfMetadata that = (UdfMetadata) o; + return Objects.equal(name, that.name) && + Objects.equal(udfMethod, that.udfMethod) && + Objects.equal(arguments, that.arguments) && + returnType == that.returnType; + } } diff --git a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java index 1b2ccc193b..546a75a11d 100644 --- a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java +++ b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java @@ -41,7 +41,7 @@ import 
org.apache.samza.config.MapConfig; import org.apache.samza.sql.dsl.SamzaSqlDslConverter; import org.apache.samza.sql.dsl.SamzaSqlDslConverterFactory; -import org.apache.samza.sql.impl.ConfigBasedUdfResolver; +import org.apache.samza.sql.udf.ReflectionBasedUdfResolver; import org.apache.samza.sql.interfaces.DslConverter; import org.apache.samza.sql.interfaces.DslConverterFactory; import org.apache.samza.sql.interfaces.RelSchemaProvider; @@ -214,7 +214,7 @@ private UdfResolver createUdfResolver(Map config) { Properties props = new Properties(); props.putAll(domainConfig); HashMap udfConfig = getDomainProperties(config, CFG_UDF_CONFIG_DOMAIN, false); - return new ConfigBasedUdfResolver(props, new MapConfig(udfConfig)); + return new ReflectionBasedUdfResolver(new MapConfig(udfConfig)); } private static HashMap getDomainProperties(Map props, String prefix, diff --git a/samza-sql/src/main/java/org/apache/samza/sql/udf/ReflectionBasedUdfResolver.java b/samza-sql/src/main/java/org/apache/samza/sql/udf/ReflectionBasedUdfResolver.java new file mode 100644 index 0000000000..2e63556582 --- /dev/null +++ b/samza-sql/src/main/java/org/apache/samza/sql/udf/ReflectionBasedUdfResolver.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.sql.udf; + +import java.lang.reflect.Method; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import org.apache.commons.lang3.reflect.MethodUtils; +import org.apache.samza.SamzaException; +import org.apache.samza.config.Config; +import org.apache.samza.sql.interfaces.UdfMetadata; +import org.apache.samza.sql.interfaces.UdfResolver; +import org.apache.samza.sql.schema.SamzaSqlFieldType; +import org.apache.samza.sql.udfs.SamzaSqlUdf; +import org.apache.samza.sql.udfs.SamzaSqlUdfMethod; +import org.reflections.Reflections; +import org.reflections.util.ConfigurationBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A UDF resolver implementation that uses reflection to discover the classes + * annotated with {@link SamzaSqlUdf} on the classpath. Performs the validation to + * ensure that every class annotated with {@link SamzaSqlUdf} has at least one + * method annotated with {@link SamzaSqlUdfMethod}. + */ +public class ReflectionBasedUdfResolver implements UdfResolver { + + private static final Logger LOG = LoggerFactory.getLogger(ReflectionBasedUdfResolver.class); + + private static final String CONFIG_PACKAGE_PREFIX = "samza.sql.udf.resolver.package.prefix"; + + private final Set udfs = new HashSet<>(); + + public ReflectionBasedUdfResolver(Config udfConfig) { + // Searching the entire classpath to discover the subtypes of SamzaSqlUdf is expensive. To reduce the search space, + // the search is limited to the set of package prefixes defined in the configuration. + // Within Linkedin this configuration will be overridden to ["com.linkedin.samza", "org.apache.samza", "com.linkedin.samza.sql.shade.prefix"]. + String samzaSqlUdfPackagePrefix = udfConfig.getOrDefault(CONFIG_PACKAGE_PREFIX, "org.apache.samza"); + + // 1. 
Build the reflections instance with appropriate configuration. + ConfigurationBuilder configurationBuilder = new ConfigurationBuilder(); + configurationBuilder.forPackages(samzaSqlUdfPackagePrefix.split(",")); + configurationBuilder.addClassLoader(Thread.currentThread().getContextClassLoader()); + Reflections reflections = new Reflections(configurationBuilder); + + // 2. Get all the types annotated with SamzaSqlUdf. + Set> typesAnnotatedWithSamzaSqlUdf = reflections.getTypesAnnotatedWith(SamzaSqlUdf.class); + + for (Class udfClass : typesAnnotatedWithSamzaSqlUdf) { + // 3. Get all the methods that are annotated with SamzaSqlUdfMethod + List methodsAnnotatedWithSamzaSqlMethod = MethodUtils.getMethodsListWithAnnotation(udfClass, SamzaSqlUdfMethod.class); + + if (methodsAnnotatedWithSamzaSqlMethod.isEmpty()) { + String msg = String.format("Udf class: %s doesn't have any methods annotated with: %s", udfClass.getName(), SamzaSqlUdfMethod.class.getName()); + LOG.error(msg); + throw new SamzaException(msg); + } + + SamzaSqlUdf sqlUdf = udfClass.getAnnotation(SamzaSqlUdf.class); + // 4. If the udf is enabled, then add the udf information of the methods to the udfs list. 
+ if (sqlUdf.enabled()) { + String udfName = sqlUdf.name(); + methodsAnnotatedWithSamzaSqlMethod.forEach(method -> { + SamzaSqlUdfMethod samzaSqlUdfMethod = method.getAnnotation(SamzaSqlUdfMethod.class); + List params = Arrays.asList(samzaSqlUdfMethod.params()); + udfs.add(new UdfMetadata(udfName, sqlUdf.description(), method, udfConfig.subset(udfName + "."), params, + samzaSqlUdfMethod.returns(), samzaSqlUdfMethod.disableArgumentCheck())); + }); + } + } + } + + @Override + public Collection getUdfs() { + return udfs; + } +} diff --git a/samza-sql/src/test/java/org/apache/samza/sql/runner/TestSamzaSqlApplicationConfig.java b/samza-sql/src/test/java/org/apache/samza/sql/runner/TestSamzaSqlApplicationConfig.java index 7db38f7e0e..725da9010d 100644 --- a/samza-sql/src/test/java/org/apache/samza/sql/runner/TestSamzaSqlApplicationConfig.java +++ b/samza-sql/src/test/java/org/apache/samza/sql/runner/TestSamzaSqlApplicationConfig.java @@ -28,7 +28,6 @@ import java.util.stream.Collectors; import org.apache.samza.SamzaException; import org.apache.samza.config.MapConfig; -import org.apache.samza.sql.impl.ConfigBasedUdfResolver; import org.apache.samza.sql.interfaces.SqlIOConfig; import org.apache.samza.sql.util.JsonUtil; import org.apache.samza.sql.util.SamzaSqlQueryParser; @@ -45,8 +44,6 @@ public class TestSamzaSqlApplicationConfig { public void testConfigInit() { Map config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(10); config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, "Insert into testavro.COMPLEX1 select * from testavro.SIMPLE1"); - String configUdfResolverDomain = String.format(SamzaSqlApplicationConfig.CFG_FMT_UDF_RESOLVER_DOMAIN, "config"); - int numUdfs = config.get(configUdfResolverDomain + ConfigBasedUdfResolver.CFG_UDF_CLASSES).split(",").length; List sqlStmts = fetchSqlFromConfig(config); List queryInfo = fetchQueryInfo(sqlStmts); @@ -55,7 +52,6 @@ public void testConfigInit() { .collect(Collectors.toList()), 
queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toList())); - Assert.assertEquals(numUdfs + 1, samzaSqlApplicationConfig.getUdfMetadata().size()); Assert.assertEquals(1, samzaSqlApplicationConfig.getInputSystemStreamConfigBySource().size()); Assert.assertEquals(1, samzaSqlApplicationConfig.getOutputSystemStreamConfigsBySource().size()); } diff --git a/samza-sql/src/test/java/org/apache/samza/sql/udf/impl/TestReflectionBasedUdfResolver.java b/samza-sql/src/test/java/org/apache/samza/sql/udf/impl/TestReflectionBasedUdfResolver.java new file mode 100644 index 0000000000..2a0223d578 --- /dev/null +++ b/samza-sql/src/test/java/org/apache/samza/sql/udf/impl/TestReflectionBasedUdfResolver.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.samza.sql.udf.impl; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import org.apache.samza.config.Config; +import org.apache.samza.config.MapConfig; +import org.apache.samza.sql.interfaces.UdfMetadata; +import org.apache.samza.sql.schema.SamzaSqlFieldType; +import org.apache.samza.sql.udf.ReflectionBasedUdfResolver; +import org.junit.Assert; +import org.junit.Test; + +import java.lang.reflect.Method; +import java.util.Collection; + +public class TestReflectionBasedUdfResolver { + + @Test + public void testShouldReturnNothingWhenNoUDFIsInPackagePrefix() { + Config config = new MapConfig(ImmutableMap.of("samza.sql.udf.resolver.package.prefix", "org.apache.samza.udf.blah.blah")); + ReflectionBasedUdfResolver reflectionBasedUdfResolver = new ReflectionBasedUdfResolver(config); + Collection udfMetadataList = reflectionBasedUdfResolver.getUdfs(); + + Assert.assertEquals(0, udfMetadataList.size()); + } + + @Test + public void testUDfResolverShouldReturnAllUDFInClassPath() throws NoSuchMethodException { + Config config = new MapConfig(ImmutableMap.of("samza.sql.udf.resolver.package.prefix", "org.apache.samza.sql.udf.impl")); + ReflectionBasedUdfResolver reflectionBasedUdfResolver = new ReflectionBasedUdfResolver(config); + Collection udfMetadataList = reflectionBasedUdfResolver.getUdfs(); + + Method method = TestSamzaSqlUdf.class.getMethod("execute", String.class); + UdfMetadata udfMetadata = new UdfMetadata("TESTSAMZASQLUDF", + "Test samza sql udf implementation", method, new MapConfig(), ImmutableList.of(SamzaSqlFieldType.STRING), + SamzaSqlFieldType.STRING, true); + + Assert.assertFalse(udfMetadataList.isEmpty()); + Assert.assertTrue(udfMetadataList.contains(udfMetadata)); + } +} diff --git a/samza-sql/src/test/java/org/apache/samza/sql/udf/impl/TestSamzaSqlUdf.java b/samza-sql/src/test/java/org/apache/samza/sql/udf/impl/TestSamzaSqlUdf.java new file mode 100644 index 0000000000..d28fde378b 
--- /dev/null +++ b/samza-sql/src/test/java/org/apache/samza/sql/udf/impl/TestSamzaSqlUdf.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.sql.udf.impl; + +import org.apache.samza.config.Config; +import org.apache.samza.context.Context; +import org.apache.samza.sql.schema.SamzaSqlFieldType; +import org.apache.samza.sql.udfs.SamzaSqlUdf; +import org.apache.samza.sql.udfs.SamzaSqlUdfMethod; +import org.apache.samza.sql.udfs.ScalarUdf; + +@SamzaSqlUdf(name = "TestSamzaSqlUdf", description = "Test samza sql udf implementation") +public class TestSamzaSqlUdf implements ScalarUdf { + + @Override + public void init(Config udfConfig, Context context) { + + } + + @SamzaSqlUdfMethod(params = {SamzaSqlFieldType.STRING}, returns = SamzaSqlFieldType.STRING) + public String execute(String fieldName) { + return "testResponse"; + } +} diff --git a/samza-sql/src/test/java/org/apache/samza/sql/util/SamzaSqlTestConfig.java b/samza-sql/src/test/java/org/apache/samza/sql/util/SamzaSqlTestConfig.java index b3bb1eef3f..a2b713c1f4 100644 --- a/samza-sql/src/test/java/org/apache/samza/sql/util/SamzaSqlTestConfig.java +++ 
b/samza-sql/src/test/java/org/apache/samza/sql/util/SamzaSqlTestConfig.java @@ -40,7 +40,6 @@ import org.apache.samza.sql.fn.GetNestedFieldUdf; import org.apache.samza.sql.fn.RegexMatchUdf; import org.apache.samza.sql.impl.ConfigBasedIOResolverFactory; -import org.apache.samza.sql.impl.ConfigBasedUdfResolver; import org.apache.samza.sql.interfaces.SqlIOConfig; import org.apache.samza.sql.runner.SamzaSqlApplicationConfig; import org.apache.samza.sql.system.TestAvroSystemFactory; @@ -95,13 +94,6 @@ public static Map fetchStaticConfigsWithFactories(Map fetchSamzaSqlConfig() { ConfigBasedIOResolverFactory.class.getName()); staticConfigs.put(SamzaSqlApplicationConfig.CFG_UDF_RESOLVER, "config"); - String configUdfResolverDomain = String.format(SamzaSqlApplicationConfig.CFG_FMT_UDF_RESOLVER_DOMAIN, "config"); - staticConfigs.put(configUdfResolverDomain + SamzaSqlApplicationConfig.CFG_FACTORY, - ConfigBasedUdfResolver.class.getName()); - staticConfigs.put(configUdfResolverDomain + ConfigBasedUdfResolver.CFG_UDF_CLASSES, - Joiner.on(",").join(RegexMatchUdf.class.getName(), FlattenUdf.class.getName())); staticConfigs.put("serializers.registry.string.class", StringSerdeFactory.class.getName()); staticConfigs.put("serializers.registry.avro.class", AvroSerDeFactory.class.getName()); From 26b5a2e93e838723905e732b97188a303483e7e6 Mon Sep 17 00:00:00 2001 From: Shanthoosh Venkataraman Date: Tue, 15 Oct 2019 19:12:34 -0700 Subject: [PATCH 2/3] Address review comments. 
--- .../sql/impl/ConfigBasedUdfResolver.java | 119 ++++++++++++++++++ .../sql/udf/ReflectionBasedUdfResolver.java | 1 - 2 files changed, 119 insertions(+), 1 deletion(-) create mode 100644 samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedUdfResolver.java diff --git a/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedUdfResolver.java b/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedUdfResolver.java new file mode 100644 index 0000000000..2b83b60888 --- /dev/null +++ b/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedUdfResolver.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.samza.sql.impl; + +import java.lang.reflect.Method; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.stream.Collectors; +import org.apache.commons.lang.StringUtils; +import org.apache.samza.SamzaException; +import org.apache.samza.config.Config; +import org.apache.samza.sql.interfaces.UdfMetadata; +import org.apache.samza.sql.interfaces.UdfResolver; +import org.apache.samza.sql.schema.SamzaSqlFieldType; +import org.apache.samza.sql.udfs.SamzaSqlUdf; +import org.apache.samza.sql.udfs.SamzaSqlUdfMethod; +import org.apache.samza.sql.udfs.ScalarUdf; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Udf resolver that uses static config to return the UDFs present in the Samza SQL application + * All the UDF classes are provided to this factory as a comma separated list of values for the config named + * "udfClasses". 
+ * This factory loads all the udf classes that are configured, performs the validation to ensure that they extend + * {@link ScalarUdf} and implement the method named "execute" + */ +public class ConfigBasedUdfResolver implements UdfResolver { + + private static final Logger LOG = LoggerFactory.getLogger(ConfigBasedUdfResolver.class); + public static final String CFG_UDF_CLASSES = "udfClasses"; + + private final ArrayList udfs; + + public ConfigBasedUdfResolver(Properties config, Config udfConfig) { + List udfClasses = Arrays.stream(config.getProperty(CFG_UDF_CLASSES, "").split(",")) + .filter(StringUtils::isNotBlank) + .collect(Collectors.toList()); + udfs = new ArrayList<>(); + Class udfClass; + for (String udfClassName : udfClasses) { + try { + udfClass = Class.forName(udfClassName); + } catch (ClassNotFoundException e) { + String msg = String.format("Couldn't load the udf class %s", udfClassName); + LOG.error(msg, e); + throw new SamzaException(msg, e); + } + + if (!ScalarUdf.class.isAssignableFrom(udfClass)) { + String msg = String.format("Udf class %s is not extended from %s", udfClassName, ScalarUdf.class.getName()); + LOG.error(msg); + throw new SamzaException(msg); + } + + SamzaSqlUdf sqlUdf; + Map udfMethods = new HashMap<>(); + SamzaSqlUdfMethod sqlUdfMethod = null; + + sqlUdf = udfClass.getAnnotation(SamzaSqlUdf.class); + Method[] methods = udfClass.getMethods(); + for (Method method : methods) { + sqlUdfMethod = method.getAnnotation(SamzaSqlUdfMethod.class); + if (sqlUdfMethod != null) { + udfMethods.put(sqlUdfMethod, method); + } + } + + if (sqlUdf == null) { + String msg = String.format("UdfClass %s is not annotated with SamzaSqlUdf", udfClass); + LOG.error(msg); + throw new SamzaException(msg); + } + + if (udfMethods.isEmpty()) { + String msg = String.format("UdfClass %s doesn't have any methods annotated with SamzaSqlUdfMethod", udfClass); + LOG.error(msg); + throw new SamzaException(msg); + } + + if (sqlUdf.enabled()) { + String udfName = 
sqlUdf.name(); + for (Map.Entry udfMethod : udfMethods.entrySet()) { + List params = Arrays.asList(udfMethod.getKey().params()); + udfs.add(new UdfMetadata(udfName, sqlUdf.description(), udfMethod.getValue(), udfConfig.subset(udfName + "."), params, + udfMethod.getKey().returns(), udfMethod.getKey().disableArgumentCheck())); + } + } + } + } + + @Override + public Collection getUdfs() { + return udfs; + } +} diff --git a/samza-sql/src/main/java/org/apache/samza/sql/udf/ReflectionBasedUdfResolver.java b/samza-sql/src/main/java/org/apache/samza/sql/udf/ReflectionBasedUdfResolver.java index 2e63556582..70f91601af 100644 --- a/samza-sql/src/main/java/org/apache/samza/sql/udf/ReflectionBasedUdfResolver.java +++ b/samza-sql/src/main/java/org/apache/samza/sql/udf/ReflectionBasedUdfResolver.java @@ -54,7 +54,6 @@ public class ReflectionBasedUdfResolver implements UdfResolver { public ReflectionBasedUdfResolver(Config udfConfig) { // Searching the entire classpath to discover the subtypes of SamzaSqlUdf is expensive. To reduce the search space, // the search is limited to the set of package prefixes defined in the configuration. - // Within Linkedin this configuration will be overridden to ["com.linkedin.samza", "org.apache.samza", "com.linkedin.samza.sql.shade.prefix"]. String samzaSqlUdfPackagePrefix = udfConfig.getOrDefault(CONFIG_PACKAGE_PREFIX, "org.apache.samza"); // 1. Build the reflections instance with appropriate configuration. From c8d45e72c4b89e321b41df2a27b07ef5d371d752 Mon Sep 17 00:00:00 2001 From: Shanthoosh Venkataraman Date: Wed, 16 Oct 2019 10:35:51 -0700 Subject: [PATCH 3/3] Add TODO for the follow-up ticket in the comments. 
--- .../org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java | 1 + 1 file changed, 1 insertion(+) diff --git a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java index 546a75a11d..3d4047ef75 100644 --- a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java +++ b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java @@ -214,6 +214,7 @@ private UdfResolver createUdfResolver(Map config) { Properties props = new Properties(); props.putAll(domainConfig); HashMap udfConfig = getDomainProperties(config, CFG_UDF_CONFIG_DOMAIN, false); + // TODO: SAMZA-2355: Make the UDFResolver pluggable. return new ReflectionBasedUdfResolver(new MapConfig(udfConfig)); }