From b2e95a6457278943eca47193595d112cd680348b Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Thu, 24 Jun 2021 01:18:37 +0300 Subject: [PATCH] Enable Function subscription naming workaround when activated The Pulsar Function will use the function name as the subscription name when this workaround is activated. This workaround is activated when -DdsPatchFunctionNameAsSubscriptionName=true system property or dsPatchFunctionNameAsSubscriptionName=true environment variable is passed to the Functions Worker (or to the Pulsar Broker when there isn't a separate Functions Worker). --- .../functions/utils/FunctionConfigUtils.java | 21 ++++++++++++++++++- .../functions/utils/SinkConfigUtils.java | 4 ++++ 2 files changed, 24 insertions(+), 1 deletion(-) diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java index 6b9b7628f9332..93a8d80d5eec0 100644 --- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java @@ -54,11 +54,26 @@ public class FunctionConfigUtils { static final Integer MAX_PENDING_ASYNC_REQUESTS_DEFAULT = Integer.valueOf(1000); static final Boolean FORWARD_SOURCE_MESSAGE_PROPERTY_DEFAULT = Boolean.TRUE; + /* + Enable Function subscription naming workaround when activated + + The Pulsar Function will use the function name as the subscription name when this workaround + is activated. This workaround is activated when -DdsPatchFunctionNameAsSubscriptionName=true + system property or dsPatchFunctionNameAsSubscriptionName=true environment variable is passed + to the Functions Worker (or to the Pulsar Broker when there isn't a separate Functions Worker). + */ + private static final String FLAG_NAME_DS_PATCH_FUNCTION_NAME_AS_SUBSCRIPTION_NAME = + "dsPatchFunctionNameAsSubscriptionName"; + static final boolean DS_PATCH_FUNCTION_NAME_AS_SUBSCRIPTION_NAME = Boolean.valueOf(System.getProperty( + FLAG_NAME_DS_PATCH_FUNCTION_NAME_AS_SUBSCRIPTION_NAME, Boolean.FALSE.toString())) + || Boolean.valueOf(System.getenv().getOrDefault(FLAG_NAME_DS_PATCH_FUNCTION_NAME_AS_SUBSCRIPTION_NAME, + Boolean.FALSE.toString())); + private static final ObjectMapper OBJECT_MAPPER = ObjectMapperFactory.create(); public static FunctionDetails convert(FunctionConfig functionConfig, ClassLoader classLoader) throws IllegalArgumentException { - + boolean isBuiltin = !org.apache.commons.lang3.StringUtils.isEmpty(functionConfig.getJar()) && functionConfig.getJar().startsWith(org.apache.pulsar.common.functions.Utils.BUILTIN); Class[] typeArgs = null; @@ -155,6 +170,8 @@ public static FunctionDetails convert(FunctionConfig functionConfig, ClassLoader // Set subscription name if (isNotBlank(functionConfig.getSubName())) { sourceSpecBuilder.setSubscriptionName(functionConfig.getSubName()); + } else if (DS_PATCH_FUNCTION_NAME_AS_SUBSCRIPTION_NAME) { + sourceSpecBuilder.setSubscriptionName(functionConfig.getName()); } // Set subscription position @@ -348,6 +365,8 @@ public static FunctionConfig convertFromDetails(FunctionDetails functionDetails) functionConfig.setInputSpecs(consumerConfigMap); if (!isEmpty(functionDetails.getSource().getSubscriptionName())) { functionConfig.setSubName(functionDetails.getSource().getSubscriptionName()); + } else if (DS_PATCH_FUNCTION_NAME_AS_SUBSCRIPTION_NAME) { + functionConfig.setSubName(functionDetails.getName()); } functionConfig.setRetainOrdering(functionDetails.getRetainOrdering()); functionConfig.setRetainKeyOrdering(functionDetails.getRetainKeyOrdering()); diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java index e046537fb0218..270efe3f550f2 100644 --- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java @@ -158,6 +158,8 @@ public static FunctionDetails convert(SinkConfig sinkConfig, ExtractedSinkDetail } if (isNotBlank(sinkConfig.getSourceSubscriptionName())) { sourceSpecBuilder.setSubscriptionName(sinkConfig.getSourceSubscriptionName()); + } else if (FunctionConfigUtils.DS_PATCH_FUNCTION_NAME_AS_SUBSCRIPTION_NAME) { + sourceSpecBuilder.setSubscriptionName(sinkConfig.getName()); } Function.SubscriptionType subType = ((sinkConfig.getRetainOrdering() != null && sinkConfig.getRetainOrdering()) @@ -274,6 +276,8 @@ public static SinkConfig convertFromDetails(FunctionDetails functionDetails) { sinkConfig.setInputSpecs(consumerConfigMap); if (!isEmpty(functionDetails.getSource().getSubscriptionName())) { sinkConfig.setSourceSubscriptionName(functionDetails.getSource().getSubscriptionName()); + } else if (FunctionConfigUtils.DS_PATCH_FUNCTION_NAME_AS_SUBSCRIPTION_NAME) { + sinkConfig.setSourceSubscriptionName(functionDetails.getName()); } if (functionDetails.getSource().getSubscriptionType() == Function.SubscriptionType.FAILOVER) { sinkConfig.setRetainOrdering(true);