Skip to content

Commit

Permalink
Expose metrics via http port in function instance (#2930)
Browse files Browse the repository at this point in the history
* fix bugs in python metrics

* instance expose metrics

* remove commented out code

* fix unit tests

* remove commented out code

* fixing test

* fix python instance test

* removing old code

* fix bug

* refactoring java metrics

* refactoring python metrics

* cleaning up code

* removing unneccessary code

* improving metrics format

* fixing test

* fix bugs and revising format

* fix bug

* fix for python3

* change user defined metric to summary

* renaming labels

* change back python
  • Loading branch information
jerrypeng committed Nov 8, 2018
1 parent b708b49 commit 900e747
Show file tree
Hide file tree
Showing 25 changed files with 417 additions and 270 deletions.
Expand Up @@ -18,25 +18,12 @@
*/ */
package org.apache.pulsar.functions.instance; package org.apache.pulsar.functions.instance;


import static com.google.common.base.Preconditions.checkState;

import com.google.gson.Gson; import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken; import com.google.gson.reflect.TypeToken;

import io.prometheus.client.CollectorRegistry;
import java.nio.ByteBuffer; import io.prometheus.client.Summary;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;

import lombok.Getter; import lombok.Getter;
import lombok.Setter; import lombok.Setter;

import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClient;
Expand All @@ -55,9 +42,24 @@
import org.apache.pulsar.io.core.SourceContext; import org.apache.pulsar.io.core.SourceContext;
import org.slf4j.Logger; import org.slf4j.Logger;


import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;

import static com.google.common.base.Preconditions.checkState;

/** /**
* This class implements the Context interface exposed to the user. * This class implements the Context interface exposed to the user.
*/ */

class ContextImpl implements Context, SinkContext, SourceContext { class ContextImpl implements Context, SinkContext, SourceContext {
private InstanceConfig config; private InstanceConfig config;
private Logger logger; private Logger logger;
Expand Down Expand Up @@ -92,7 +94,6 @@ public void update(double value) {
} }
} }


private ConcurrentMap<String, AccumulatedMetricDatum> currentAccumulatedMetrics;
private ConcurrentMap<String, AccumulatedMetricDatum> accumulatedMetrics; private ConcurrentMap<String, AccumulatedMetricDatum> accumulatedMetrics;


private Map<String, Producer<?>> publishProducers; private Map<String, Producer<?>> publishProducers;
Expand All @@ -110,11 +111,21 @@ public void update(double value) {
private StateContextImpl stateContext; private StateContextImpl stateContext;
private Map<String, Object> userConfigs; private Map<String, Object> userConfigs;


Map<String, String[]> userMetricsLabels = new HashMap<>();
private final String[] metricsLabels;
private final Summary userMetricsSummary;

private final static String[] userMetricsLabelNames;
static {
// add label to indicate user metric
userMetricsLabelNames = Arrays.copyOf(FunctionStats.metricsLabelNames, FunctionStats.metricsLabelNames.length + 1);
userMetricsLabelNames[FunctionStats.metricsLabelNames.length] = "metric";
}

public ContextImpl(InstanceConfig config, Logger logger, PulsarClient client, List<String> inputTopics, public ContextImpl(InstanceConfig config, Logger logger, PulsarClient client, List<String> inputTopics,
SecretsProvider secretsProvider) { SecretsProvider secretsProvider, CollectorRegistry collectorRegistry, String[] metricsLabels) {
this.config = config; this.config = config;
this.logger = logger; this.logger = logger;
this.currentAccumulatedMetrics = new ConcurrentHashMap<>();
this.accumulatedMetrics = new ConcurrentHashMap<>(); this.accumulatedMetrics = new ConcurrentHashMap<>();
this.publishProducers = new HashMap<>(); this.publishProducers = new HashMap<>();
this.inputTopics = inputTopics; this.inputTopics = inputTopics;
Expand All @@ -138,6 +149,17 @@ public ContextImpl(InstanceConfig config, Logger logger, PulsarClient client, Li
} else { } else {
secretsMap = new HashMap<>(); secretsMap = new HashMap<>();
} }

this.metricsLabels = metricsLabels;
this.userMetricsSummary = Summary.build()
.name("pulsar_function_user_metric")
.help("Pulsar Function user defined metric.")
.labelNames(userMetricsLabelNames)
.quantile(0.5, 0.01)
.quantile(0.9, 0.01)
.quantile(0.99, 0.01)
.quantile(0.999, 0.01)
.register(collectorRegistry);
} }


public void setCurrentMessageContext(Record<?> record) { public void setCurrentMessageContext(Record<?> record) {
Expand Down Expand Up @@ -320,8 +342,16 @@ public <O> CompletableFuture<Void> publish(String topicName, O object, Schema<O>


@Override @Override
public void recordMetric(String metricName, double value) { public void recordMetric(String metricName, double value) {
currentAccumulatedMetrics.putIfAbsent(metricName, new AccumulatedMetricDatum()); userMetricsLabels.computeIfAbsent(metricName,
currentAccumulatedMetrics.get(metricName).update(value); s -> {
String[] userMetricLabels = Arrays.copyOf(metricsLabels, metricsLabels.length + 1);
userMetricLabels[userMetricLabels.length - 1] = metricName;
return userMetricLabels;
});

userMetricsSummary.labels(userMetricsLabels.get(metricName)).observe(value);
accumulatedMetrics.putIfAbsent(metricName, new AccumulatedMetricDatum());
accumulatedMetrics.get(metricName).update(value);
} }


public MetricsData getAndResetMetrics() { public MetricsData getAndResetMetrics() {
Expand All @@ -331,9 +361,8 @@ public MetricsData getAndResetMetrics() {
} }


public void resetMetrics() { public void resetMetrics() {
userMetricsSummary.clear();
this.accumulatedMetrics.clear(); this.accumulatedMetrics.clear();
this.accumulatedMetrics.putAll(currentAccumulatedMetrics);
this.currentAccumulatedMetrics.clear();
} }


public MetricsData getMetrics() { public MetricsData getMetrics() {
Expand Down
Expand Up @@ -21,6 +21,7 @@
import com.google.common.collect.EvictingQueue; import com.google.common.collect.EvictingQueue;
import io.prometheus.client.CollectorRegistry; import io.prometheus.client.CollectorRegistry;
import io.prometheus.client.Counter; import io.prometheus.client.Counter;
import io.prometheus.client.Gauge;
import io.prometheus.client.Summary; import io.prometheus.client.Summary;
import lombok.Getter; import lombok.Getter;
import lombok.Setter; import lombok.Setter;
Expand All @@ -35,7 +36,16 @@
@Setter @Setter
public class FunctionStats { public class FunctionStats {


private static final String[] metricsLabelNames = {"tenant", "namespace", "name", "instance_id"}; static final String[] metricsLabelNames = {"tenant", "namespace", "function", "instance_id", "cluster"};

/** Declare metric names **/
static final String PULSAR_FUNCTION_PROCESSED_TOTAL = "pulsar_function_processed_total";
static final String PULSAR_FUNCTION_PROCESSED_SUCCESSFULLY_TOTAL = "pulsar_function_processed_successfully_total";
static final String PULSAR_FUNCTION_SYSTEM_EXCEPTIONS_TOTAL = "pulsar_function_system_exceptions_total";
static final String PULSAR_FUNCTION_USER_EXCEPTIONS_TOTAL = "pulsar_function_user_exceptions_total";
static final String PULSAR_FUNCTION_PROCESS_LATENCY_MS = "pulsar_function_process_latency_ms";
static final String PULSAR_FUNCTION_LAST_INVOCATION = "pulsar_function_last_invocation";
static final String PULSAR_FUNCTION_RECEIVED_TOTAL = "pulsar_function_received_total";


/** Declare Prometheus stats **/ /** Declare Prometheus stats **/


Expand All @@ -49,54 +59,67 @@ public class FunctionStats {


final Summary statProcessLatency; final Summary statProcessLatency;


final Gauge statlastInvocation;

final Counter statTotalRecordsRecieved;

CollectorRegistry functionCollectorRegistry; CollectorRegistry functionCollectorRegistry;


@Getter @Getter
private EvictingQueue<InstanceCommunication.FunctionStatus.ExceptionInformation> latestUserExceptions = EvictingQueue.create(10); private EvictingQueue<InstanceCommunication.FunctionStatus.ExceptionInformation> latestUserExceptions = EvictingQueue.create(10);
@Getter @Getter
private EvictingQueue<InstanceCommunication.FunctionStatus.ExceptionInformation> latestSystemExceptions = EvictingQueue.create(10); private EvictingQueue<InstanceCommunication.FunctionStatus.ExceptionInformation> latestSystemExceptions = EvictingQueue.create(10);


@Getter public FunctionStats(CollectorRegistry collectorRegistry) {
@Setter
private long lastInvocationTime = 0;

public FunctionStats() {
// Declare function local collector registry so that it will not clash with other function instances' // Declare function local collector registry so that it will not clash with other function instances'
// metrics collection especially in threaded mode // metrics collection especially in threaded mode
functionCollectorRegistry = new CollectorRegistry(); functionCollectorRegistry = new CollectorRegistry();


statTotalProcessed = Counter.build() statTotalProcessed = Counter.build()
.name("__function_total_processed__") .name(PULSAR_FUNCTION_PROCESSED_TOTAL)
.help("Total number of messages processed.") .help("Total number of messages processed.")
.labelNames(metricsLabelNames) .labelNames(metricsLabelNames)
.register(functionCollectorRegistry); .register(collectorRegistry);


statTotalProcessedSuccessfully = Counter.build() statTotalProcessedSuccessfully = Counter.build()
.name("__function_total_successfully_processed__") .name(PULSAR_FUNCTION_PROCESSED_SUCCESSFULLY_TOTAL)
.help("Total number of messages processed successfully.") .help("Total number of messages processed successfully.")
.labelNames(metricsLabelNames) .labelNames(metricsLabelNames)
.register(functionCollectorRegistry); .register(collectorRegistry);


statTotalSysExceptions = Counter.build() statTotalSysExceptions = Counter.build()
.name("__function_total_system_exceptions__") .name(PULSAR_FUNCTION_SYSTEM_EXCEPTIONS_TOTAL)
.help("Total number of system exceptions.") .help("Total number of system exceptions.")
.labelNames(metricsLabelNames) .labelNames(metricsLabelNames)
.register(functionCollectorRegistry); .register(collectorRegistry);


statTotalUserExceptions = Counter.build() statTotalUserExceptions = Counter.build()
.name("__function_total_user_exceptions__") .name(PULSAR_FUNCTION_USER_EXCEPTIONS_TOTAL)
.help("Total number of user exceptions.") .help("Total number of user exceptions.")
.labelNames(metricsLabelNames) .labelNames(metricsLabelNames)
.register(functionCollectorRegistry); .register(collectorRegistry);


statProcessLatency = Summary.build() statProcessLatency = Summary.build()
.name("__function_process_latency_ms__").help("Process latency in milliseconds.") .name(PULSAR_FUNCTION_PROCESS_LATENCY_MS)
.help("Process latency in milliseconds.")
.quantile(0.5, 0.01) .quantile(0.5, 0.01)
.quantile(0.9, 0.01) .quantile(0.9, 0.01)
.quantile(0.99, 0.01) .quantile(0.99, 0.01)
.quantile(0.999, 0.01) .quantile(0.999, 0.01)
.labelNames(metricsLabelNames) .labelNames(metricsLabelNames)
.register(functionCollectorRegistry); .register(collectorRegistry);

statlastInvocation = Gauge.build()
.name(PULSAR_FUNCTION_LAST_INVOCATION)
.help("The timestamp of the last invocation of the function")
.labelNames(metricsLabelNames)
.register(collectorRegistry);

statTotalRecordsRecieved = Counter.build()
.name(PULSAR_FUNCTION_RECEIVED_TOTAL)
.help("Total number of messages received from source.")
.labelNames(metricsLabelNames)
.register(collectorRegistry);
} }


public void addUserException(Exception ex) { public void addUserException(Exception ex) {
Expand All @@ -120,8 +143,9 @@ public void reset() {
statTotalSysExceptions.clear(); statTotalSysExceptions.clear();
statTotalUserExceptions.clear(); statTotalUserExceptions.clear();
statProcessLatency.clear(); statProcessLatency.clear();
statlastInvocation.clear();
statTotalRecordsRecieved.clear();
latestUserExceptions.clear(); latestUserExceptions.clear();
latestSystemExceptions.clear(); latestSystemExceptions.clear();
lastInvocationTime = 0;
} }
} }
Expand Up @@ -41,6 +41,7 @@ public class InstanceConfig {
private FunctionDetails functionDetails; private FunctionDetails functionDetails;
private int maxBufferedTuples; private int maxBufferedTuples;
private int port; private int port;
private String clusterName;


/** /**
* Get the string representation of {@link #getInstanceId()}. * Get the string representation of {@link #getInstanceId()}.
Expand Down

0 comments on commit 900e747

Please sign in to comment.