Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add additional metrics for Pulsar Function Worker #7685

Merged
merged 1 commit into from Jul 29, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -284,6 +284,8 @@ public void start(URI dlogUri,
log.info("/** Started worker id={} **/", workerConfig.getWorkerId());

workerStatsManager.setFunctionRuntimeManager(functionRuntimeManager);
workerStatsManager.setFunctionMetaDataManager(functionMetaDataManager);
workerStatsManager.setLeaderService(leaderService);
workerStatsManager.startupTimeEnd();
} catch (Throwable t) {
log.error("Error Starting up in worker", t);
Expand Down
Expand Up @@ -24,29 +24,40 @@
import io.prometheus.client.Summary;
import lombok.Setter;
import org.apache.pulsar.functions.instance.stats.PrometheusTextFormat;
import org.apache.pulsar.functions.proto.Function;

import java.io.IOException;
import java.io.StringWriter;
import java.util.List;

public class WorkerStatsManager {

private static final String PULSAR_FUNCTION_WORKER_METRICS_PREFIX = "pulsar_function_worker_";
private static final String START_UP_TIME = "start_up_time_ms";
private static final String INSTANCE_COUNT = "instance_count";
private static final String TOTAL_EXPECTED_INSTANCE_COUNT = "total_expected_instance_count";
private static final String TOTAL_FUNCTIONS_COUNT = "total_function_count";
private static final String SCHEDULE_TOTAL_EXEC_TIME = "schedule_execution_time_total_ms";
private static final String SCHEDULE_STRATEGY_EXEC_TIME = "schedule_strategy_execution_time_ms";
private static final String REBALANCE_TOTAL_EXEC_TIME = "rebalance_execution_time_total_ms";
private static final String REBALANCE_STRATEGY_EXEC_TIME = "rebalance_strategy_execution_time_ms";
private static final String STOPPING_INSTANCE_PROCESS_TIME = "stop_instance_process_time_ms";
private static final String UPDATING_INSTANCE_PROCESS_TIME = "update_instance_process_time_ms";
private static final String STARTING_INSTANCE_PROCESS_TIME = "start_instance_process_time_ms";
private static final String IS_LEADER = "is_leader";


private static final String[] metricsLabelNames = {"cluster"};
private final String[] metricsLabels;

@Setter
private FunctionRuntimeManager functionRuntimeManager;

@Setter
private FunctionMetaDataManager functionMetaDataManager;

@Setter
private LeaderService leaderService;

private CollectorRegistry collectorRegistry = new CollectorRegistry();

private final Summary statWorkerStartupTime;
Expand Down Expand Up @@ -92,8 +103,7 @@ public WorkerStatsManager(WorkerConfig workerConfig) {
.labelNames(metricsLabelNames)
.quantile(0.5, 0.01)
.quantile(0.9, 0.01)
.quantile(0.99, 0.01)
.quantile(0.999, 0.01)
.quantile(1, 0.01)
.register(collectorRegistry);
_scheduleTotalExecutionTime = scheduleTotalExecutionTime.labels(metricsLabels);

Expand All @@ -103,8 +113,7 @@ public WorkerStatsManager(WorkerConfig workerConfig) {
.labelNames(metricsLabelNames)
.quantile(0.5, 0.01)
.quantile(0.9, 0.01)
.quantile(0.99, 0.01)
.quantile(0.999, 0.01)
.quantile(1, 0.01)
.register(collectorRegistry);
_scheduleStrategyExecutionTime = scheduleStrategyExecutionTime.labels(metricsLabels);

Expand All @@ -114,8 +123,7 @@ public WorkerStatsManager(WorkerConfig workerConfig) {
.labelNames(metricsLabelNames)
.quantile(0.5, 0.01)
.quantile(0.9, 0.01)
.quantile(0.99, 0.01)
.quantile(0.999, 0.01)
.quantile(1, 0.01)
.register(collectorRegistry);
_rebalanceTotalExecutionTime = rebalanceTotalExecutionTime.labels(metricsLabels);

Expand All @@ -125,8 +133,7 @@ public WorkerStatsManager(WorkerConfig workerConfig) {
.labelNames(metricsLabelNames)
.quantile(0.5, 0.01)
.quantile(0.9, 0.01)
.quantile(0.99, 0.01)
.quantile(0.999, 0.01)
.quantile(1, 0.01)
.register(collectorRegistry);
_rebalanceStrategyExecutionTime = rebalanceStrategyExecutionTime.labels(metricsLabels);

Expand All @@ -136,8 +143,7 @@ public WorkerStatsManager(WorkerConfig workerConfig) {
.labelNames(metricsLabelNames)
.quantile(0.5, 0.01)
.quantile(0.9, 0.01)
.quantile(0.99, 0.01)
.quantile(0.999, 0.01)
.quantile(1, 0.01)
.register(collectorRegistry);
_stopInstanceProcessTime = stopInstanceProcessTime.labels(metricsLabels);

Expand All @@ -147,8 +153,7 @@ public WorkerStatsManager(WorkerConfig workerConfig) {
.labelNames(metricsLabelNames)
.quantile(0.5, 0.01)
.quantile(0.9, 0.01)
.quantile(0.99, 0.01)
.quantile(0.999, 0.01)
.quantile(1, 0.01)
.register(collectorRegistry);
_startInstanceProcessTime = startInstanceProcessTime.labels(metricsLabels);
}
Expand Down Expand Up @@ -245,6 +250,46 @@ public String getStatsAsString() throws IOException {

PrometheusTextFormat.write004(outputWriter, collectorRegistry.metricFamilySamples());

generateLeaderMetrics(outputWriter);
return outputWriter.toString();
}

private void generateLeaderMetrics(StringWriter stream) {
if (leaderService.isLeader()) {

List<Function.FunctionMetaData> metadata = functionMetaDataManager.getAllFunctionMetaData();
// get total number functions
long totalFunctions = metadata.size();
writeMetric(TOTAL_FUNCTIONS_COUNT, totalFunctions, stream);

// get total expected number of instances
long totalInstances = 0;
for (Function.FunctionMetaData entry : metadata) {
totalInstances += entry.getFunctionDetails().getParallelism();
}
writeMetric(TOTAL_EXPECTED_INSTANCE_COUNT, totalInstances, stream);

// is this worker is the leader
writeMetric(IS_LEADER, 1, stream);
}
}

private void writeMetric(String metricName, long value, StringWriter stream) {
stream.write(PULSAR_FUNCTION_WORKER_METRICS_PREFIX);
stream.write(metricName);
stream.write("{");

for (int i = 0; i < metricsLabelNames.length; i++) {
stream.write(metricsLabelNames[i]);
stream.write('=');
stream.write('\"');
stream.write(metricsLabels[i]);
stream.write("\",");
}
stream.write('}');

stream.write(' ');
stream.write(String.valueOf(value));
stream.write('\n');
}
}