diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java index 1fb7650b1e8fb3..329fbc65104074 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java @@ -255,6 +255,10 @@ public static List getGoInstanceCmd(InstanceConfig instanceConfig, return args; } + public static boolean isJavaVersion9OrMore() { + return !System.getProperty("java.version").startsWith("1."); + } + public static List getCmd(InstanceConfig instanceConfig, String instanceFile, @@ -320,6 +324,11 @@ public static List getCmd(InstanceConfig instanceConfig, args.add("-Dio.netty.tryReflectionSetAccessible=true"); + // Needed for netty.DnsResolverUtil on JDK9+ + if (isJavaVersion9OrMore()) { + args.add("--add-opens java.base/sun.net=ALL-UNNAMED"); + } + if (instanceConfig.getAdditionalJavaRuntimeArguments() != null) { args.addAll(instanceConfig.getAdditionalJavaRuntimeArguments()); } diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/process/ProcessRuntimeTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/process/ProcessRuntimeTest.java index 301e38ee820867..92d8bcfcfa2018 100644 --- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/process/ProcessRuntimeTest.java +++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/process/ProcessRuntimeTest.java @@ -20,6 +20,7 @@ package org.apache.pulsar.functions.runtime.process; import static org.apache.pulsar.functions.runtime.RuntimeUtils.FUNCTIONS_INSTANCE_CLASSPATH; +import static org.mockito.ArgumentMatchers.any; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertTrue; @@ -43,6 +44,7 @@ import org.apache.pulsar.functions.proto.Function; import org.apache.pulsar.functions.proto.Function.ConsumerSpec; import org.apache.pulsar.functions.proto.Function.FunctionDetails; +import org.apache.pulsar.functions.runtime.RuntimeUtils; import org.apache.pulsar.functions.runtime.thread.ThreadRuntime; import org.apache.pulsar.functions.secretsprovider.ClearTextSecretsProvider; import org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator; @@ -50,6 +52,7 @@ import org.apache.pulsar.functions.worker.ConnectorsManager; import org.apache.pulsar.functions.worker.FunctionsManager; import org.apache.pulsar.functions.worker.WorkerConfig; +import org.mockito.MockedStatic; import org.mockito.Mockito; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeClass; @@ -283,14 +286,18 @@ private void verifyJavaInstance(InstanceConfig config, Path depsDir) throws Exce } private void verifyJavaInstance(InstanceConfig config, Path depsDir, String webServiceUrl) throws Exception { - ProcessRuntime container = factory.createContainer(config, userJarFile, null, 30l); - List args = container.getProcessArgs(); + List args; + try (MockedStatic runtimeUtils = Mockito.mockStatic(RuntimeUtils.class, Mockito.CALLS_REAL_METHODS)) { + runtimeUtils.when(RuntimeUtils::isJavaVersion9OrMore).thenReturn(true); + ProcessRuntime container = factory.createContainer(config, userJarFile, null, 30l); + args = container.getProcessArgs(); + } String classpath = javaInstanceJarFile; String extraDepsEnv; int portArg; int metricsPortArg; - int totalArgCount = 42; + int totalArgCount = 43; if (webServiceUrl != null && config.isExposePulsarAdminClientEnabled()) { totalArgCount += 3; } @@ -298,13 +305,13 @@ private void verifyJavaInstance(InstanceConfig config, Path depsDir, String webS assertEquals(args.size(), totalArgCount); extraDepsEnv = " -Dpulsar.functions.extra.dependencies.dir=" + depsDir; classpath = classpath + ":" + depsDir + "/*"; - portArg = 25; - metricsPortArg = 27; + portArg = 26; + metricsPortArg = 28; } else { assertEquals(args.size(), totalArgCount-1); extraDepsEnv = ""; - portArg = 24; - metricsPortArg = 26; + portArg = 25; + metricsPortArg = 27; } if (webServiceUrl != null && config.isExposePulsarAdminClientEnabled()) { portArg += 3; @@ -321,6 +328,7 @@ private void verifyJavaInstance(InstanceConfig config, Path depsDir, String webS + "-Dpulsar.function.log.dir=" + logDirectory + "/functions/" + FunctionCommon.getFullyQualifiedName(config.getFunctionDetails()) + " -Dpulsar.function.log.file=" + config.getFunctionDetails().getName() + "-" + config.getInstanceId() + " -Dio.netty.tryReflectionSetAccessible=true" + + " --add-opens java.base/sun.net=ALL-UNNAMED" + " org.apache.pulsar.functions.instance.JavaInstanceMain" + " --jar " + userJarFile + " --instance_id " + config.getInstanceId() + " --function_id " + config.getFunctionId()