Skip to content

Commit

Permalink
Enable Function subscription naming workaround when activated
Browse files Browse the repository at this point in the history
The Pulsar Function will use the function name as the subscription name when this workaround
is activated. This workaround is activated when -DdsPatchFunctionNameAsSubscriptionName=true
system property or dsPatchFunctionNameAsSubscriptionName=true environment variable is passed
to the Functions Worker (or to the Pulsar Broker when there isn't a separate Functions Worker).
  • Loading branch information
lhotari committed Jun 23, 2021
1 parent 653aeca commit b2e95a6
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,26 @@ public class FunctionConfigUtils {
static final Integer MAX_PENDING_ASYNC_REQUESTS_DEFAULT = Integer.valueOf(1000);
static final Boolean FORWARD_SOURCE_MESSAGE_PROPERTY_DEFAULT = Boolean.TRUE;

/*
Enable Function subscription naming workaround when activated
The Pulsar Function will use the function name as the subscription name when this workaround
is activated. This workaround is activated when -DdsPatchFunctionNameAsSubscriptionName=true
system property or dsPatchFunctionNameAsSubscriptionName=true environment variable is passed
to the Functions Worker (or to the Pulsar Broker when there isn't a separate Functions Worker).
*/
private static final String FLAG_NAME_DS_PATCH_FUNCTION_NAME_AS_SUBSCRIPTION_NAME =
"dsPatchFunctionNameAsSubscriptionName";
static final boolean DS_PATCH_FUNCTION_NAME_AS_SUBSCRIPTION_NAME = Boolean.valueOf(System.getProperty(
FLAG_NAME_DS_PATCH_FUNCTION_NAME_AS_SUBSCRIPTION_NAME, Boolean.FALSE.toString()))
|| Boolean.valueOf(System.getenv().getOrDefault(FLAG_NAME_DS_PATCH_FUNCTION_NAME_AS_SUBSCRIPTION_NAME,
Boolean.FALSE.toString()));

private static final ObjectMapper OBJECT_MAPPER = ObjectMapperFactory.create();

public static FunctionDetails convert(FunctionConfig functionConfig, ClassLoader classLoader)
throws IllegalArgumentException {

boolean isBuiltin = !org.apache.commons.lang3.StringUtils.isEmpty(functionConfig.getJar()) && functionConfig.getJar().startsWith(org.apache.pulsar.common.functions.Utils.BUILTIN);

Class<?>[] typeArgs = null;
Expand Down Expand Up @@ -155,6 +170,8 @@ public static FunctionDetails convert(FunctionConfig functionConfig, ClassLoader
// Set subscription name
if (isNotBlank(functionConfig.getSubName())) {
sourceSpecBuilder.setSubscriptionName(functionConfig.getSubName());
} else if (DS_PATCH_FUNCTION_NAME_AS_SUBSCRIPTION_NAME) {
sourceSpecBuilder.setSubscriptionName(functionConfig.getName());
}

// Set subscription position
Expand Down Expand Up @@ -348,6 +365,8 @@ public static FunctionConfig convertFromDetails(FunctionDetails functionDetails)
functionConfig.setInputSpecs(consumerConfigMap);
if (!isEmpty(functionDetails.getSource().getSubscriptionName())) {
functionConfig.setSubName(functionDetails.getSource().getSubscriptionName());
} else if (DS_PATCH_FUNCTION_NAME_AS_SUBSCRIPTION_NAME) {
functionConfig.setSubName(functionDetails.getName());
}
functionConfig.setRetainOrdering(functionDetails.getRetainOrdering());
functionConfig.setRetainKeyOrdering(functionDetails.getRetainKeyOrdering());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,8 @@ public static FunctionDetails convert(SinkConfig sinkConfig, ExtractedSinkDetail
}
if (isNotBlank(sinkConfig.getSourceSubscriptionName())) {
sourceSpecBuilder.setSubscriptionName(sinkConfig.getSourceSubscriptionName());
} else if (FunctionConfigUtils.DS_PATCH_FUNCTION_NAME_AS_SUBSCRIPTION_NAME) {
sourceSpecBuilder.setSubscriptionName(sinkConfig.getName());
}

Function.SubscriptionType subType = ((sinkConfig.getRetainOrdering() != null && sinkConfig.getRetainOrdering())
Expand Down Expand Up @@ -274,6 +276,8 @@ public static SinkConfig convertFromDetails(FunctionDetails functionDetails) {
sinkConfig.setInputSpecs(consumerConfigMap);
if (!isEmpty(functionDetails.getSource().getSubscriptionName())) {
sinkConfig.setSourceSubscriptionName(functionDetails.getSource().getSubscriptionName());
} else if (FunctionConfigUtils.DS_PATCH_FUNCTION_NAME_AS_SUBSCRIPTION_NAME) {
sinkConfig.setSourceSubscriptionName(functionDetails.getName());
}
if (functionDetails.getSource().getSubscriptionType() == Function.SubscriptionType.FAILOVER) {
sinkConfig.setRetainOrdering(true);
Expand Down

0 comments on commit b2e95a6

Please sign in to comment.