From 07556c2f00a479ac44bed5bdeccc0e33f129b185 Mon Sep 17 00:00:00 2001 From: Jerry Peng Date: Tue, 2 Feb 2021 12:59:58 -0800 Subject: [PATCH] Allow memory limit to be set for the pulsar client used in the ThreadRuntime in Pulsar Functions --- conf/functions_worker.yml | 6 + .../functions/instance/InstanceUtils.java | 11 ++ .../runtime/thread/ThreadRuntimeFactory.java | 68 +++++--- .../thread/ThreadRuntimeFactoryConfig.java | 23 +++ .../thread/ThreadRuntimeFactoryTest.java | 152 ++++++++++++++++++ .../worker/SchedulerManagerTest.java | 24 ++- 6 files changed, 247 insertions(+), 37 deletions(-) create mode 100644 pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactoryTest.java diff --git a/conf/functions_worker.yml b/conf/functions_worker.yml index 4aec9e5422628..c2fd3b8db6dc6 100644 --- a/conf/functions_worker.yml +++ b/conf/functions_worker.yml @@ -133,6 +133,12 @@ functionRuntimeFactoryConfigs: #functionRuntimeFactoryConfigs: # # thread group name # threadGroupName: "Thread Function Container Group" +## Set the pulsar client memory limit +#pulsarClientMemoryLimit +# # the max memory in bytes the pulsar client can use +# absoluteValue: +# # the max memory the pulsar client can use as a percentage of max direct memory set for JVM +# percentOfMaxDirectMemory: #### Kubernetes Runtime #### # Pulsar function are deployed to Kubernetes diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceUtils.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceUtils.java index 9c16bba05fe27..a7f88ce4f10c4 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceUtils.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceUtils.java @@ -32,6 +32,7 @@ import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.SizeUnit; import org.apache.pulsar.common.functions.AuthenticationConfig; import org.apache.pulsar.functions.api.SerDe; import org.apache.pulsar.functions.proto.Function; @@ -45,6 +46,7 @@ import java.net.UnknownHostException; import java.util.HashMap; import java.util.Map; +import java.util.Optional; @Slf4j @UtilityClass @@ -156,6 +158,12 @@ public static Map getProperties(Function.FunctionDetails.Compone } public static PulsarClient createPulsarClient(String pulsarServiceUrl, AuthenticationConfig authConfig) throws PulsarClientException { + return createPulsarClient(pulsarServiceUrl, authConfig, Optional.empty()); + } + + public static PulsarClient createPulsarClient(String pulsarServiceUrl, + AuthenticationConfig authConfig, + Optional memoryLimit) throws PulsarClientException { ClientBuilder clientBuilder = null; if (isNotBlank(pulsarServiceUrl)) { clientBuilder = PulsarClient.builder().serviceUrl(pulsarServiceUrl); @@ -170,6 +178,9 @@ && isNotBlank(authConfig.getClientAuthenticationParameters())) { clientBuilder.enableTlsHostnameVerification(authConfig.isTlsHostnameVerificationEnable()); clientBuilder.tlsTrustCertsFilePath(authConfig.getTlsTrustCertsFilePath()); } + if (memoryLimit.isPresent()) { + clientBuilder.memoryLimit(memoryLimit.get(), SizeUnit.BYTES); + } clientBuilder.ioThreads(Runtime.getRuntime().availableProcessors()); return clientBuilder.build(); } diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactory.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactory.java index b34d7a85d6240..f5e3736c01405 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactory.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactory.java @@ -19,7 +19,8 @@ package org.apache.pulsar.functions.runtime.thread; -import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import io.netty.util.internal.PlatformDependent; import io.prometheus.client.CollectorRegistry; import lombok.Getter; import lombok.NoArgsConstructor; @@ -73,24 +74,15 @@ public ThreadRuntimeFactory(String threadGroupName, String pulsarServiceUrl, Str CollectorRegistry collectorRegistry, String narExtractionDirectory, ClassLoader rootClassLoader, boolean exposePulsarAdminClientEnabled, String pulsarWebServiceUrl) throws Exception { - initialize(threadGroupName, InstanceUtils.createPulsarClient(pulsarServiceUrl, authConfig), - storageServiceUrl, null, secretsProvider, collectorRegistry, narExtractionDirectory, rootClassLoader, - exposePulsarAdminClientEnabled ? InstanceUtils.createPulsarAdminClient(pulsarWebServiceUrl, authConfig) : null); + initialize(threadGroupName, Optional.empty(), pulsarServiceUrl, authConfig, + storageServiceUrl, null, secretsProvider, collectorRegistry, narExtractionDirectory, + rootClassLoader, exposePulsarAdminClientEnabled, pulsarWebServiceUrl); } - @VisibleForTesting - public ThreadRuntimeFactory(String threadGroupName, PulsarClient pulsarClient, String storageServiceUrl, - SecretsProvider secretsProvider, CollectorRegistry collectorRegistry, - String narExtractionDirectory, ClassLoader rootClassLoader, PulsarAdmin pulsarAdmin) { - - initialize(threadGroupName, pulsarClient, storageServiceUrl, - null, secretsProvider, collectorRegistry, narExtractionDirectory, rootClassLoader, pulsarAdmin); - } - - private void initialize(String threadGroupName, PulsarClient pulsarClient, String storageServiceUrl, + private void initialize(String threadGroupName, Optional memoryLimit, String pulsarServiceUrl, AuthenticationConfig authConfig, String storageServiceUrl, SecretsProviderConfigurator secretsProviderConfigurator, SecretsProvider secretsProvider, - CollectorRegistry collectorRegistry, String narExtractionDirectory, ClassLoader rootClassLoader, - PulsarAdmin pulsarAdmin) { + CollectorRegistry collectorRegistry, String narExtractionDirectory, + ClassLoader rootClassLoader, boolean exposePulsarAdminClientEnabled, String pulsarWebServiceUrl) throws PulsarClientException { if (rootClassLoader == null) { rootClassLoader = Thread.currentThread().getContextClassLoader(); } @@ -100,13 +92,45 @@ private void initialize(String threadGroupName, PulsarClient pulsarClient, Strin this.defaultSecretsProvider = secretsProvider; this.fnCache = new FunctionCacheManagerImpl(rootClassLoader); this.threadGroup = new ThreadGroup(threadGroupName); - this.pulsarClient = pulsarClient; - this.pulsarAdmin = pulsarAdmin; + this.pulsarAdmin = exposePulsarAdminClientEnabled ? InstanceUtils.createPulsarAdminClient(pulsarWebServiceUrl, authConfig) : null; + this.pulsarClient = InstanceUtils.createPulsarClient(pulsarServiceUrl, authConfig, calculateClientMemoryLimit(memoryLimit)); this.storageServiceUrl = storageServiceUrl; this.collectorRegistry = collectorRegistry; this.narExtractionDirectory = narExtractionDirectory; } + private Optional calculateClientMemoryLimit(Optional memoryLimit) { + if (memoryLimit.isPresent()) { + + Long absolute = memoryLimit.get().getAbsoluteValue(); + Double percentOfDirectMem = memoryLimit.get().getPercentOfMaxDirectMemory(); + if (absolute != null) { + Preconditions.checkArgument(absolute > 0, "Absolute memory limit for Pulsar client has to be positive"); + } + if (percentOfDirectMem != null) { + Preconditions.checkArgument(percentOfDirectMem > 0 && percentOfDirectMem <= 100, "Percent of max direct memory limit for Pulsar client must be between 0 and 100"); + } + + if (absolute != null && percentOfDirectMem != null) { + return Optional.of(Math.min(absolute, getBytesPercentDirectMem(percentOfDirectMem))); + } + + if (absolute != null) { + return Optional.of(absolute); + } + + if (percentOfDirectMem != null) { + return Optional.of(getBytesPercentDirectMem(percentOfDirectMem)); + } + } + return Optional.empty(); + } + + private long getBytesPercentDirectMem(double percent) { + return (long) (PlatformDependent.maxDirectMemory() * (percent / 100)); + } + + @Override public void initialize(WorkerConfig workerConfig, AuthenticationConfig authenticationConfig, SecretsProviderConfigurator secretsProviderConfigurator, @@ -115,13 +139,11 @@ public void initialize(WorkerConfig workerConfig, AuthenticationConfig authentic ThreadRuntimeFactoryConfig factoryConfig = RuntimeUtils.getRuntimeFunctionConfig( workerConfig.getFunctionRuntimeFactoryConfigs(), ThreadRuntimeFactoryConfig.class); - initialize(factoryConfig.getThreadGroupName(), - InstanceUtils.createPulsarClient(workerConfig.getPulsarServiceUrl(), authenticationConfig), + initialize(factoryConfig.getThreadGroupName(), Optional.ofNullable(factoryConfig.getPulsarClientMemoryLimit()), + workerConfig.getPulsarServiceUrl(), authenticationConfig, workerConfig.getStateStorageServiceUrl(), secretsProviderConfigurator, null, null, workerConfig.getNarExtractionDirectory(), null, - workerConfig.isExposeAdminClientEnabled() ? - InstanceUtils.createPulsarAdminClient(workerConfig.getPulsarWebServiceUrl(), - authenticationConfig) : null); + workerConfig.isExposeAdminClientEnabled(), workerConfig.getPulsarWebServiceUrl()); } @Override diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactoryConfig.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactoryConfig.java index 795a83cae3a55..3b96bdbb8bfac 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactoryConfig.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactoryConfig.java @@ -25,8 +25,31 @@ @Data @Accessors(chain = true) public class ThreadRuntimeFactoryConfig { + + @Data + @Accessors(chain = true) + /** + * Memory limit set for the pulsar client used by all instances + * If `absoluteValue` and `percentOfMaxDirectMemory` are both set, then the min of the two will be used. + */ + public static class MemoryLimit { + @FieldContext( + doc = "The max memory in bytes the pulsar client can use" + ) + Long absoluteValue; + @FieldContext( + doc = "The max memory the pulsar client can use as a percentage of max direct memory set for JVM" + ) + Double percentOfMaxDirectMemory; + } + @FieldContext( doc = "The name of thread group running function threads" ) protected String threadGroupName; + + @FieldContext( + doc = "Memory limit set for the pulsar client used by all instances" + ) + protected MemoryLimit pulsarClientMemoryLimit; } diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactoryTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactoryTest.java new file mode 100644 index 0000000000000..d0e25964d93f3 --- /dev/null +++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactoryTest.java @@ -0,0 +1,152 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.pulsar.functions.runtime.thread; + +import io.netty.util.internal.PlatformDependent; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.client.api.ClientBuilder; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.SizeUnit; +import org.apache.pulsar.common.functions.AuthenticationConfig; +import org.apache.pulsar.common.util.ObjectMapperFactory; +import org.apache.pulsar.common.util.RestException; +import org.apache.pulsar.functions.instance.InstanceUtils; +import org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator; +import org.apache.pulsar.functions.worker.WorkerConfig; +import org.mockito.Mockito; +import org.powermock.api.mockito.PowerMockito; +import org.powermock.core.classloader.annotations.PowerMockIgnore; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.testng.IObjectFactory; +import org.testng.annotations.ObjectFactory; +import org.testng.annotations.Test; + +import java.util.Map; +import java.util.Optional; + +@PrepareForTest({InstanceUtils.class, PulsarClient.class, PlatformDependent.class}) +@PowerMockIgnore({ "javax.management.*", "javax.ws.*", "org.apache.logging.log4j.*"}) +@Slf4j +public class ThreadRuntimeFactoryTest { + + @ObjectFactory + public IObjectFactory getObjectFactory() { + return new org.powermock.modules.testng.PowerMockObjectFactory(); + } + + @Test + public void testMemoryLimitPercent() throws Exception { + + ClientBuilder clientBuilder = testMemoryLimit(null, 50.0); + + Mockito.verify(clientBuilder, Mockito.times(1)).memoryLimit(Mockito.eq((long) (1024 * 0.5)), Mockito.eq(SizeUnit.BYTES)); + } + + @Test + public void testMemoryLimitAbsolute() throws Exception { + + ClientBuilder clientBuilder = testMemoryLimit(512L, null); + + Mockito.verify(clientBuilder, Mockito.times(1)).memoryLimit(Mockito.eq(512L), Mockito.eq(SizeUnit.BYTES)); + } + + @Test(expectedExceptions = IllegalArgumentException.class) + public void testMemoryLimitAbsoluteNegative() throws Exception { + testMemoryLimit(-512L, null); + } + + @Test(expectedExceptions = IllegalArgumentException.class) + public void testMemoryLimitPercentNegative() throws Exception { + testMemoryLimit(null, -50.0); + } + + @Test(expectedExceptions = IllegalArgumentException.class) + public void testMemoryLimitPercentOver100() throws Exception { + testMemoryLimit(null, 120.0); + } + + @Test + public void testMemoryLimitNotSet() throws Exception { + + ClientBuilder clientBuilder = testMemoryLimit(null, null); + + Mockito.verify(clientBuilder, Mockito.times(0)).memoryLimit(Mockito.anyLong(), Mockito.any()); + } + + @Test + public void testMemoryLimitBothSet() throws Exception { + + ClientBuilder clientBuilder = testMemoryLimit(512L, 100.0); + + Mockito.verify(clientBuilder, Mockito.times(1)).memoryLimit(Mockito.eq(512L), Mockito.eq(SizeUnit.BYTES)); + + clientBuilder = testMemoryLimit(2048L, 100.0); + + Mockito.verify(clientBuilder, Mockito.times(1)).memoryLimit(Mockito.eq(1024L), Mockito.eq(SizeUnit.BYTES)); + + clientBuilder = testMemoryLimit(512L, 25.0); + + Mockito.verify(clientBuilder, Mockito.times(1)).memoryLimit(Mockito.eq(256L), Mockito.eq(SizeUnit.BYTES)); + + clientBuilder = testMemoryLimit(512L, 75.0); + + Mockito.verify(clientBuilder, Mockito.times(1)).memoryLimit(Mockito.eq(512L), Mockito.eq(SizeUnit.BYTES)); + } + + + private ClientBuilder testMemoryLimit(Long absolute, Double percent) throws Exception { + PowerMockito.mockStatic(PulsarClient.class); + PowerMockito.mockStatic(PlatformDependent.class); + + PowerMockito.when(PlatformDependent.maxDirectMemory()).thenReturn(1024L); + + ClientBuilder clientBuilder = Mockito.mock(ClientBuilder.class); + PowerMockito.when(PulsarClient.builder()).thenReturn(clientBuilder); + PowerMockito.when(PulsarClient.builder().serviceUrl(Mockito.anyString())).thenReturn(clientBuilder); + + + ThreadRuntimeFactoryConfig threadRuntimeFactoryConfig = new ThreadRuntimeFactoryConfig(); + threadRuntimeFactoryConfig.setThreadGroupName("foo"); + ThreadRuntimeFactoryConfig.MemoryLimit memoryLimit = new ThreadRuntimeFactoryConfig.MemoryLimit(); + if (percent != null) { + memoryLimit.setPercentOfMaxDirectMemory(percent); + } + + if (absolute != null) { + memoryLimit.setAbsoluteValue(absolute); + } + threadRuntimeFactoryConfig.setPulsarClientMemoryLimit(memoryLimit); + + WorkerConfig workerConfig = new WorkerConfig(); + workerConfig.setFunctionRuntimeFactoryClassName(ThreadRuntimeFactory.class.getName()); + workerConfig.setFunctionRuntimeFactoryConfigs(ObjectMapperFactory.getThreadLocal().convertValue(threadRuntimeFactoryConfig, Map.class)); + workerConfig.setPulsarServiceUrl("pulsar://broker.pulsar:6650"); + + ThreadRuntimeFactory threadRuntimeFactory = new ThreadRuntimeFactory(); + + threadRuntimeFactory.initialize( + workerConfig, + Mockito.mock(AuthenticationConfig.class), + Mockito.mock(SecretsProviderConfigurator.class), + Optional.empty(), Optional.empty()); + + return clientBuilder; + } +} \ No newline at end of file diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java index 299d1b6e9d8a1..2a6278e7a68ae 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java @@ -153,8 +153,8 @@ public void testSchedule() throws Exception { .build(); functionMetaDataList.add(function1); doReturn(functionMetaDataList).when(functionMetaDataManager).getAllFunctionMetaData(); - - ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy", new ClearTextSecretsProvider(), new CollectorRegistry(), null, null, null); + + ThreadRuntimeFactory factory = mock(ThreadRuntimeFactory.class); doReturn(factory).when(functionRuntimeManager).getRuntimeFactory(); // set assignments @@ -201,7 +201,7 @@ public void testNothingNewToSchedule() throws Exception { functionMetaDataList.add(function1); doReturn(functionMetaDataList).when(functionMetaDataManager).getAllFunctionMetaData(); - ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy", new ClearTextSecretsProvider(), new CollectorRegistry(), null, null, null); + ThreadRuntimeFactory factory = mock(ThreadRuntimeFactory.class); doReturn(factory).when(functionRuntimeManager).getRuntimeFactory(); // set assignments @@ -249,7 +249,7 @@ public void testAddingFunctions() throws Exception { functionMetaDataList.add(function2); doReturn(functionMetaDataList).when(functionMetaDataManager).getAllFunctionMetaData(); - ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy", new ClearTextSecretsProvider(), new CollectorRegistry(), null, null, null); + ThreadRuntimeFactory factory = mock(ThreadRuntimeFactory.class); doReturn(factory).when(functionRuntimeManager).getRuntimeFactory(); // set assignments @@ -311,7 +311,7 @@ public void testDeletingFunctions() throws Exception { functionMetaDataList.add(function1); doReturn(functionMetaDataList).when(functionMetaDataManager).getAllFunctionMetaData(); - ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy", new ClearTextSecretsProvider(), new CollectorRegistry(), null, null, null); + ThreadRuntimeFactory factory = mock(ThreadRuntimeFactory.class); doReturn(factory).when(functionRuntimeManager).getRuntimeFactory(); // set assignments @@ -377,8 +377,7 @@ public void testScalingUp() throws Exception { functionMetaDataList.add(function2); doReturn(functionMetaDataList).when(functionMetaDataManager).getAllFunctionMetaData(); - ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy", new ClearTextSecretsProvider - (), new CollectorRegistry(), null, null, null); + ThreadRuntimeFactory factory = mock(ThreadRuntimeFactory.class); doReturn(factory).when(functionRuntimeManager).getRuntimeFactory(); // set assignments @@ -489,7 +488,7 @@ public void testScalingDown() throws Exception { functionMetaDataList.add(function2); doReturn(functionMetaDataList).when(functionMetaDataManager).getAllFunctionMetaData(); - ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy", new ClearTextSecretsProvider(), new CollectorRegistry(), null, null, null); + ThreadRuntimeFactory factory = mock(ThreadRuntimeFactory.class); doReturn(factory).when(functionRuntimeManager).getRuntimeFactory(); // set assignments @@ -638,8 +637,7 @@ public void testHeartbeatFunction() throws Exception { functionMetaDataList.add(function2); doReturn(functionMetaDataList).when(functionMetaDataManager).getAllFunctionMetaData(); - ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy", new ClearTextSecretsProvider(), - new CollectorRegistry(), null, null, null); + ThreadRuntimeFactory factory = mock(ThreadRuntimeFactory.class); doReturn(factory).when(functionRuntimeManager).getRuntimeFactory(); Map> currentAssignments = new HashMap<>(); @@ -693,8 +691,7 @@ public void testUpdate() throws Exception { functionMetaDataList.add(function2); doReturn(functionMetaDataList).when(functionMetaDataManager).getAllFunctionMetaData(); - ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy", new ClearTextSecretsProvider(), - new CollectorRegistry(), null, null, null); + ThreadRuntimeFactory factory = mock(ThreadRuntimeFactory.class); doReturn(factory).when(functionRuntimeManager).getRuntimeFactory(); // set assignments @@ -839,8 +836,7 @@ public void testAssignmentWorkerDoesNotExist() throws InterruptedException, NoSu functionMetaDataList.add(function2); doReturn(functionMetaDataList).when(functionMetaDataManager).getAllFunctionMetaData(); - ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy", new ClearTextSecretsProvider(), - new CollectorRegistry(), null, null, null); + ThreadRuntimeFactory factory = mock(ThreadRuntimeFactory.class); doReturn(factory).when(functionRuntimeManager).getRuntimeFactory(); // set assignments