diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java index 531f8adbb585f..4987a32c1cd85 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java @@ -64,7 +64,7 @@ import org.apache.pulsar.functions.proto.Function.SourceSpec; import org.apache.pulsar.functions.sink.PulsarSink; import org.apache.pulsar.functions.utils.Reflections; -import org.apache.pulsar.functions.worker.Utils; +import org.apache.pulsar.functions.utils.Utils; import org.apache.pulsar.functions.worker.WorkerConfig; import org.apache.pulsar.functions.worker.WorkerService; import org.apache.pulsar.functions.worker.rest.WorkerServer; diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java index 5861e22fa04bd..e44562aac0257 100644 --- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java +++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java @@ -32,6 +32,8 @@ import org.apache.pulsar.admin.cli.CmdFunctions.GetFunction; import org.apache.pulsar.admin.cli.CmdFunctions.ListFunctions; import org.apache.pulsar.admin.cli.CmdFunctions.UpdateFunction; +import org.apache.pulsar.admin.cli.CmdSinks.CreateSink; +import org.apache.pulsar.admin.cli.CmdSources.CreateSource; import org.apache.pulsar.client.admin.Functions; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.impl.conf.ClientConfigurationData; @@ -39,8 +41,10 @@ import org.apache.pulsar.functions.api.Function; import org.apache.pulsar.functions.api.utils.DefaultSerDe; import org.apache.pulsar.functions.proto.Function.FunctionDetails; +import org.apache.pulsar.functions.sink.PulsarSink; import org.apache.pulsar.functions.utils.Reflections; import org.apache.pulsar.functions.utils.Utils; +import org.apache.pulsar.io.core.RecordContext; import org.powermock.api.mockito.PowerMockito; import org.powermock.core.classloader.annotations.PowerMockIgnore; import org.powermock.core.classloader.annotations.PrepareForTest; @@ -72,6 +76,7 @@ import static org.mockito.Mockito.when; import static org.powermock.api.mockito.PowerMockito.mockStatic; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; /** * Unit test of {@link CmdFunctions}. @@ -91,6 +96,8 @@ public IObjectFactory getObjectFactory() { private PulsarAdmin admin; private Functions functions; private CmdFunctions cmd; + private CmdSinks cmdSinks; + private CmdSources cmdSources; public static class DummyFunction implements Function { @@ -103,7 +110,7 @@ public String process(String input, Context context) throws Exception { return null; } } - + private String generateCustomSerdeInputs(String topic, String serde) { Map map = new HashMap<>(); map.put(topic, serde); @@ -118,6 +125,8 @@ public void setup() throws Exception { when(admin.getServiceUrl()).thenReturn("http://localhost:1234"); when(admin.getClientConfigData()).thenReturn(new ClientConfigurationData()); this.cmd = new CmdFunctions(admin); + this.cmdSinks = new CmdSinks(admin); + this.cmdSources = new CmdSources(admin); // mock reflections mockStatic(Reflections.class); @@ -204,7 +213,122 @@ public void testCreateFunction() throws Exception { verify(functions, times(1)).createFunction(any(FunctionDetails.class), anyString()); } + + @Test + public void testCreateFunctionWithHttpUrl() throws Exception { + String fnName = TEST_NAME + "-function"; + String inputTopicName = TEST_NAME + "-input-topic"; + String outputTopicName = TEST_NAME + "-output-topic"; + + ConsoleOutputCapturer consoleOutputCapturer = new ConsoleOutputCapturer(); + consoleOutputCapturer.start(); + + final String url = "http://localhost:1234/test"; + cmd.run(new String[] { + "create", + "--name", fnName, + "--inputs", inputTopicName, + "--output", outputTopicName, + "--jar", url, + "--tenant", "sample", + "--namespace", "ns1", + "--className", DummyFunction.class.getName(), + }); + + CreateFunction creater = cmd.getCreater(); + + consoleOutputCapturer.stop(); + String output = consoleOutputCapturer.getStderr(); + + assertTrue(output.contains("Failed to download jar")); + assertEquals(fnName, creater.getFunctionName()); + assertEquals(inputTopicName, creater.getInputs()); + assertEquals(outputTopicName, creater.getOutput()); + } + + @Test + public void testCreateFunctionWithFileUrl() throws Exception { + String fnName = TEST_NAME + "-function"; + String inputTopicName = TEST_NAME + "-input-topic"; + String outputTopicName = TEST_NAME + "-output-topic"; + + final String url = "file:/usr/temp/myfile.jar"; + cmd.run(new String[] { + "create", + "--name", fnName, + "--inputs", inputTopicName, + "--output", outputTopicName, + "--jar", url, + "--tenant", "sample", + "--namespace", "ns1", + "--className", DummyFunction.class.getName(), + }); + + CreateFunction creater = cmd.getCreater(); + + assertEquals(fnName, creater.getFunctionName()); + assertEquals(inputTopicName, creater.getInputs()); + assertEquals(outputTopicName, creater.getOutput()); + verify(functions, times(1)).createFunctionWithUrl(any(FunctionDetails.class), anyString()); + } + + @Test + public void testCreateSink() throws Exception { + String fnName = TEST_NAME + "-function"; + String inputTopicName = TEST_NAME + "-input-topic"; + + + ConsoleOutputCapturer consoleOutputCapturer = new ConsoleOutputCapturer(); + consoleOutputCapturer.start(); + final String url = "http://localhost:1234/test"; + cmdSinks.run(new String[] { + "create", + "--name", fnName, + "--inputs", inputTopicName, + "--jar", url, + "--tenant", "sample", + "--namespace", "ns1", + "--className", "DummySink" + }); + + CreateSink creater = cmdSinks.getCreateSink(); + + consoleOutputCapturer.stop(); + String output = consoleOutputCapturer.getStderr(); + + assertTrue(output.contains("Failed to download jar")); + assertEquals("DummySink", creater.className); + assertEquals(url, creater.jarFile); + } + + @Test + public void testCreateSource() throws Exception { + String fnName = TEST_NAME + "-function"; + + ConsoleOutputCapturer consoleOutputCapturer = new ConsoleOutputCapturer(); + consoleOutputCapturer.start(); + + final String url = "http://localhost:1234/test"; + cmdSources.run(new String[] { + "create", + "--name", fnName, + "--jar", url, + "--tenant", "sample", + "--namespace", "ns1", + "--className", "DummySink" + }); + + CreateSource creater = cmdSources.getCreateSource(); + + consoleOutputCapturer.stop(); + String output = consoleOutputCapturer.getStderr(); + + assertTrue(output.contains("Failed to download jar")); + assertEquals("DummySink", creater.className); + assertEquals(url, creater.jarFile); + } + @Test public void testCreateFunctionWithTopicPatterns() throws Exception { String fnName = TEST_NAME + "-function"; @@ -505,7 +629,7 @@ private void testValidateFunctionsConfigs(String[] correctArgs, String[] incorre consoleOutputCapturer.stop(); String output = consoleOutputCapturer.getStderr(); - assertEquals(output.replace("\n", ""), errMessageCheck); + assertTrue(output.replace("\n", "").contains(errMessageCheck)); } } diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java index 1031cbac97ac0..67b1377f76b8e 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java @@ -26,6 +26,7 @@ import static org.apache.pulsar.common.naming.TopicName.PUBLIC_TENANT; import java.io.File; +import java.io.FileOutputStream; import java.io.IOException; import java.lang.reflect.Type; import java.net.MalformedURLException; @@ -47,11 +48,15 @@ import org.apache.bookkeeper.clients.config.StorageClientSettings; import org.apache.bookkeeper.clients.utils.NetUtils; import org.apache.commons.lang.StringUtils; +import static org.apache.commons.lang.StringUtils.isNotBlank; +import static org.apache.commons.lang3.StringUtils.isBlank; +import static org.apache.commons.lang3.StringUtils.isNotBlank; +import static org.apache.commons.lang.StringUtils.isBlank; import org.apache.pulsar.admin.cli.utils.CmdUtils; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.internal.FunctionsImpl; import org.apache.pulsar.client.api.PulsarClientException; -import org.apache.pulsar.client.impl.conf.ClientConfigurationData; +import org.apache.pulsar.functions.api.Function; import org.apache.pulsar.functions.instance.AuthenticationConfig; import org.apache.pulsar.functions.instance.InstanceConfig; import org.apache.pulsar.functions.proto.Function.FunctionDetails; @@ -66,6 +71,7 @@ import org.apache.pulsar.functions.utils.Utils; import org.apache.pulsar.functions.utils.WindowConfig; import org.apache.pulsar.functions.utils.validation.ConfigValidation; +import org.apache.pulsar.functions.utils.validation.ValidatorImpls.ImplementsClassesValidator; import org.apache.pulsar.functions.windowing.WindowFunctionExecutor; import org.apache.pulsar.functions.windowing.WindowUtils; @@ -80,6 +86,9 @@ import com.google.gson.reflect.TypeToken; import static org.apache.pulsar.functions.utils.Utils.fileExists; +import static org.apache.pulsar.functions.utils.Utils.getSinkType; +import static org.apache.pulsar.functions.worker.Utils.downloadFromHttpUrl; + import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufUtil; import io.netty.buffer.Unpooled; @@ -209,10 +218,7 @@ abstract class FunctionDetailsCommand extends BaseCommand { protected String functionName; @Parameter(names = "--className", description = "The function's class name") protected String className; - @Parameter( - names = "--jar", - description = "Path to the jar file for the function (if the function is written in Java)", - listConverter = StringConverter.class) + @Parameter(names = "--jar", description = "Path to the jar file for the function (if the function is written in Java). It also supports url-path [http/https/file (file protocol assumes that file already exists on worker host)] from which worker can download the package.", listConverter = StringConverter.class) protected String jarFile; @Parameter( names = "--py", @@ -379,7 +385,7 @@ void processArguments() throws Exception { if (null != pyFile) { functionConfig.setPy(pyFile); } - + if (functionConfig.getJar() != null) { userCodeFile = functionConfig.getJar(); } else if (functionConfig.getPy() != null) { @@ -392,30 +398,58 @@ void processArguments() throws Exception { protected void validateFunctionConfigs(FunctionConfig functionConfig) { - if (functionConfig.getJar() != null && functionConfig.getPy() != null) { + if (isNotBlank(functionConfig.getJar()) && isNotBlank(functionConfig.getPy())) { throw new ParameterException("Either a Java jar or a Python file needs to" + " be specified for the function. Cannot specify both."); } - if (functionConfig.getJar() == null && functionConfig.getPy() == null) { + if (isBlank(functionConfig.getJar()) && isBlank(functionConfig.getPy())) { throw new ParameterException("Either a Java jar or a Python file needs to" + " be specified for the function. Please specify one."); } - if (!fileExists(userCodeFile)) { - throw new ParameterException("File " + userCodeFile + " does not exist"); + boolean isJarPathUrl = isNotBlank(functionConfig.getJar()) && Utils.isFunctionPackageUrlSupported(functionConfig.getJar()); + String jarFilePath = null; + if (isJarPathUrl) { + if (functionConfig.getJar().startsWith(Utils.HTTP)) { + // download jar file if url is http or file is downloadable + File tempPkgFile = null; + try { + tempPkgFile = File.createTempFile(functionConfig.getName(), "function"); + downloadFromHttpUrl(functionConfig.getJar(), new FileOutputStream(tempPkgFile)); + jarFilePath = tempPkgFile.getAbsolutePath(); + } catch (Exception e) { + if (tempPkgFile != null) { + tempPkgFile.deleteOnExit(); + } + throw new ParameterException("Failed to download jar from " + functionConfig.getJar() + + ", due to =" + e.getMessage()); + } + } + } else { + if (!fileExists(userCodeFile)) { + throw new ParameterException("File " + userCodeFile + " does not exist"); + } + jarFilePath = userCodeFile; } if (functionConfig.getRuntime() == FunctionConfig.Runtime.JAVA) { - File file = new File(functionConfig.getJar()); - ClassLoader userJarLoader; - try { - userJarLoader = Reflections.loadJar(file); - } catch (MalformedURLException e) { - throw new ParameterException("Failed to load user jar " + file + " with error " + e.getMessage()); + + if (jarFilePath != null) { + File file = new File(jarFilePath); + ClassLoader userJarLoader; + try { + userJarLoader = Reflections.loadJar(file); + } catch (MalformedURLException e) { + throw new ParameterException( + "Failed to load user jar " + file + " with error " + e.getMessage()); + } + // make sure the function class loader is accessible thread-locally + Thread.currentThread().setContextClassLoader(userJarLoader); + + (new ImplementsClassesValidator(Function.class, java.util.function.Function.class)) + .validateField("className", functionConfig.getClassName()); } - // make sure the function class loader is accessible thread-locally - Thread.currentThread().setContextClassLoader(userJarLoader); } try { @@ -514,7 +548,6 @@ protected FunctionDetails convert(FunctionConfig functionConfig) Class[] typeArgs = null; if (functionConfig.getRuntime() == FunctionConfig.Runtime.JAVA) { - // Assuming any external jars are already loaded typeArgs = Utils.getFunctionTypes(functionConfig); } @@ -683,7 +716,12 @@ void runCmd() throws Exception { class CreateFunction extends FunctionDetailsCommand { @Override void runCmd() throws Exception { - admin.functions().createFunction(convert(functionConfig), userCodeFile); + if (Utils.isFunctionPackageUrlSupported(jarFile)) { + admin.functions().createFunctionWithUrl(convert(functionConfig), jarFile); + } else { + admin.functions().createFunction(convert(functionConfig), userCodeFile); + } + print("Created successfully"); } } diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java index fa8ace468c52c..83eb7f577fd44 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java @@ -25,6 +25,9 @@ import com.google.gson.Gson; import com.google.gson.reflect.TypeToken; import lombok.Getter; + +import static org.apache.commons.lang3.StringUtils.isBlank; + import org.apache.pulsar.admin.cli.utils.CmdUtils; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.internal.FunctionsImpl; @@ -40,8 +43,11 @@ import org.apache.pulsar.functions.utils.SinkConfig; import org.apache.pulsar.functions.utils.Utils; import org.apache.pulsar.functions.utils.validation.ConfigValidation; +import org.apache.pulsar.functions.utils.validation.ValidatorImpls.ImplementsClassValidator; +import org.apache.pulsar.io.core.Sink; import java.io.File; +import java.io.FileOutputStream; import java.io.IOException; import java.lang.reflect.Type; import java.net.MalformedURLException; @@ -55,6 +61,7 @@ import static org.apache.pulsar.functions.utils.Utils.convertProcessingGuarantee; import static org.apache.pulsar.functions.utils.Utils.fileExists; import static org.apache.pulsar.functions.utils.Utils.getSinkType; +import static org.apache.pulsar.functions.worker.Utils.downloadFromHttpUrl; @Getter @Parameters(commandDescription = "Interface for managing Pulsar Sinks (Egress data from Pulsar)") @@ -136,7 +143,11 @@ void runCmd() throws Exception { class CreateSink extends SinkCommand { @Override void runCmd() throws Exception { - admin.functions().createFunction(createSinkConfig(sinkConfig), jarFile); + if (Utils.isFunctionPackageUrlSupported(jarFile)) { + admin.functions().createFunctionWithUrl(createSinkConfig(sinkConfig), jarFile); + } else { + admin.functions().createFunction(createSinkConfig(sinkConfig), jarFile); + } print("Created successfully"); } } @@ -170,12 +181,9 @@ abstract class SinkCommand extends BaseCommand { protected FunctionConfig.ProcessingGuarantees processingGuarantees; @Parameter(names = "--parallelism", description = "The sink's parallelism factor (i.e. the number of sink instances to run)") protected Integer parallelism; - @Parameter( - names = "--jar", - description = "Path to the jar file for the sink", - listConverter = StringConverter.class) + @Parameter(names = "--jar", description = "Path to the jar file for the sink. It also supports url-path [http/https/file (file protocol assumes that file already exists on worker host)] from which worker can download the package.", listConverter = StringConverter.class) protected String jarFile; - + @Parameter(names = "--sinkConfigFile", description = "The path to a YAML config file specifying the " + "sink's configuration") protected String sinkConfigFile; @@ -244,7 +252,7 @@ void processArguments() throws Exception { if (null != jarFile) { sinkConfig.setJar(jarFile); } - + sinkConfig.setResources(new org.apache.pulsar.functions.utils.Resources(cpu, ram, disk)); if (null != sinkConfigString) { @@ -266,24 +274,54 @@ private void inferMissingArguments(SinkConfig sinkConfig) { } protected void validateSinkConfigs(SinkConfig sinkConfig) { - if (null == sinkConfig.getJar()) { + + if (isBlank(sinkConfig.getJar())) { throw new ParameterException("Sink jar not specfied"); } - - if (!fileExists(sinkConfig.getJar())) { - throw new ParameterException("Jar file " + sinkConfig.getJar() + " does not exist"); + + boolean isJarPathUrl = Utils.isFunctionPackageUrlSupported(sinkConfig.getJar()); + + String jarFilePath = null; + if (isJarPathUrl) { + // download jar file if url is http + if(sinkConfig.getJar().startsWith(Utils.HTTP)) { + File tempPkgFile = null; + try { + tempPkgFile = File.createTempFile(sinkConfig.getName(), "sink"); + downloadFromHttpUrl(sinkConfig.getJar(), new FileOutputStream(tempPkgFile)); + jarFilePath = tempPkgFile.getAbsolutePath(); + } catch(Exception e) { + if(tempPkgFile!=null ) { + tempPkgFile.deleteOnExit(); + } + throw new ParameterException("Failed to download jar from " + sinkConfig.getJar() + + ", due to =" + e.getMessage()); + } + } + } else { + jarFilePath = sinkConfig.getJar(); } + + // if jar file is present locally then load jar and validate SinkClass in it + if (jarFilePath != null) { + if (!fileExists(jarFilePath)) { + throw new ParameterException("Jar file " + jarFilePath + " does not exist"); + } - File file = new File(sinkConfig.getJar()); - ClassLoader userJarLoader; - try { - userJarLoader = Reflections.loadJar(file); - } catch (MalformedURLException e) { - throw new ParameterException("Failed to load user jar " + file + " with error " + e.getMessage()); - } - // make sure the function class loader is accessible thread-locally - Thread.currentThread().setContextClassLoader(userJarLoader); + File file = new File(jarFilePath); + ClassLoader userJarLoader; + try { + userJarLoader = Reflections.loadJar(file); + } catch (MalformedURLException e) { + throw new ParameterException("Failed to load user jar " + file + " with error " + e.getMessage()); + } + // make sure the function class loader is accessible thread-locally + Thread.currentThread().setContextClassLoader(userJarLoader); + // jar is already loaded, validate against Sink-Class name + (new ImplementsClassValidator(Sink.class)).validateField("className", sinkConfig.getClassName()); + } + try { // Need to load jar and set context class loader before calling ConfigValidation.validateConfig(sinkConfig, FunctionConfig.Runtime.JAVA.name()); @@ -306,7 +344,8 @@ protected FunctionDetails createSinkConfig(SinkConfig sinkConfig) { // check if configs are valid validateSinkConfigs(sinkConfig); - Class typeArg = getSinkType(sinkConfig.getClassName()); + String typeArg = sinkConfig.getJar().startsWith(Utils.FILE) ? null + : getSinkType(sinkConfig.getClassName()).getName(); FunctionDetails.Builder functionDetailsBuilder = FunctionDetails.newBuilder(); if (sinkConfig.getTenant() != null) { @@ -334,7 +373,9 @@ protected FunctionDetails createSinkConfig(SinkConfig sinkConfig) { if (sinkConfig.getTopicsPattern() != null) { sourceSpecBuilder.setTopicsPattern(sinkConfig.getTopicsPattern()); } - sourceSpecBuilder.setTypeClassName(typeArg.getName()); + if (typeArg != null) { + sourceSpecBuilder.setTypeClassName(typeArg); + } functionDetailsBuilder.setAutoAck(true); functionDetailsBuilder.setSource(sourceSpecBuilder); @@ -344,7 +385,9 @@ protected FunctionDetails createSinkConfig(SinkConfig sinkConfig) { if (sinkConfig.getConfigs() != null) { sinkSpecBuilder.setConfigs(new Gson().toJson(sinkConfig.getConfigs())); } - sinkSpecBuilder.setTypeClassName(typeArg.getName()); + if (typeArg != null) { + sinkSpecBuilder.setTypeClassName(typeArg); + } functionDetailsBuilder.setSink(sinkSpecBuilder); if (sinkConfig.getResources() != null) { diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java index c724b72f6c16d..32d9a6ca4c7f4 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java @@ -25,6 +25,8 @@ import com.google.gson.Gson; import com.google.gson.reflect.TypeToken; import lombok.Getter; + +import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.admin.cli.utils.CmdUtils; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.internal.FunctionsImpl; @@ -39,18 +41,26 @@ import org.apache.pulsar.functions.utils.SourceConfig; import org.apache.pulsar.functions.utils.Utils; import org.apache.pulsar.functions.utils.validation.ConfigValidation; +import org.apache.pulsar.functions.utils.validation.ValidatorImpls.ImplementsClassValidator; +import org.apache.pulsar.io.core.Sink; +import org.apache.pulsar.io.core.Source; import java.io.File; +import java.io.FileOutputStream; import java.io.IOException; import java.lang.reflect.Type; import java.net.MalformedURLException; import java.util.Map; +import static org.apache.commons.lang3.StringUtils.isBlank; +import static org.apache.commons.lang3.StringUtils.isNotBlank; import static org.apache.pulsar.common.naming.TopicName.DEFAULT_NAMESPACE; import static org.apache.pulsar.common.naming.TopicName.PUBLIC_TENANT; import static org.apache.pulsar.functions.utils.Utils.convertProcessingGuarantee; import static org.apache.pulsar.functions.utils.Utils.fileExists; +import static org.apache.pulsar.functions.utils.Utils.getSinkType; import static org.apache.pulsar.functions.utils.Utils.getSourceType; +import static org.apache.pulsar.functions.worker.Utils.downloadFromHttpUrl; @Getter @Parameters(commandDescription = "Interface for managing Pulsar Source (Ingress data to Pulsar)") @@ -163,10 +173,7 @@ abstract class SourceCommand extends BaseCommand { protected String deserializationClassName; @Parameter(names = "--parallelism", description = "The source's parallelism factor (i.e. the number of source instances to run)") protected Integer parallelism; - @Parameter( - names = "--jar", - description = "Path to the jar file for the Source", - listConverter = StringConverter.class) + @Parameter(names = "--jar", description = "Path to the jar file for the Source. It also supports url-path [http/https/file (file protocol assumes that file already exists on worker host)] from which worker can download the package.", listConverter = StringConverter.class) protected String jarFile; @Parameter(names = "--sourceConfigFile", description = "The path to a YAML config file specifying the " @@ -220,7 +227,7 @@ void processArguments() throws Exception { if (jarFile != null) { sourceConfig.setJar(jarFile); } - + sourceConfig.setResources(new org.apache.pulsar.functions.utils.Resources(cpu, ram, disk)); if (null != sourceConfigString) { @@ -242,26 +249,56 @@ private void inferMissingArguments(SourceConfig sourceConfig) { } protected void validateSourceConfigs(SourceConfig sourceConfig) { - if (null == sourceConfig.getJar()) { + if (StringUtils.isBlank(sourceConfig.getJar())) { throw new ParameterException("Source jar not specfied"); } - if (!fileExists(sourceConfig.getJar())) { - throw new ParameterException("Jar file " + sourceConfig.getJar() + " does not exist"); + boolean isJarPathUrl = Utils.isFunctionPackageUrlSupported(sourceConfig.getJar()); + + String jarFilePath = null; + if (isJarPathUrl) { + // download jar file if url is http + if(sourceConfig.getJar().startsWith(Utils.HTTP)) { + File tempPkgFile = null; + try { + tempPkgFile = File.createTempFile(sourceConfig.getName(), "source"); + downloadFromHttpUrl(sourceConfig.getJar(), new FileOutputStream(tempPkgFile)); + jarFilePath = tempPkgFile.getAbsolutePath(); + } catch(Exception e) { + if(tempPkgFile!=null ) { + tempPkgFile.deleteOnExit(); + } + throw new ParameterException("Failed to download jar from " + sourceConfig.getJar() + + ", due to =" + e.getMessage()); + } + } + } else { + jarFilePath = sourceConfig.getJar(); } + + + // if jar file is present locally then load jar and validate SinkClass in it + if (jarFilePath != null) { + if (!fileExists(jarFilePath)) { + throw new ParameterException("Jar file " + jarFilePath + " does not exist"); + } - File file = new File(jarFile); - ClassLoader userJarLoader; - try { - userJarLoader = Reflections.loadJar(file); - } catch (MalformedURLException e) { - throw new ParameterException("Failed to load user jar " + file + " with error " + e.getMessage()); + File file = new File(jarFilePath); + ClassLoader userJarLoader; + try { + userJarLoader = Reflections.loadJar(file); + } catch (MalformedURLException e) { + throw new ParameterException("Failed to load user jar " + file + " with error " + e.getMessage()); + } + // make sure the function class loader is accessible thread-locally + Thread.currentThread().setContextClassLoader(userJarLoader); + + // jar is already loaded, validate against Source-Class name + (new ImplementsClassValidator(Source.class)).validateField("className", sourceConfig.getClassName()); } - // make sure the function class loader is accessible thread-locally - Thread.currentThread().setContextClassLoader(userJarLoader); try { - // Need to load jar and set context class loader before calling + // Need to load jar and set context class loader before calling ConfigValidation.validateConfig(sourceConfig, FunctionConfig.Runtime.JAVA.name()); } catch (Exception e) { throw new ParameterException(e.getMessage()); @@ -281,7 +318,8 @@ protected FunctionDetails createSourceConfig(SourceConfig sourceConfig) { // check if source configs are valid validateSourceConfigs(sourceConfig); - Class typeArg = getSourceType(sourceConfig.getClassName()); + String typeArg = sourceConfig.getJar().startsWith(Utils.FILE) ? null + : getSourceType(sourceConfig.getClassName()).getName(); FunctionDetails.Builder functionDetailsBuilder = FunctionDetails.newBuilder(); if (sourceConfig.getTenant() != null) { @@ -308,7 +346,9 @@ protected FunctionDetails createSourceConfig(SourceConfig sourceConfig) { if (sourceConfig.getConfigs() != null) { sourceSpecBuilder.setConfigs(new Gson().toJson(sourceConfig.getConfigs())); } - sourceSpecBuilder.setTypeClassName(typeArg.getName()); + if (typeArg != null) { + sourceSpecBuilder.setTypeClassName(typeArg); + } functionDetailsBuilder.setSource(sourceSpecBuilder); // set up sink spec. @@ -318,7 +358,10 @@ protected FunctionDetails createSourceConfig(SourceConfig sourceConfig) { sinkSpecBuilder.setSerDeClassName(sourceConfig.getSerdeClassName()); } sinkSpecBuilder.setTopic(sourceConfig.getTopicName()); - sinkSpecBuilder.setTypeClassName(typeArg.getName()); + + if (typeArg != null) { + sinkSpecBuilder.setTypeClassName(typeArg); + } functionDetailsBuilder.setSink(sinkSpecBuilder); diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfig.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfig.java index 097d62694c822..2de7d57ebb8dc 100644 --- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfig.java +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfig.java @@ -25,14 +25,12 @@ import lombok.ToString; import org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.NotNull; import org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.isFileExists; -import org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.isImplementationOfClass; import org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.isMapEntryCustom; import org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.isPositiveNumber; import org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.isValidResources; import org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.isValidSinkConfig; import org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.isValidTopicName; import org.apache.pulsar.functions.utils.validation.ValidatorImpls; -import org.apache.pulsar.io.core.Sink; import java.util.HashMap; import java.util.Map; @@ -51,7 +49,6 @@ public class SinkConfig { @NotNull private String name; @NotNull - @isImplementationOfClass(implementsClass = Sink.class) private String className; @isMapEntryCustom(keyValidatorClasses = { ValidatorImpls.TopicNameValidator.class }, valueValidatorClasses = { ValidatorImpls.SerdeValidator.class }) diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfig.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfig.java index 295f339183a2c..8e4eb17638c02 100644 --- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfig.java +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfig.java @@ -50,7 +50,6 @@ public class SourceConfig { @NotNull private String name; @NotNull - @isImplementationOfClass(implementsClass = Source.class) private String className; @NotNull @isValidTopicName diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java index c5eba1f802f83..205f19d896bcb 100644 --- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java @@ -18,6 +18,8 @@ */ package org.apache.pulsar.functions.utils; +import static org.apache.commons.lang3.StringUtils.isNotBlank; + import java.io.File; import java.io.IOException; import java.lang.reflect.Constructor; @@ -51,6 +53,9 @@ @NoArgsConstructor(access = AccessLevel.PRIVATE) public class Utils { + public static String HTTP = "http"; + public static String FILE = "file"; + public static final long getSequenceId(MessageId messageId) { MessageIdImpl msgId = (MessageIdImpl) ((messageId instanceof TopicMessageIdImpl) ? ((TopicMessageIdImpl) messageId).getInnerMessageId() @@ -207,4 +212,9 @@ public static Class getSinkType(String className) { public static boolean fileExists(String file) { return new File(file).exists(); } + + public static boolean isFunctionPackageUrlSupported(String functionPkgUrl) { + return isNotBlank(functionPkgUrl) + && (functionPkgUrl.startsWith(Utils.HTTP) || functionPkgUrl.startsWith(Utils.FILE)); + } } diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ValidatorImpls.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ValidatorImpls.java index 360289ce34555..a63d56eee8f31 100644 --- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ValidatorImpls.java +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ValidatorImpls.java @@ -40,6 +40,7 @@ import java.util.HashSet; import java.util.Map; +import static org.apache.commons.lang3.StringUtils.isNotBlank; import static org.apache.pulsar.functions.utils.Utils.fileExists; import static org.apache.pulsar.functions.utils.Utils.getSinkType; import static org.apache.pulsar.functions.utils.Utils.getSourceType; @@ -218,6 +219,10 @@ public static class ImplementsClassesValidator extends Validator { public ImplementsClassesValidator(Map params) { this.classesImplements = (Class[]) params.get(ConfigValidationAnnotations.ValidatorParams.IMPLEMENTS_CLASSES); } + + public ImplementsClassesValidator(Class... classesImplements) { + this.classesImplements = classesImplements; + } @Override public void validateField(String name, Object o) { @@ -695,6 +700,11 @@ public static class SinkConfigValidator extends Validator { @Override public void validateField(String name, Object o) { SinkConfig sinkConfig = (SinkConfig) o; + // if function-pkg url is present eg: file://xyz.jar then admin-tool might not have access of the file at + // the same location so, need to rely on server side validation. + if (Utils.isFunctionPackageUrlSupported(sinkConfig.getJar())) { + return; + } Class typeArg = getSinkType(sinkConfig.getClassName()); ClassLoader clsLoader = Thread.currentThread().getContextClassLoader(); @@ -762,9 +772,14 @@ public void validateField(String name, Object o) { } new StringValidator().validateField(name, o); - if (!fileExists((String) o)) { - throw new IllegalArgumentException - (String.format("File %s specified in field '%s' does not exist", o, name)); + String path = (String) o; + + if(!Utils.isFunctionPackageUrlSupported(path)) { + // check file existence if path is not url and local path + if (!fileExists(path)) { + throw new IllegalArgumentException + (String.format("File %s specified in field '%s' does not exist", path, name)); + } } } } diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java index a03828b6f0d2d..ef519f4dce52c 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java @@ -18,31 +18,41 @@ */ package org.apache.pulsar.functions.worker; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.io.MoreFiles; -import com.google.common.io.RecursiveDeleteOption; +import static org.apache.pulsar.functions.utils.Utils.FILE; +import static org.apache.pulsar.functions.utils.Utils.HTTP; +import static org.apache.pulsar.functions.utils.Utils.isFunctionPackageUrlSupported; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; import java.io.IOException; +import java.net.URL; import java.nio.file.FileAlreadyExistsException; import java.nio.file.Files; import java.nio.file.Paths; +import java.util.UUID; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; -import lombok.*; -import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.apache.distributedlog.api.namespace.Namespace; +import org.apache.pulsar.functions.instance.InstanceConfig; import org.apache.pulsar.functions.proto.Function; import org.apache.pulsar.functions.proto.Function.FunctionMetaData; import org.apache.pulsar.functions.runtime.RuntimeFactory; -import org.apache.pulsar.functions.instance.InstanceConfig; import org.apache.pulsar.functions.runtime.RuntimeSpawner; import org.apache.pulsar.functions.utils.FunctionDetailsUtils; -import java.io.File; -import java.io.FileNotFoundException; -import java.io.FileOutputStream; -import java.util.UUID; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.io.MoreFiles; +import com.google.common.io.RecursiveDeleteOption; + +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.Setter; +import lombok.ToString; +import lombok.extern.slf4j.Slf4j; @Data @Setter @@ -113,10 +123,11 @@ protected void startFunction(FunctionRuntimeInfo functionRuntimeInfo) throws Exc File pkgFile = null; String pkgLocation = functionMetaData.getPackageLocation().getPackagePath(); - boolean isPkgUrlProvided = Utils.isFunctionPackageUrlSupported(pkgLocation); + boolean isPkgUrlProvided = isFunctionPackageUrlSupported(pkgLocation); - if(isPkgUrlProvided && pkgLocation.startsWith(Utils.FILE)) { - pkgFile = new File(pkgLocation); + if(isPkgUrlProvided && pkgLocation.startsWith(FILE)) { + URL url = new URL(pkgLocation); + pkgFile = new File(url.toURI()); } else { File pkgDir = new File( workerConfig.getDownloadDirectory(), @@ -164,7 +175,7 @@ private void downloadFile(File pkgFile, boolean isPkgUrlProvided, FunctionMetaDa } } String pkgLocationPath = functionMetaData.getPackageLocation().getPackagePath(); - boolean downloadFromHttp = isPkgUrlProvided && pkgLocationPath.startsWith(Utils.HTTP); + boolean downloadFromHttp = isPkgUrlProvided && pkgLocationPath.startsWith(HTTP); log.info("Function package file {} will be downloaded from {}", tempPkgFile, downloadFromHttp ? pkgLocationPath : functionMetaData.getPackageLocation()); diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Utils.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Utils.java index 576257b3d6647..ec99b001e06e4 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Utils.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Utils.java @@ -50,13 +50,11 @@ import org.apache.pulsar.functions.worker.dlog.DLOutputStream; import org.apache.zookeeper.KeeperException.Code; import org.apache.pulsar.functions.proto.Function; +import static org.apache.pulsar.functions.utils.Utils.FILE; @Slf4j public final class Utils { - public static String HTTP = "http"; - public static String FILE = "file"; - private Utils(){} public static Object getObject(byte[] byteArr) throws IOException, ClassNotFoundException { @@ -243,8 +241,4 @@ public static String getFullyQualifiedInstanceId(String tenant, String namespace return String.format("%s/%s/%s:%d", tenant, namespace, functionName, instanceId); } - public static boolean isFunctionPackageUrlSupported(String functionPkgUrl) { - return isNotBlank(functionPkgUrl) - && (functionPkgUrl.startsWith(Utils.HTTP) || functionPkgUrl.startsWith(Utils.FILE)); - } } diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java index 8ac57663b7efb..9d475b24cfbec 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java @@ -22,14 +22,16 @@ import com.google.gson.Gson; + import java.io.File; import java.io.FileInputStream; -import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.net.MalformedURLException; import java.net.URISyntaxException; import java.net.URL; +import java.net.URLClassLoader; import java.util.*; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; @@ -48,6 +50,7 @@ import javax.ws.rs.core.Response.Status; import javax.ws.rs.core.StreamingOutput; import lombok.extern.slf4j.Slf4j; +import net.jodah.typetools.TypeResolver; import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.StringUtils; @@ -63,13 +66,20 @@ import org.apache.pulsar.functions.proto.Function.PackageLocationMetaData; import org.apache.pulsar.functions.proto.InstanceCommunication; import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus; -import org.apache.pulsar.functions.worker.FunctionActioner; +import static org.apache.pulsar.functions.utils.Reflections.createInstance; +import org.apache.pulsar.functions.utils.functioncache.FunctionClassLoaders; +import static org.apache.pulsar.functions.utils.functioncache.FunctionClassLoaders.create; import org.apache.pulsar.functions.worker.FunctionMetaDataManager; import org.apache.pulsar.functions.worker.FunctionRuntimeManager; import org.apache.pulsar.functions.worker.MembershipManager; import org.apache.pulsar.functions.worker.Utils; +import static org.apache.pulsar.functions.utils.Utils.HTTP; +import static org.apache.pulsar.functions.utils.Utils.FILE; +import static org.apache.pulsar.functions.utils.Utils.isFunctionPackageUrlSupported; import org.apache.pulsar.functions.worker.WorkerService; import org.apache.pulsar.functions.worker.request.RequestResult; +import org.apache.pulsar.io.core.Sink; +import org.apache.pulsar.io.core.Source; import org.glassfish.jersey.media.multipart.FormDataContentDisposition; import org.glassfish.jersey.media.multipart.FormDataParam; @@ -122,8 +132,8 @@ public Response registerFunction(final @PathParam("tenant") String tenant, // validate parameters try { if(isPkgUrlProvided) { - functionDetails = validateUpdateRequestParams(tenant, namespace, functionName, - functionPkgUrl, functionDetailsJson); + functionDetails = validateUpdateRequestParamsWithPkgUrl(tenant, namespace, functionName, functionPkgUrl, + functionDetailsJson); }else { functionDetails = validateUpdateRequestParams(tenant, namespace, functionName, uploadedInputStream, fileDetail, functionDetailsJson); @@ -640,10 +650,10 @@ public Response downloadFunction(final @QueryParam("path") String path) { return Response.status(Status.OK).entity(new StreamingOutput() { @Override public void write(final OutputStream output) throws IOException { - if (path.startsWith(Utils.HTTP)) { + if (path.startsWith(HTTP)) { URL url = new URL(path); IOUtils.copy(url.openStream(), output); - } else if (path.startsWith(Utils.FILE)) { + } else if (path.startsWith(FILE)) { URL url = new URL(path); File file; try { @@ -710,14 +720,17 @@ private void validateDeregisterRequestParams(String tenant, } } - private FunctionDetails validateUpdateRequestParams(String tenant, String namespace, String functionName, + private FunctionDetails validateUpdateRequestParamsWithPkgUrl(String tenant, String namespace, String functionName, String functionPkgUrl, String functionDetailsJson) throws IllegalArgumentException, IOException, URISyntaxException { - if (!Utils.isFunctionPackageUrlSupported(functionPkgUrl)) { + if (!isFunctionPackageUrlSupported(functionPkgUrl)) { throw new IllegalArgumentException("Function Package url is not valid. supported url (http/https/file)"); } Utils.validateFileUrl(functionPkgUrl, workerServiceSupplier.get().getWorkerConfig().getDownloadDirectory()); - return validateUpdateRequestParams(tenant, namespace, functionName, functionDetailsJson); + File jarWithFileUrl = functionPkgUrl.startsWith(FILE) ? (new File((new URL(functionPkgUrl)).toURI())) : null; + FunctionDetails functionDetails = validateUpdateRequestParams(tenant, namespace, functionName, + functionDetailsJson, jarWithFileUrl); + return functionDetails; } private FunctionDetails validateUpdateRequestParams(String tenant, @@ -729,13 +742,14 @@ private FunctionDetails validateUpdateRequestParams(String tenant, if (uploadedInputStream == null || fileDetail == null) { throw new IllegalArgumentException("Function Package is not provided"); } - return validateUpdateRequestParams(tenant, namespace, functionName, functionDetailsJson); + return validateUpdateRequestParams(tenant, namespace, functionName, functionDetailsJson, null); } private FunctionDetails validateUpdateRequestParams(String tenant, String namespace, String functionName, - String functionDetailsJson) throws IllegalArgumentException { + String functionDetailsJson, + File jarWithFileUrl) throws IllegalArgumentException { if (tenant == null) { throw new IllegalArgumentException("Tenant is not provided"); } @@ -752,6 +766,7 @@ private FunctionDetails validateUpdateRequestParams(String tenant, try { FunctionDetails.Builder functionDetailsBuilder = FunctionDetails.newBuilder(); org.apache.pulsar.functions.utils.Utils.mergeJson(functionDetailsJson, functionDetailsBuilder); + validateFunctionClassTypes(jarWithFileUrl, functionDetailsBuilder); FunctionDetails functionDetails = functionDetailsBuilder.build(); List missingFields = new LinkedList<>(); @@ -790,6 +805,85 @@ private FunctionDetails validateUpdateRequestParams(String tenant, } } + private void validateFunctionClassTypes(File jarFile, FunctionDetails.Builder functionDetailsBuilder) + throws MalformedURLException { + + // validate only if jar-file is provided + if(jarFile == null) { + return; + } + + if (StringUtils.isBlank(functionDetailsBuilder.getClassName())) { + throw new IllegalArgumentException("function class-name can't be empty"); + } + + URL[] urls = new URL[1]; + urls[0] = jarFile.toURI().toURL(); + URLClassLoader classLoader = create(urls, FunctionClassLoaders.class.getClassLoader()); + + // validate function class-type + Object functionObject = createInstance(functionDetailsBuilder.getClassName(), classLoader); + if (!(functionObject instanceof org.apache.pulsar.functions.api.Function) && !(functionObject instanceof java.util.function.Function)) { + throw new RuntimeException("User class must either be Function or java.util.Function"); + } + + if (functionDetailsBuilder.hasSource() && functionDetailsBuilder.getSource() != null + && StringUtils.isNotBlank(functionDetailsBuilder.getSource().getClassName())) { + try { + String sourceClassName = functionDetailsBuilder.getSource().getClassName(); + String argClassName = getTypeArg(sourceClassName, Source.class, classLoader).getName(); + functionDetailsBuilder.setSource(functionDetailsBuilder.getSourceBuilder() + .setTypeClassName(argClassName)); + + // if sink-class not present then set same arg as source + if (!functionDetailsBuilder.hasSink() + || StringUtils.isBlank(functionDetailsBuilder.getSink().getClassName())) { + functionDetailsBuilder + .setSink(functionDetailsBuilder.getSinkBuilder().setTypeClassName(argClassName)); + } + + } catch (IllegalArgumentException ie) { + throw ie; + } catch (Exception e) { + log.error("Failed to validate source class", e); + throw new IllegalArgumentException("Failed to validate source class-name", e); + } + } + + if (functionDetailsBuilder.hasSink() && functionDetailsBuilder.getSink() != null + && StringUtils.isNotBlank(functionDetailsBuilder.getSink().getClassName())) { + try { + String sinkClassName = functionDetailsBuilder.getSink().getClassName(); + String argClassName = getTypeArg(sinkClassName, Sink.class, classLoader).getName(); + functionDetailsBuilder.setSink(functionDetailsBuilder.getSinkBuilder() + .setTypeClassName(argClassName)); + + // if source-class not present then set same arg as sink + if (!functionDetailsBuilder.hasSource() + || StringUtils.isBlank(functionDetailsBuilder.getSource().getClassName())) { + functionDetailsBuilder + .setSource(functionDetailsBuilder.getSourceBuilder().setTypeClassName(argClassName)); + } + + } catch (IllegalArgumentException ie) { + throw ie; + } catch (Exception e) { + log.error("Failed to validate sink class", e); + throw new IllegalArgumentException("Failed to validate sink class-name", e); + } + } + } + + private Class getTypeArg(String className, Class funClass, URLClassLoader classLoader) + throws ClassNotFoundException { + Class loadedClass = classLoader.loadClass(className); + if (!funClass.isAssignableFrom(loadedClass)) { + throw new IllegalArgumentException( + String.format("class %s is not type of %s", className, funClass.getName())); + } + return TypeResolver.resolveRawArgument(funClass, loadedClass); + } + private void validateTriggerRequestParams(String tenant, String namespace, String functionName, diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionActionerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionActionerTest.java index 1dfe0fdd9a40c..a1bcd4a4243f8 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionActionerTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionActionerTest.java @@ -38,6 +38,7 @@ import org.apache.pulsar.functions.runtime.RuntimeFactory; import org.apache.pulsar.functions.runtime.ThreadRuntimeFactory; import org.testng.annotations.Test; +import static org.apache.pulsar.functions.utils.Utils.FILE; /** * Unit test of {@link FunctionActioner}. @@ -113,7 +114,7 @@ public void testStartFunctionWithPkgUrl() throws Exception { // (1) test with file url. functionActioner should be able to consider file-url and it should be able to call // RuntimeSpawner - String pkgPathLocation = Utils.FILE + ":/user/my-file.jar"; + String pkgPathLocation = FILE + ":/user/my-file.jar"; Function.FunctionMetaData function1 = Function.FunctionMetaData.newBuilder() .setFunctionDetails(Function.FunctionDetails.newBuilder().setTenant("test-tenant") .setNamespace("test-namespace").setName("func-1")) diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java index 16161e023d973..49dede52655c5 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java @@ -65,6 +65,8 @@ import org.apache.pulsar.functions.worker.WorkerService; import org.apache.pulsar.functions.worker.request.RequestResult; import org.apache.pulsar.functions.worker.rest.api.FunctionsImpl; +import org.apache.pulsar.io.core.RecordContext; +import org.apache.pulsar.io.core.Sink; import org.glassfish.jersey.media.multipart.FormDataContentDisposition; import org.powermock.core.classloader.annotations.PowerMockIgnore; import org.powermock.core.classloader.annotations.PrepareForTest; @@ -95,6 +97,22 @@ public String process(String input, Context context) throws Exception { return input; } } + + public static final class TestSink implements Sink { + + @Override + public void close() throws Exception { + } + + @Override + public void open(Map config) throws Exception { + } + + @Override + public void write(RecordContext inputRecordContext, byte[] value) throws Exception { + } + } + private static final String tenant = "test-tenant"; private static final String namespace = "test-namespace"; @@ -964,4 +982,54 @@ public void testDownloadFunctionFile() throws Exception { pkgFile.delete(); } } + + @Test + public void testRegisterFunctionFileUrlWithValidSinkClass() throws IOException { + Configurator.setRootLevel(Level.DEBUG); + + String fileLocation = FutureUtil.class.getProtectionDomain().getCodeSource().getLocation().getPath(); + String filePackageUrl = "file://" + fileLocation; + when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(false); + + RequestResult rr = new RequestResult().setSuccess(true).setMessage("function registered"); + CompletableFuture requestResult = CompletableFuture.completedFuture(rr); + when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult); + + SinkSpec sinkSpec = SinkSpec.newBuilder().setClassName(TestSink.class.getName()).setTopic(outputTopic) + .setSerDeClassName(outputSerdeClassName).build(); + FunctionDetails functionDetails = FunctionDetails + .newBuilder().setTenant(tenant).setNamespace(namespace).setName(function).setSink(sinkSpec) + .setClassName(className).setParallelism(parallelism).setSource(SourceSpec.newBuilder() + .setSubscriptionType(subscriptionType).putAllTopicsToSerDeClassName(topicsToSerDeClassName)) + .build(); + Response response = resource.registerFunction(tenant, namespace, function, null, null, filePackageUrl, + org.apache.pulsar.functions.utils.Utils.printJson(functionDetails)); + + assertEquals(Status.OK.getStatusCode(), response.getStatus()); + } + + @Test + public void testRegisterFunctionFileUrlWithInValidSinkClass() throws IOException { + Configurator.setRootLevel(Level.DEBUG); + + String fileLocation = FutureUtil.class.getProtectionDomain().getCodeSource().getLocation().getPath(); + String filePackageUrl = "file://" + fileLocation; + when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(false); + + RequestResult rr = new RequestResult().setSuccess(true).setMessage("function registered"); + CompletableFuture requestResult = CompletableFuture.completedFuture(rr); + when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult); + + SinkSpec sinkSpec = SinkSpec.newBuilder().setClassName(className).setTopic(outputTopic) + .setSerDeClassName(outputSerdeClassName).build(); + FunctionDetails functionDetails = FunctionDetails + .newBuilder().setTenant(tenant).setNamespace(namespace).setName(function).setSink(sinkSpec) + .setClassName(className).setParallelism(parallelism).setSource(SourceSpec.newBuilder() + .setSubscriptionType(subscriptionType).putAllTopicsToSerDeClassName(topicsToSerDeClassName)) + .build(); + Response response = resource.registerFunction(tenant, namespace, function, null, null, filePackageUrl, + org.apache.pulsar.functions.utils.Utils.printJson(functionDetails)); + + assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus()); + } }