Skip to content

Commit

Permalink
prometheus metrics for functions served via brokers or function insta…
Browse files Browse the repository at this point in the history
…nces should match (#3066)

* prometheus metrics for functions served via brokers or instances themselves should match

* add additional testing
  • Loading branch information
jerrypeng committed Nov 27, 2018
1 parent 7719b8e commit 8d24102
Show file tree
Hide file tree
Showing 10 changed files with 294 additions and 285 deletions.

Large diffs are not rendered by default.

Expand Up @@ -23,11 +23,14 @@
import io.prometheus.client.Counter; import io.prometheus.client.Counter;
import io.prometheus.client.Gauge; import io.prometheus.client.Gauge;
import io.prometheus.client.Summary; import io.prometheus.client.Summary;
import io.prometheus.client.exporter.common.TextFormat;
import lombok.Getter; import lombok.Getter;
import lombok.Setter; import lombok.Setter;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.functions.proto.InstanceCommunication; import org.apache.pulsar.functions.proto.InstanceCommunication;


import java.io.IOException;
import java.io.StringWriter;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -89,13 +92,17 @@ public class FunctionStatsManager implements AutoCloseable {


private ScheduledFuture<?> scheduledFuture; private ScheduledFuture<?> scheduledFuture;


private final CollectorRegistry collectorRegistry;

@Getter @Getter
private EvictingQueue<InstanceCommunication.FunctionStatus.ExceptionInformation> latestUserExceptions = EvictingQueue.create(10); private EvictingQueue<InstanceCommunication.FunctionStatus.ExceptionInformation> latestUserExceptions = EvictingQueue.create(10);
@Getter @Getter
private EvictingQueue<InstanceCommunication.FunctionStatus.ExceptionInformation> latestSystemExceptions = EvictingQueue.create(10); private EvictingQueue<InstanceCommunication.FunctionStatus.ExceptionInformation> latestSystemExceptions = EvictingQueue.create(10);


public FunctionStatsManager(CollectorRegistry collectorRegistry, String[] metricsLabels, ScheduledExecutorService scheduledExecutorService) { public FunctionStatsManager(CollectorRegistry collectorRegistry, String[] metricsLabels, ScheduledExecutorService scheduledExecutorService) {


this.collectorRegistry = collectorRegistry;

this.metricsLabels = metricsLabels; this.metricsLabels = metricsLabels;


statTotalProcessedSuccessfully = Counter.build() statTotalProcessedSuccessfully = Counter.build()
Expand Down Expand Up @@ -326,6 +333,14 @@ public void reset() {
latestSystemExceptions.clear(); latestSystemExceptions.clear();
} }


public String getStatsAsString() throws IOException {
StringWriter outputWriter = new StringWriter();

TextFormat.write004(outputWriter, collectorRegistry.metricFamilySamples());

return outputWriter.toString();
}

@Override @Override
public void close() { public void close() {
scheduledFuture.cancel(false); scheduledFuture.cancel(false);
Expand Down
Expand Up @@ -24,7 +24,6 @@
import com.google.gson.reflect.TypeToken; import com.google.gson.reflect.TypeToken;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.prometheus.client.CollectorRegistry; import io.prometheus.client.CollectorRegistry;
import java.util.concurrent.TimeUnit;
import lombok.AccessLevel; import lombok.AccessLevel;
import lombok.Getter; import lombok.Getter;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
Expand All @@ -38,11 +37,8 @@
import org.apache.bookkeeper.clients.exceptions.InternalServerException; import org.apache.bookkeeper.clients.exceptions.InternalServerException;
import org.apache.bookkeeper.clients.exceptions.NamespaceNotFoundException; import org.apache.bookkeeper.clients.exceptions.NamespaceNotFoundException;
import org.apache.bookkeeper.clients.exceptions.StreamNotFoundException; import org.apache.bookkeeper.clients.exceptions.StreamNotFoundException;
import org.apache.bookkeeper.common.util.Backoff;
import org.apache.bookkeeper.common.util.Backoff.Jitter; import org.apache.bookkeeper.common.util.Backoff.Jitter;
import org.apache.bookkeeper.common.util.Backoff.Jitter.Type; import org.apache.bookkeeper.common.util.Backoff.Jitter.Type;
import org.apache.bookkeeper.common.util.Backoff.Policy;
import org.apache.bookkeeper.common.util.Retries;
import org.apache.bookkeeper.stream.proto.NamespaceConfiguration; import org.apache.bookkeeper.stream.proto.NamespaceConfiguration;
import org.apache.bookkeeper.stream.proto.StorageType; import org.apache.bookkeeper.stream.proto.StorageType;
import org.apache.bookkeeper.stream.proto.StreamConfiguration; import org.apache.bookkeeper.stream.proto.StreamConfiguration;
Expand All @@ -69,7 +65,6 @@
import org.apache.pulsar.functions.sink.PulsarSinkDisable; import org.apache.pulsar.functions.sink.PulsarSinkDisable;
import org.apache.pulsar.functions.source.PulsarSource; import org.apache.pulsar.functions.source.PulsarSource;
import org.apache.pulsar.functions.source.PulsarSourceConfig; import org.apache.pulsar.functions.source.PulsarSourceConfig;
import org.apache.pulsar.functions.utils.FunctionConfigUtils;
import org.apache.pulsar.functions.utils.FunctionDetailsUtils; import org.apache.pulsar.functions.utils.FunctionDetailsUtils;
import org.apache.pulsar.functions.utils.Reflections; import org.apache.pulsar.functions.utils.Reflections;
import org.apache.pulsar.functions.utils.StateUtils; import org.apache.pulsar.functions.utils.StateUtils;
Expand All @@ -86,6 +81,7 @@
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.TimeUnit;


import static org.apache.bookkeeper.common.concurrent.FutureUtils.result; import static org.apache.bookkeeper.common.concurrent.FutureUtils.result;
import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.DEFAULT_STREAM_CONF; import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.DEFAULT_STREAM_CONF;
Expand Down Expand Up @@ -119,6 +115,7 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
private Throwable deathException; private Throwable deathException;


// function stats // function stats
@Getter
private FunctionStatsManager stats; private FunctionStatsManager stats;


private Record<?> currentRecord; private Record<?> currentRecord;
Expand Down
Expand Up @@ -47,6 +47,8 @@
import io.kubernetes.client.models.V1StatefulSet; import io.kubernetes.client.models.V1StatefulSet;
import io.kubernetes.client.models.V1StatefulSetSpec; import io.kubernetes.client.models.V1StatefulSetSpec;
import io.kubernetes.client.models.V1Toleration; import io.kubernetes.client.models.V1Toleration;

import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
Expand Down Expand Up @@ -299,6 +301,11 @@ public CompletableFuture<InstanceCommunication.MetricsData> getMetrics() {
return retval; return retval;
} }


@Override
public String getPrometheusMetrics() throws IOException {
return RuntimeUtils.getPrometheusMetrics(METRICS_PORT);
}

@Override @Override
public boolean isAlive() { public boolean isAlive() {
return running; return running;
Expand Down
Expand Up @@ -38,6 +38,7 @@
import org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator; import org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator;
import org.apache.pulsar.functions.utils.Utils; import org.apache.pulsar.functions.utils.Utils;


import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.util.List; import java.util.List;
import java.util.TimerTask; import java.util.TimerTask;
Expand All @@ -58,6 +59,7 @@ class ProcessRuntime implements Runtime {
@Getter @Getter
private List<String> processArgs; private List<String> processArgs;
private int instancePort; private int instancePort;
private int metricsPort;
@Getter @Getter
private Throwable deathException; private Throwable deathException;
private ManagedChannel channel; private ManagedChannel channel;
Expand All @@ -81,6 +83,7 @@ class ProcessRuntime implements Runtime {
Long expectedHealthCheckInterval) throws Exception { Long expectedHealthCheckInterval) throws Exception {
this.instanceConfig = instanceConfig; this.instanceConfig = instanceConfig;
this.instancePort = instanceConfig.getPort(); this.instancePort = instanceConfig.getPort();
this.metricsPort = Utils.findAvailablePort();
this.expectedHealthCheckInterval = expectedHealthCheckInterval; this.expectedHealthCheckInterval = expectedHealthCheckInterval;
this.secretsProviderConfigurator = secretsProviderConfigurator; this.secretsProviderConfigurator = secretsProviderConfigurator;
String logConfigFile = null; String logConfigFile = null;
Expand Down Expand Up @@ -119,7 +122,7 @@ class ProcessRuntime implements Runtime {
false, false,
null, null,
null, null,
Utils.findAvailablePort()); this.metricsPort);
} }


/** /**
Expand Down Expand Up @@ -268,6 +271,11 @@ public void onSuccess(InstanceCommunication.MetricsData t) {
return retval; return retval;
} }


@Override
public String getPrometheusMetrics() throws IOException {
return RuntimeUtils.getPrometheusMetrics(metricsPort);
}

public CompletableFuture<InstanceCommunication.HealthCheckResult> healthCheck() { public CompletableFuture<InstanceCommunication.HealthCheckResult> healthCheck() {
CompletableFuture<InstanceCommunication.HealthCheckResult> retval = new CompletableFuture<>(); CompletableFuture<InstanceCommunication.HealthCheckResult> retval = new CompletableFuture<>();
if (stub == null) { if (stub == null) {
Expand Down
Expand Up @@ -21,6 +21,7 @@


import org.apache.pulsar.functions.proto.InstanceCommunication; import org.apache.pulsar.functions.proto.InstanceCommunication;


import java.io.IOException;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;


/** /**
Expand All @@ -46,4 +47,5 @@ public interface Runtime {


CompletableFuture<InstanceCommunication.MetricsData> getMetrics(); CompletableFuture<InstanceCommunication.MetricsData> getMetrics();


String getPrometheusMetrics() throws IOException;
} }
Expand Up @@ -20,6 +20,13 @@
package org.apache.pulsar.functions.runtime; package org.apache.pulsar.functions.runtime;


import com.google.protobuf.util.JsonFormat; import com.google.protobuf.util.JsonFormat;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.InetAddress;
import java.net.URL;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -186,4 +193,18 @@ && isNotBlank(authConfig.getClientAuthenticationParameters())) {
args.add(instanceConfig.getClusterName()); args.add(instanceConfig.getClusterName());
return args; return args;
} }

public static String getPrometheusMetrics(int metricsPort) throws IOException{
StringBuilder result = new StringBuilder();
URL url = new URL(String.format("http://%s:%s", InetAddress.getLocalHost().getHostAddress(), metricsPort));
HttpURLConnection conn = (HttpURLConnection) url.openConnection();
conn.setRequestMethod("GET");
BufferedReader rd = new BufferedReader(new InputStreamReader(conn.getInputStream()));
String line;
while ((line = rd.readLine()) != null) {
result.append(line + System.lineSeparator());
}
rd.close();
return result.toString();
}
} }
Expand Up @@ -19,6 +19,7 @@


package org.apache.pulsar.functions.runtime; package org.apache.pulsar.functions.runtime;


import java.io.IOException;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;


import io.prometheus.client.CollectorRegistry; import io.prometheus.client.CollectorRegistry;
Expand Down Expand Up @@ -156,6 +157,11 @@ public CompletableFuture<InstanceCommunication.MetricsData> getMetrics() {
return CompletableFuture.completedFuture(javaInstanceRunnable.getMetrics()); return CompletableFuture.completedFuture(javaInstanceRunnable.getMetrics());
} }


@Override
public String getPrometheusMetrics() throws IOException {
return javaInstanceRunnable.getStats().getStatsAsString();
}

@Override @Override
public CompletableFuture<Void> resetMetrics() { public CompletableFuture<Void> resetMetrics() {
javaInstanceRunnable.resetMetrics(); javaInstanceRunnable.resetMetrics();
Expand Down
Expand Up @@ -18,17 +18,15 @@
*/ */
package org.apache.pulsar.functions.worker; package org.apache.pulsar.functions.worker;


import org.apache.pulsar.functions.instance.FunctionStatsManager; import org.apache.pulsar.common.util.SimpleTextOutputStream;
import org.apache.pulsar.functions.proto.InstanceCommunication;
import org.apache.pulsar.functions.runtime.KubernetesRuntimeFactory; import org.apache.pulsar.functions.runtime.KubernetesRuntimeFactory;
import org.apache.pulsar.functions.runtime.Runtime; import org.apache.pulsar.functions.runtime.Runtime;
import org.apache.pulsar.functions.runtime.RuntimeSpawner; import org.apache.pulsar.functions.runtime.RuntimeSpawner;
import org.apache.pulsar.common.util.SimpleTextOutputStream;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;


import java.io.IOException;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ExecutionException;


/** /**
* A class to generate stats for pulsar functions running on this broker * A class to generate stats for pulsar functions running on this broker
Expand Down Expand Up @@ -57,31 +55,10 @@ public static void generate(WorkerService workerService, String cluster, SimpleT
Runtime functionRuntime = functionRuntimeSpawner.getRuntime(); Runtime functionRuntime = functionRuntimeSpawner.getRuntime();
if (functionRuntime != null) { if (functionRuntime != null) {
try { try {
InstanceCommunication.MetricsData metrics = functionRuntime.getMetrics().get();

String tenant = functionRuntimeInfo.getFunctionInstance()
.getFunctionMetaData().getFunctionDetails().getTenant();
String namespace = functionRuntimeInfo.getFunctionInstance()
.getFunctionMetaData().getFunctionDetails().getNamespace();
String name = functionRuntimeInfo.getFunctionInstance()
.getFunctionMetaData().getFunctionDetails().getName();
int instanceId = functionRuntimeInfo.getFunctionInstance().getInstanceId();
String qualifiedNamespace = String.format("%s/%s", tenant, namespace);

metric(out, cluster, qualifiedNamespace, name, FunctionStatsManager.PULSAR_FUNCTION_METRICS_PREFIX + FunctionStatsManager.PROCESS_LATENCY_MS, instanceId, metrics.getAvgProcessLatency());
metric(out, cluster, qualifiedNamespace, name, FunctionStatsManager.PULSAR_FUNCTION_METRICS_PREFIX + FunctionStatsManager.LAST_INVOCATION, instanceId, metrics.getLastInvocation());
metric(out, cluster, qualifiedNamespace, name, FunctionStatsManager.PULSAR_FUNCTION_METRICS_PREFIX + FunctionStatsManager.PROCESSED_SUCCESSFULLY_TOTAL, instanceId, metrics.getProcessedSuccessfullyTotal());
metric(out, cluster, qualifiedNamespace, name, FunctionStatsManager.PULSAR_FUNCTION_METRICS_PREFIX + FunctionStatsManager.RECEIVED_TOTAL, instanceId, metrics.getReceivedTotal());
metric(out, cluster, qualifiedNamespace, name, FunctionStatsManager.PULSAR_FUNCTION_METRICS_PREFIX + FunctionStatsManager.SYSTEM_EXCEPTIONS_TOTAL, instanceId, metrics.getSystemExceptionsTotal());
metric(out, cluster, qualifiedNamespace, name, FunctionStatsManager.PULSAR_FUNCTION_METRICS_PREFIX + FunctionStatsManager.USER_EXCEPTIONS_TOTAL, instanceId, metrics.getUserExceptionsTotal());


for (Map.Entry<String, Double> userMetricsMapEntry : metrics.getUserMetricsMap().entrySet()) { out.write(functionRuntime.getPrometheusMetrics());
String userMetricName = userMetricsMapEntry.getKey();
Double val = userMetricsMapEntry.getValue();
metric(out, cluster, qualifiedNamespace, name, FunctionStatsManager.PULSAR_FUNCTION_METRICS_PREFIX + userMetricName, instanceId, val);
}


} catch (InterruptedException | ExecutionException e) { } catch (IOException e) {
log.warn("Failed to collect metrics for function instance {}", log.warn("Failed to collect metrics for function instance {}",
fullyQualifiedInstanceName, e); fullyQualifiedInstanceName, e);
} }
Expand All @@ -90,16 +67,4 @@ public static void generate(WorkerService workerService, String cluster, SimpleT
} }
} }
} }

private static void metricType(SimpleTextOutputStream stream, String name) {
stream.write("# TYPE ").write(name).write(" gauge\n");
}

private static void metric(SimpleTextOutputStream stream, String cluster, String namespace,
String functionName, String metricName, int instanceId, double value) {
metricType(stream, metricName);
stream.write(metricName).write("{cluster=\"").write(cluster).write("\",namespace=\"").write(namespace)
.write("\",name=\"").write(functionName).write("\",instanceId=\"").write(instanceId).write("\"} ");
stream.write(value).write(' ').write(System.currentTimeMillis()).write('\n');
}
} }

0 comments on commit 8d24102

Please sign in to comment.