Skip to content

Commit

Permalink
Fix Pulsar Function localrun with multiple instances and metrics serv…
Browse files Browse the repository at this point in the history
…er is enabled
  • Loading branch information
Jerry Peng committed Apr 13, 2021
1 parent 56bad04 commit 01f8410
Show file tree
Hide file tree
Showing 16 changed files with 461 additions and 196 deletions.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ public static Map<String, Metric> parseMetrics(String metrics) {
return;
}
Matcher matcher = pattern.matcher(line);
log.info("line: {}", line);
checkArgument(matcher.matches());
String name = matcher.group(1);
Metric m = new Metric();
Expand All @@ -93,7 +92,6 @@ public static Map<String, Metric> parseMetrics(String metrics) {
parsed.put(name, m);
});

log.info("parsed metrics: {}", parsed);
return parsed;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import org.apache.pulsar.functions.instance.state.DefaultStateStore;
import org.apache.pulsar.functions.instance.state.StateManager;
import org.apache.pulsar.functions.instance.stats.ComponentStatsManager;
import org.apache.pulsar.functions.instance.stats.FunctionCollectorRegistry;
import org.apache.pulsar.functions.instance.stats.FunctionStatsManager;
import org.apache.pulsar.functions.instance.stats.SinkStatsManager;
import org.apache.pulsar.functions.instance.stats.SourceStatsManager;
Expand Down Expand Up @@ -115,7 +116,7 @@ class ContextImpl implements Context, SinkContext, SourceContext, AutoCloseable
private final Function.FunctionDetails.ComponentType componentType;

public ContextImpl(InstanceConfig config, Logger logger, PulsarClient client,
SecretsProvider secretsProvider, CollectorRegistry collectorRegistry, String[] metricsLabels,
SecretsProvider secretsProvider, FunctionCollectorRegistry collectorRegistry, String[] metricsLabels,
Function.FunctionDetails.ComponentType componentType, ComponentStatsManager statsManager,
StateManager stateManager, PulsarAdmin pulsarAdmin) {
this.config = config;
Expand Down Expand Up @@ -172,15 +173,17 @@ public ContextImpl(InstanceConfig config, Logger logger, PulsarClient client,
default:
throw new RuntimeException("Unknown component type: " + componentType);
}
this.userMetricsSummary = Summary.build()
.name(prefix + ComponentStatsManager.USER_METRIC_PREFIX)
.help("User defined metric.")
.labelNames(userMetricsLabelNames)
.quantile(0.5, 0.01)
.quantile(0.9, 0.01)
.quantile(0.99, 0.01)
.quantile(0.999, 0.01)
.register(collectorRegistry);
this.userMetricsSummary = collectorRegistry.registerIfNotExist(
prefix + ComponentStatsManager.USER_METRIC_PREFIX,
Summary.build()
.name(prefix + ComponentStatsManager.USER_METRIC_PREFIX)
.help("User defined metric.")
.labelNames(userMetricsLabelNames)
.quantile(0.5, 0.01)
.quantile(0.9, 0.01)
.quantile(0.99, 0.01)
.quantile(0.999, 0.01)
.create());
this.componentType = componentType;
this.stateManager = stateManager;
this.defaultStateStore = (DefaultStateStore) stateManager.getStore(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import org.apache.pulsar.functions.instance.state.StateStoreContextImpl;
import org.apache.pulsar.functions.instance.state.StateStoreProvider;
import org.apache.pulsar.functions.instance.stats.ComponentStatsManager;
import org.apache.pulsar.functions.instance.stats.FunctionCollectorRegistry;
import org.apache.pulsar.functions.proto.Function.SinkSpec;
import org.apache.pulsar.functions.proto.Function.SourceSpec;
import org.apache.pulsar.functions.proto.InstanceCommunication;
Expand Down Expand Up @@ -106,7 +107,7 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {

private final SecretsProvider secretsProvider;

private CollectorRegistry collectorRegistry;
private FunctionCollectorRegistry collectorRegistry;
private final String[] metricsLabels;

private InstanceCache instanceCache;
Expand All @@ -130,14 +131,13 @@ public JavaInstanceRunnable(InstanceConfig instanceConfig,
PulsarAdmin pulsarAdmin,
String stateStorageServiceUrl,
SecretsProvider secretsProvider,
CollectorRegistry collectorRegistry,
FunctionCollectorRegistry collectorRegistry,
ClassLoader functionClassLoader) {
this.instanceConfig = instanceConfig;
this.client = (PulsarClientImpl) pulsarClient;
this.pulsarAdmin = pulsarAdmin;
this.stateStorageServiceUrl = stateStorageServiceUrl;
this.secretsProvider = secretsProvider;
this.collectorRegistry = collectorRegistry;
this.functionClassLoader = functionClassLoader;
this.metricsLabels = new String[]{
instanceConfig.getFunctionDetails().getTenant(),
Expand Down Expand Up @@ -171,7 +171,7 @@ synchronized private void setup() throws Exception {
this.instanceCache = InstanceCache.getInstanceCache();

if (this.collectorRegistry == null) {
this.collectorRegistry = new CollectorRegistry();
this.collectorRegistry = FunctionCollectorRegistry.getDefaultImplementation();
}
this.stats = ComponentStatsManager.getStatsManager(this.collectorRegistry, this.metricsLabels,
this.instanceCache.getScheduledExecutorService(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public abstract class ComponentStatsManager implements AutoCloseable {

protected ScheduledFuture<?> scheduledFuture;

protected final CollectorRegistry collectorRegistry;
protected final FunctionCollectorRegistry collectorRegistry;

protected final EvictingQueue EMPTY_QUEUE = EvictingQueue.create(0);

Expand All @@ -53,7 +53,7 @@ public abstract class ComponentStatsManager implements AutoCloseable {
exceptionMetricsLabelNames[metricsLabelNames.length] = "error";
}

public static ComponentStatsManager getStatsManager(CollectorRegistry collectorRegistry,
public static ComponentStatsManager getStatsManager(FunctionCollectorRegistry collectorRegistry,
String[] metricsLabels,
ScheduledExecutorService scheduledExecutorService,
Function.FunctionDetails.ComponentType componentType) {
Expand All @@ -69,7 +69,7 @@ public static ComponentStatsManager getStatsManager(CollectorRegistry collectorR
}
}

public ComponentStatsManager(CollectorRegistry collectorRegistry,
public ComponentStatsManager(FunctionCollectorRegistry collectorRegistry,
String[] metricsLabels,
ScheduledExecutorService scheduledExecutorService) {

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.pulsar.functions.instance.stats;

import io.prometheus.client.Collector;
import io.prometheus.client.CollectorRegistry;

public abstract class FunctionCollectorRegistry extends CollectorRegistry {
public static FunctionCollectorRegistry getDefaultImplementation() {
return new FunctionCollectorRegistryImpl();
}

public abstract <T extends Collector> T registerIfNotExist(String metricName, T collector);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.pulsar.functions.instance.stats;

import io.prometheus.client.Collector;

import java.util.HashMap;
import java.util.Map;

public class FunctionCollectorRegistryImpl extends FunctionCollectorRegistry {

private final Map<String, Collector> namesToCollectors = new HashMap<String, Collector>();

public Collector registerIfNotExist(String metricName, Collector collector) {
synchronized (this) {
Collector existingCollector = namesToCollectors.get(metricName);
if (existingCollector == null) {
namesToCollectors.put(metricName, collector);
super.register(collector);
return collector;
}
return existingCollector;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -119,118 +119,148 @@ public class FunctionStatsManager extends ComponentStatsManager{

private final RateLimiter sysExceptionRateLimiter;

public FunctionStatsManager(CollectorRegistry collectorRegistry,
public FunctionStatsManager(FunctionCollectorRegistry collectorRegistry,
String[] metricsLabels,
ScheduledExecutorService scheduledExecutorService) {
super(collectorRegistry, metricsLabels, scheduledExecutorService);

statTotalProcessedSuccessfully = Counter.build()
statTotalProcessedSuccessfully = collectorRegistry.registerIfNotExist(
PULSAR_FUNCTION_METRICS_PREFIX + PROCESSED_SUCCESSFULLY_TOTAL,
Counter.build()
.name(PULSAR_FUNCTION_METRICS_PREFIX + PROCESSED_SUCCESSFULLY_TOTAL)
.help("Total number of messages processed successfully.")
.labelNames(metricsLabelNames)
.register(collectorRegistry);
.create());
_statTotalProcessedSuccessfully = statTotalProcessedSuccessfully.labels(metricsLabels);

statTotalSysExceptions = Counter.build()
statTotalSysExceptions = collectorRegistry.registerIfNotExist(
PULSAR_FUNCTION_METRICS_PREFIX + SYSTEM_EXCEPTIONS_TOTAL ,
Counter.build()
.name(PULSAR_FUNCTION_METRICS_PREFIX + SYSTEM_EXCEPTIONS_TOTAL)
.help("Total number of system exceptions.")
.labelNames(metricsLabelNames)
.register(collectorRegistry);
.create());
_statTotalSysExceptions = statTotalSysExceptions.labels(metricsLabels);

statTotalUserExceptions = Counter.build()
statTotalUserExceptions = collectorRegistry.registerIfNotExist(
PULSAR_FUNCTION_METRICS_PREFIX + USER_EXCEPTIONS_TOTAL,
Counter.build()
.name(PULSAR_FUNCTION_METRICS_PREFIX + USER_EXCEPTIONS_TOTAL)
.help("Total number of user exceptions.")
.labelNames(metricsLabelNames)
.register(collectorRegistry);
.create());
_statTotalUserExceptions = statTotalUserExceptions.labels(metricsLabels);

statProcessLatency = Summary.build()
statProcessLatency = collectorRegistry.registerIfNotExist(
PULSAR_FUNCTION_METRICS_PREFIX + PROCESS_LATENCY_MS,
Summary.build()
.name(PULSAR_FUNCTION_METRICS_PREFIX + PROCESS_LATENCY_MS)
.help("Process latency in milliseconds.")
.quantile(0.5, 0.01)
.quantile(0.9, 0.01)
.quantile(0.99, 0.01)
.quantile(0.999, 0.01)
.labelNames(metricsLabelNames)
.register(collectorRegistry);
.create());
_statProcessLatency = statProcessLatency.labels(metricsLabels);

statlastInvocation = Gauge.build()
statlastInvocation = collectorRegistry.registerIfNotExist(
PULSAR_FUNCTION_METRICS_PREFIX + LAST_INVOCATION,
Gauge.build()
.name(PULSAR_FUNCTION_METRICS_PREFIX + LAST_INVOCATION)
.help("The timestamp of the last invocation of the function.")
.labelNames(metricsLabelNames)
.register(collectorRegistry);
.create());
_statlastInvocation = statlastInvocation.labels(metricsLabels);

statTotalRecordsReceived = Counter.build()
statTotalRecordsReceived = collectorRegistry.registerIfNotExist(
PULSAR_FUNCTION_METRICS_PREFIX + RECEIVED_TOTAL,
Counter.build()
.name(PULSAR_FUNCTION_METRICS_PREFIX + RECEIVED_TOTAL)
.help("Total number of messages received from source.")
.labelNames(metricsLabelNames)
.register(collectorRegistry);
.create());
_statTotalRecordsReceived = statTotalRecordsReceived.labels(metricsLabels);

statTotalProcessedSuccessfully1min = Counter.build()
statTotalProcessedSuccessfully1min = collectorRegistry.registerIfNotExist(
PULSAR_FUNCTION_METRICS_PREFIX + PROCESSED_SUCCESSFULLY_TOTAL_1min,
Counter.build()
.name(PULSAR_FUNCTION_METRICS_PREFIX + PROCESSED_SUCCESSFULLY_TOTAL_1min)
.help("Total number of messages processed successfully in the last 1 minute.")
.labelNames(metricsLabelNames)
.register(collectorRegistry);
.create());
_statTotalProcessedSuccessfully1min = statTotalProcessedSuccessfully1min.labels(metricsLabels);

statTotalSysExceptions1min = Counter.build()
statTotalSysExceptions1min = collectorRegistry.registerIfNotExist(
PULSAR_FUNCTION_METRICS_PREFIX + SYSTEM_EXCEPTIONS_TOTAL_1min,
Counter.build()
.name(PULSAR_FUNCTION_METRICS_PREFIX + SYSTEM_EXCEPTIONS_TOTAL_1min)
.help("Total number of system exceptions in the last 1 minute.")
.labelNames(metricsLabelNames)
.register(collectorRegistry);
.create());
_statTotalSysExceptions1min = statTotalSysExceptions1min.labels(metricsLabels);

statTotalUserExceptions1min = Counter.build()
statTotalUserExceptions1min = collectorRegistry.registerIfNotExist(
PULSAR_FUNCTION_METRICS_PREFIX + USER_EXCEPTIONS_TOTAL_1min,
Counter.build()
.name(PULSAR_FUNCTION_METRICS_PREFIX + USER_EXCEPTIONS_TOTAL_1min)
.help("Total number of user exceptions in the last 1 minute.")
.labelNames(metricsLabelNames)
.register(collectorRegistry);
.create());
_statTotalUserExceptions1min = statTotalUserExceptions1min.labels(metricsLabels);

statProcessLatency1min = Summary.build()
statProcessLatency1min = collectorRegistry.registerIfNotExist(
PULSAR_FUNCTION_METRICS_PREFIX + PROCESS_LATENCY_MS_1min,
Summary.build()
.name(PULSAR_FUNCTION_METRICS_PREFIX + PROCESS_LATENCY_MS_1min)
.help("Process latency in milliseconds in the last 1 minute.")
.quantile(0.5, 0.01)
.quantile(0.9, 0.01)
.quantile(0.99, 0.01)
.quantile(0.999, 0.01)
.labelNames(metricsLabelNames)
.register(collectorRegistry);
.create());
_statProcessLatency1min = statProcessLatency1min.labels(metricsLabels);

statTotalRecordsReceived1min = Counter.build()
statTotalRecordsReceived1min = collectorRegistry.registerIfNotExist(
PULSAR_FUNCTION_METRICS_PREFIX + RECEIVED_TOTAL_1min,
Counter.build()
.name(PULSAR_FUNCTION_METRICS_PREFIX + RECEIVED_TOTAL_1min)
.help("Total number of messages received from source in the last 1 minute.")
.labelNames(metricsLabelNames)
.register(collectorRegistry);
.create());
_statTotalRecordsReceived1min = statTotalRecordsReceived1min.labels(metricsLabels);

userExceptions = Gauge.build()
userExceptions = collectorRegistry.registerIfNotExist(
PULSAR_FUNCTION_METRICS_PREFIX + "user_exception",
Gauge.build()
.name(PULSAR_FUNCTION_METRICS_PREFIX + "user_exception")
.labelNames(exceptionMetricsLabelNames)
.help("Exception from user code.")
.register(collectorRegistry);
sysExceptions = Gauge.build()
.create());
sysExceptions = collectorRegistry.registerIfNotExist(
PULSAR_FUNCTION_METRICS_PREFIX + "system_exception",
Gauge.build()
.name(PULSAR_FUNCTION_METRICS_PREFIX + "system_exception")
.labelNames(exceptionMetricsLabelNames)
.help("Exception from system code.")
.register(collectorRegistry);
.create());

sourceExceptions = Gauge.build()
sourceExceptions = collectorRegistry.registerIfNotExist(
PULSAR_FUNCTION_METRICS_PREFIX + "source_exception",
Gauge.build()
.name(PULSAR_FUNCTION_METRICS_PREFIX + "source_exception")
.labelNames(exceptionMetricsLabelNames)
.help("Exception from source.")
.register(collectorRegistry);
.create());

sinkExceptions = Gauge.build()
sinkExceptions = collectorRegistry.registerIfNotExist(
PULSAR_FUNCTION_METRICS_PREFIX + "sink_exception",
Gauge.build()
.name(PULSAR_FUNCTION_METRICS_PREFIX + "sink_exception")
.labelNames(exceptionMetricsLabelNames)
.help("Exception from sink.")
.register(collectorRegistry);
.create());

userExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES, null);
sysExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES, null);
Expand Down
Loading

0 comments on commit 01f8410

Please sign in to comment.