Skip to content

Commit

Permalink
Support running function worker along with broker (#1329)
Browse files Browse the repository at this point in the history
* Move pulsar functions dependency version to root pom and remove duplicated license headers

This addresses some comments in pulsar functions PR #1314

* shade worker

* Fix broken master

* Upgrade the bookkeeper storage client dependency to the official bookkeeper version

This removes the temp dependency in `pulsar-functions-instance`

* set `protobuf2.version` in pulsar-common

* provide a shaded worker

* include worker dependency at broker

* Embedded function worker at broker

* rename 'function worker' to 'functions worker'

* add "--no-functions-worker" for pulsar-client-cpp tests
  • Loading branch information
sijie authored and merlimat committed Mar 5, 2018
1 parent f38fce7 commit b701925
Show file tree
Hide file tree
Showing 41 changed files with 1,633 additions and 634 deletions.
59 changes: 57 additions & 2 deletions bin/pulsar
Expand Up @@ -31,6 +31,14 @@ DEFAULT_STANDALONE_CONF=$PULSAR_HOME/conf/standalone.conf
DEFAULT_WEBSOCKET_CONF=$PULSAR_HOME/conf/websocket.conf
DEFAULT_LOG_CONF=$PULSAR_HOME/conf/log4j2.yaml

# Pulsar Functions locations, all rooted at PULSAR_HOME.
FUNCTIONS_HOME="${PULSAR_HOME}/pulsar-functions"
DEFAULT_WORKER_CONF="${PULSAR_HOME}/conf/functions_worker.yml"

# Java instance jar: PULSAR_JAVA_INSTANCE_JAR wins when set and non-empty,
# otherwise the default location is used.
DEFAULT_JAVA_INSTANCE_JAR="${PULSAR_HOME}/instances/java-instance.jar"
if [ -n "${PULSAR_JAVA_INSTANCE_JAR}" ]; then
  JAVA_INSTANCE_JAR="${PULSAR_JAVA_INSTANCE_JAR}"
else
  JAVA_INSTANCE_JAR="${DEFAULT_JAVA_INSTANCE_JAR}"
fi

# Python instance entry point: PULSAR_PY_INSTANCE_FILE wins when set and
# non-empty, otherwise the default location is used.
DEFAULT_PY_INSTANCE_FILE="${PULSAR_HOME}/instances/python-instance/python_instance_main.py"
if [ -n "${PULSAR_PY_INSTANCE_FILE}" ]; then
  PY_INSTANCE_FILE="${PULSAR_PY_INSTANCE_FILE}"
else
  PY_INSTANCE_FILE="${DEFAULT_PY_INSTANCE_FILE}"
fi

if [ -f "$PULSAR_HOME/conf/pulsar_env.sh" ]
then
. "$PULSAR_HOME/conf/pulsar_env.sh"
Expand Down Expand Up @@ -63,6 +71,34 @@ elif [ -e "$BUILT_JAR" ]; then
PULSAR_JAR=$BUILT_JAR
fi

#
# find the instance locations for pulsar-functions
#

# Find the java instance location.
# If the configured jar (JAVA_INSTANCE_JAR) does not exist, fall back to the
# jar under FUNCTIONS_HOME/runtime/target, and abort if that is missing too.
if [ ! -f "${JAVA_INSTANCE_JAR}" ]; then
  # didn't find a released jar, then search the built jar
  BUILT_JAVA_INSTANCE_JAR="${FUNCTIONS_HOME}/runtime/target/java-instance.jar"
  # Check that the file exists. The variable was just assigned a non-empty
  # path, so an emptiness test (-z) could never detect a missing jar.
  if [ ! -f "${BUILT_JAVA_INSTANCE_JAR}" ]; then
    # printf is used because bash's echo does not expand "\n" by default.
    printf '\n%s\n' "Couldn't find pulsar-functions java instance jar." >&2
    printf '%s\n\n' "Make sure you've run 'mvn package'" >&2
    exit 1
  fi
  JAVA_INSTANCE_JAR="${BUILT_JAVA_INSTANCE_JAR}"
fi

# Find the python instance location.
# If the configured entry point (PY_INSTANCE_FILE) does not exist, fall back
# to the copy under FUNCTIONS_HOME/runtime/target, and abort if that is
# missing too.
if [ ! -f "${PY_INSTANCE_FILE}" ]; then
  # didn't find a released python instance, then search the built python instance
  BUILT_PY_INSTANCE_FILE="${FUNCTIONS_HOME}/runtime/target/python-instance/python_instance_main.py"
  # Check that the file exists. The variable was just assigned a non-empty
  # path, so an emptiness test (-z) could never detect a missing file.
  if [ ! -f "${BUILT_PY_INSTANCE_FILE}" ]; then
    # printf is used because bash's echo does not expand "\n" by default.
    printf '\n%s\n' "Couldn't find pulsar-functions python instance." >&2
    printf '%s\n\n' "Make sure you've run 'mvn package'" >&2
    exit 1
  fi
  PY_INSTANCE_FILE="${BUILT_PY_INSTANCE_FILE}"
fi

pulsar_help() {
cat <<EOF
Usage: pulsar <command>
Expand All @@ -74,6 +110,7 @@ where command is one of:
discovery Run a discovery server
proxy Run a pulsar proxy
websocket Run a web socket proxy server
functions-worker Run a functions worker server
standalone Run a broker server with local bookies and local zookeeper
compact-topic Run compaction against a topic
Expand All @@ -93,6 +130,7 @@ Environment variables:
PULSAR_DISCOVERY_CONF Configuration file for discovery service (default: $DEFAULT_DISCOVERY_CONF)
PULSAR_WEBSOCKET_CONF Configuration file for websocket proxy (default: $DEFAULT_WEBSOCKET_CONF)
PULSAR_PROXY_CONF Configuration file for Pulsar proxy (default: $DEFAULT_PROXY_CONF)
PULSAR_WORKER_CONF Configuration file for functions worker (default: $DEFAULT_WORKER_CONF)
PULSAR_STANDALONE_CONF Configuration file for standalone (default: $DEFAULT_STANDALONE_CONF)
PULSAR_EXTRA_OPTS Extra options to be passed to the jvm
PULSAR_EXTRA_CLASSPATH Add extra paths to the pulsar classpath
Expand All @@ -115,7 +153,7 @@ add_maven_deps_to_classpath() {
f="${PULSAR_HOME}/all/target/classpath.txt"
if [ ! -f "${f}" ]
then
${MVN} -f "${PULSAR_HOME}/pom.xml" dependency:build-classpath -Dmdep.outputFile="${f}" &> /dev/null
${MVN} -f "${PULSAR_HOME}/pom.xml" dependency:build-classpath -DincludeScope=compile -Dmdep.outputFile="${f}" &> /dev/null
fi
PULSAR_CLASSPATH=${CLASSPATH}:`cat "${f}"`
}
Expand All @@ -136,6 +174,10 @@ fi
COMMAND=$1
shift

# Fall back to the default configuration files when the corresponding
# variable is unset or empty.
PULSAR_WORKER_CONF=${PULSAR_WORKER_CONF:-$DEFAULT_WORKER_CONF}
PULSAR_BROKER_CONF=${PULSAR_BROKER_CONF:-$DEFAULT_BROKER_CONF}
Expand Down Expand Up @@ -186,12 +228,22 @@ OPTS="-cp $PULSAR_CLASSPATH $OPTS"
OPTS="$OPTS $PULSAR_EXTRA_OPTS"

# log directory & file
PULSAR_LOG_APPENDER=${PULSAR_LOG_APPENDER:-"Console"}
PULSAR_LOG_DIR=${PULSAR_LOG_DIR:-"$PULSAR_HOME/logs"}
PULSAR_LOG_APPENDER=${PULSAR_LOG_APPENDER:-"RoutingAppender"}
PULSAR_LOG_LEVEL=${PULSAR_LOG_LEVEL:-"info"}
PULSAR_ROUTING_APPENDER_DEFAULT=${PULSAR_ROUTING_APPENDER_DEFAULT:-"Console"}

#Configure log configuration system properties
OPTS="$OPTS -Dpulsar.log.appender=$PULSAR_LOG_APPENDER"
OPTS="$OPTS -Dpulsar.log.dir=$PULSAR_LOG_DIR"
OPTS="$OPTS -Dpulsar.log.level=$PULSAR_LOG_LEVEL"
OPTS="$OPTS -Dpulsar.routing.appender.default=$PULSAR_ROUTING_APPENDER_DEFAULT"

# Functions related logging: append the log directory for process containers
# to OPTS as a JVM system property (same directory as pulsar.log.dir).
OPTS="$OPTS -Dpulsar.functions.process.container.log.dir=$PULSAR_LOG_DIR"
# Instance locations: append the resolved java instance jar and python
# instance file as JVM system properties.
# NOTE: OPTS is a plain string that is later expanded unquoted on the java
# command line, so these paths must not contain whitespace.
OPTS="$OPTS -Dpulsar.functions.java.instance.jar=${JAVA_INSTANCE_JAR}"
OPTS="$OPTS -Dpulsar.functions.python.instance.file=${PY_INSTANCE_FILE}"

#Change to PULSAR_HOME to support relative paths
cd "$PULSAR_HOME"
Expand All @@ -218,6 +270,9 @@ elif [ $COMMAND == "proxy" ]; then
elif [ $COMMAND == "websocket" ]; then
PULSAR_LOG_FILE=${PULSAR_LOG_FILE:-"pulsar-websocket.log"}
exec $JAVA $OPTS -Dpulsar.log.file=$PULSAR_LOG_FILE org.apache.pulsar.websocket.service.WebSocketServiceStarter $PULSAR_WEBSOCKET_CONF $@
elif [ $COMMAND == "functions-worker" ]; then
PULSAR_LOG_FILE=${PULSAR_LOG_FILE:-"pulsar-functions-worker.log"}
exec $JAVA $OPTS -Dpulsar.log.file=$PULSAR_LOG_FILE org.apache.pulsar.functions.worker.FunctionWorkerStarter -c $PULSAR_WORKER_CONF $@
elif [ $COMMAND == "standalone" ]; then
PULSAR_LOG_FILE=${PULSAR_LOG_FILE:-"pulsar-standalone.log"}
exec $JAVA $OPTS -Dpulsar.log.file=$PULSAR_LOG_FILE org.apache.pulsar.PulsarStandaloneStarter --config $PULSAR_STANDALONE_CONF $@
Expand Down
15 changes: 15 additions & 0 deletions buildtools/pom.xml
Expand Up @@ -40,6 +40,21 @@
<artifactId>testng</artifactId>
<version>6.13.1</version>
</dependency>
<!-- Log4j 2: API, core implementation, and the SLF4J binding.
     All three are pinned to the same version (2.10.0); update them together. -->
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>2.10.0</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.10.0</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>2.10.0</version>
</dependency>
</dependencies>

<build>
Expand Down
6 changes: 6 additions & 0 deletions conf/broker.conf
Expand Up @@ -440,3 +440,9 @@ webSocketConnectionsPerBroker=8

# Enable topic level metrics
exposeTopicLevelMetricsInPrometheus=true

### --- Functions --- ###

# Enable Functions Worker Service in Broker
functionsWorkerEnabled=false

46 changes: 46 additions & 0 deletions conf/functions_worker.yml
@@ -0,0 +1,46 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

workerId: standalone
workerHostname: localhost
workerPort: 6750
functionMetadataTopicName: metadata
functionMetadataSnapshotsTopicPath: snapshots
clusterCoordinationTopicName: coordinate
pulsarFunctionsNamespace: sample/standalone/functions
pulsarServiceUrl: pulsar://localhost:6650
pulsarWebServiceUrl: http://localhost:8080
numFunctionPackageReplicas: 1
downloadDirectory: /tmp/pulsar_functions
metricsConfig:
metricsSinkClassName: org.apache.pulsar.functions.metrics.sink.PrometheusSink
metricsCollectionInterval: 30
metricsSinkConfig:
path: /metrics
port: 9099
#threadContainerFactory:
# threadGroupName: "Thread Function Container Group"
processContainerFactory:
logDirectory:

schedulerClassName: "org.apache.pulsar.functions.worker.scheduler.RoundRobinScheduler"
functionAssignmentTopicName: "assignments"
failureCheckFreqMs: 30000
rescheduleTimeoutMs: 60000
initialBrokerReconnectMaxRetries: 60
118 changes: 109 additions & 9 deletions conf/log4j2.yaml
Expand Up @@ -23,6 +23,21 @@ Configuration:
monitorInterval: 30
name: pulsar

Properties:
Property:
- name: "pulsar.log.dir"
value: "logs"
- name: "pulsar.log.file"
value: "pulsar.log"
- name: "pulsar.log.appender"
value: "RoutingAppender"
- name: "pulsar.log.level"
value: "info"
- name: "pulsar.routing.appender.default"
value: "Console"
- name: "bk.log.level"
value: "info"

# Example: logger-filter script
Scripts:
ScriptFile:
Expand All @@ -32,19 +47,19 @@ Configuration:
charset: UTF-8

Appenders:
# Console

# Console
Console:
name: Console
target: SYSTEM_OUT
PatternLayout:
Pattern: "%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"
# Rolling file appender configuration

# Rolling file appender configuration
RollingFile:
name: RollingFile
fileName: "${sys:pulsar.log.dir}/${sys:pulsar.log.file}"
filePattern: "/${sys:pulsar.log.file}-%d{MM-dd-yyyy}-%i.log.gz"
filePattern: "${sys:pulsar.log.dir}/${sys:pulsar.log.file}-%d{MM-dd-yyyy}-%i.log.gz"
immediateFlush: false
PatternLayout:
Pattern: "%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"
Expand All @@ -54,7 +69,7 @@ Configuration:
modulate: true
SizeBasedTriggeringPolicy:
size: 1 GB
# Trigger every day at midnight that also scan
# Trigger every day at midnight that also scan
# roll-over strategy that deletes older file
CronTriggeringPolicy:
schedule: "0 0 0 * * ?"
Expand All @@ -67,16 +82,101 @@ Configuration:
glob: "*/${sys:pulsar.log.file}*log.gz"
IfLastModified:
age: 30d


# Rolling file appender configuration for bk
RollingRandomAccessFile:
name: BkRollingFile
fileName: "${sys:pulsar.log.dir}/${sys:pulsar.log.file}.bk"
filePattern: "${sys:pulsar.log.dir}/${sys:pulsar.log.file}.bk-%d{MM-dd-yyyy}-%i.log.gz"
immediateFlush: true
PatternLayout:
Pattern: "%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"
Policies:
TimeBasedTriggeringPolicy:
interval: 1
modulate: true
SizeBasedTriggeringPolicy:
size: 1 GB
# Trigger every day at midnight; the trigger also runs the
# roll-over strategy that deletes older files
CronTriggeringPolicy:
schedule: "0 0 0 * * ?"
# Delete files older than 30 days
DefaultRolloverStrategy:
Delete:
basePath: ${sys:pulsar.log.dir}
maxDepth: 2
IfFileName:
glob: "*/${sys:pulsar.log.file}.bk*log.gz"
IfLastModified:
age: 30d

# Routing
Routing:
name: RoutingAppender
Routes:
pattern: "$${ctx:function}"
Route:
-
Routing:
name: InstanceRoutingAppender
Routes:
pattern: "$${ctx:instance}"
Route:
-
RollingFile:
name: "Rolling-${ctx:function}"
fileName : "${sys:pulsar.log.dir}/functions/${ctx:function}/function.log"
filePattern : "${sys:pulsar.log.dir}/functions/${ctx:function}-%d{MM-dd-yyyy}-%i.log.gz"
PatternLayout:
Pattern: "%d{ABSOLUTE} %level{length=5} [%thread] [instance: %X{instance}] %logger{1} - %msg%n"
Policies:
TimeBasedTriggeringPolicy:
interval: 1
modulate: true
SizeBasedTriggeringPolicy:
size: "20MB"
# Trigger every day at midnight; the trigger also runs the
# roll-over strategy that deletes older files
CronTriggeringPolicy:
schedule: "0 0 0 * * ?"
# Delete files older than 30 days
DefaultRolloverStrategy:
Delete:
basePath: ${sys:pulsar.log.dir}
maxDepth: 2
IfFileName:
glob: "*/${sys:pulsar.log.file}*log.gz"
IfLastModified:
age: 30d
- ref: "${sys:pulsar.routing.appender.default}"
key: "${ctx:function}"
- ref: "${sys:pulsar.routing.appender.default}"
key: "${ctx:function}"

Loggers:

Logger:
name: org.apache.bookkeeper
level: "${sys:bk.log.level}"
additivity: false
AppenderRef:
- ref: BkRollingFile

Logger:
name: org.apache.distributedlog
level: "${sys:bk.log.level}"
additivity: false
AppenderRef:
- ref: BkRollingFile

# Default root logger configuration
Root:
level: info
additivity: false
AppenderRef:
ref: "${sys:pulsar.log.appender}"
- ref: "${sys:pulsar.log.appender}"
level: "${sys:pulsar.log.level}"

# Logger to inject filter script
# Logger:
Expand All @@ -90,4 +190,4 @@ Configuration:
# onMisMatch: DENY
# ScriptRef:
# ref: filter.js


15 changes: 0 additions & 15 deletions managed-ledger/pom.xml
Expand Up @@ -36,21 +36,6 @@
<dependency>
<groupId>org.apache.bookkeeper</groupId>
<artifactId>bookkeeper-server-shaded</artifactId>
<version>${bookkeeper.version}</version>
<exclusions>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
<exclusion>
<artifactId>log4j</artifactId>
<groupId>log4j</groupId>
</exclusion>
<exclusion>
<groupId>org.jboss.netty</groupId>
<artifactId>netty</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
Expand Down

0 comments on commit b701925

Please sign in to comment.