Skip to content

Commit

Permalink
Correcting metrics and adding tests (#3050)
Browse files Browse the repository at this point in the history
* Correcting metrics and adding tests

* remove commented out code

* remove space

* fix integration test

* improving impl

* fix ObjectMapper
  • Loading branch information
jerrypeng committed Nov 26, 2018
1 parent fe2c8ee commit b721ae3
Show file tree
Hide file tree
Showing 7 changed files with 207 additions and 33 deletions.
Expand Up @@ -30,6 +30,7 @@
import javax.ws.rs.PUT;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
Expand Down Expand Up @@ -189,6 +190,7 @@ public Response getFunctionStatus(final @PathParam("tenant") String tenant,
@ApiResponse(code = 400, message = "Invalid request"),
@ApiResponse(code = 403, message = "The requester doesn't have admin permissions")
})
@Produces(MediaType.APPLICATION_JSON)
@Path("/{tenant}/{namespace}/{functionName}/stats")
public FunctionStats getFunctionStats(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace,
Expand All @@ -205,6 +207,7 @@ public FunctionStats getFunctionStats(final @PathParam("tenant") String tenant,
@ApiResponse(code = 400, message = "Invalid request"),
@ApiResponse(code = 403, message = "The requester doesn't have admin permissions")
})
@Produces(MediaType.APPLICATION_JSON)
@Path("/{tenant}/{namespace}/{functionName}/{instanceId}/stats")
public FunctionStats.FunctionInstanceStats.FunctionInstanceStatsData getFunctionInstanceStats(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace,
Expand Down
Expand Up @@ -353,6 +353,55 @@ public void testPulsarFunctionStats() throws Exception {
// validate pulsar sink consumer has started on the topic
assertEquals(admin.topics().getStats(sourceTopic).subscriptions.size(), 1);

// validate stats are empty
FunctionRuntimeManager functionRuntimeManager = functionsWorkerService.getFunctionRuntimeManager();
FunctionStats functionStats = functionRuntimeManager.getFunctionStats(tenant, namespacePortion,
functionName, null);
FunctionStats functionStatsFromAdmin = admin.functions().getFunctionStats(tenant, namespacePortion,
functionName);

assertEquals(functionStats, functionStatsFromAdmin);

assertEquals(functionStats.getReceivedTotal(), 0);
assertEquals(functionStats.getProcessedSuccessfullyTotal(), 0);
assertEquals(functionStats.getSystemExceptionsTotal(), 0);
assertEquals(functionStats.getUserExceptionsTotal(), 0);
assertEquals(functionStats.avgProcessLatency, null);
assertEquals(functionStats.oneMin.getReceivedTotal(), 0);
assertEquals(functionStats.oneMin.getProcessedSuccessfullyTotal(), 0);
assertEquals(functionStats.oneMin.getSystemExceptionsTotal(), 0);
assertEquals(functionStats.oneMin.getUserExceptionsTotal(), 0);
assertEquals(functionStats.oneMin.getAvgProcessLatency(), null);
assertEquals(functionStats.getAvgProcessLatency(), functionStats.oneMin.getAvgProcessLatency());
assertEquals(functionStats.getLastInvocation(), null);

assertEquals(functionStats.instances.size(), 1);
assertEquals(functionStats.instances.get(0).getInstanceId(), 0);
assertEquals(functionStats.instances.get(0).getMetrics().getReceivedTotal(), 0);
assertEquals(functionStats.instances.get(0).getMetrics().getProcessedSuccessfullyTotal(), 0);
assertEquals(functionStats.instances.get(0).getMetrics().getSystemExceptionsTotal(), 0);
assertEquals(functionStats.instances.get(0).getMetrics().getUserExceptionsTotal(), 0);
assertEquals(functionStats.instances.get(0).getMetrics().avgProcessLatency, null);
assertEquals(functionStats.instances.get(0).getMetrics().oneMin.getReceivedTotal(), 0);
assertEquals(functionStats.instances.get(0).getMetrics().oneMin.getProcessedSuccessfullyTotal(), 0);
assertEquals(functionStats.instances.get(0).getMetrics().oneMin.getSystemExceptionsTotal(), 0);
assertEquals(functionStats.instances.get(0).getMetrics().oneMin.getUserExceptionsTotal(), 0);
assertEquals(functionStats.instances.get(0).getMetrics().oneMin.getAvgProcessLatency(), null);

assertEquals(functionStats.instances.get(0).getMetrics().getAvgProcessLatency(), functionStats.instances.get(0).getMetrics().oneMin.getAvgProcessLatency());
assertEquals(functionStats.instances.get(0).getMetrics().getAvgProcessLatency(), functionStats.getAvgProcessLatency());

// validate function instance stats empty
FunctionStats.FunctionInstanceStats.FunctionInstanceStatsData functionInstanceStats = functionRuntimeManager.getFunctionInstanceStats(tenant, namespacePortion,
functionName, 0, null);

FunctionStats.FunctionInstanceStats.FunctionInstanceStatsData functionInstanceStatsAdmin = admin.functions().getFunctionStats(tenant, namespacePortion,
functionName, 0);

assertEquals(functionInstanceStats, functionInstanceStatsAdmin);
assertEquals(functionInstanceStats, functionStats.instances.get(0).getMetrics());


int totalMsgs = 10;
for (int i = 0; i < totalMsgs; i++) {
String data = "my-message-" + i;
Expand All @@ -367,11 +416,12 @@ public void testPulsarFunctionStats() throws Exception {
}
}, 5, 200);

FunctionRuntimeManager functionRuntimeManager = functionsWorkerService.getFunctionRuntimeManager();
FunctionStats functionStats = functionRuntimeManager.getFunctionStats(tenant, namespacePortion,

// get stats after producing
functionStats = functionRuntimeManager.getFunctionStats(tenant, namespacePortion,
functionName, null);

FunctionStats functionStatsFromAdmin = admin.functions().getFunctionStats(tenant, namespacePortion,
functionStatsFromAdmin = admin.functions().getFunctionStats(tenant, namespacePortion,
functionName);

assertEquals(functionStats, functionStatsFromAdmin);
Expand Down Expand Up @@ -404,6 +454,16 @@ public void testPulsarFunctionStats() throws Exception {

assertEquals(functionStats.instances.get(0).getMetrics().getAvgProcessLatency(), functionStats.instances.get(0).getMetrics().oneMin.getAvgProcessLatency());
assertEquals(functionStats.instances.get(0).getMetrics().getAvgProcessLatency(), functionStats.getAvgProcessLatency());

// validate function instance stats
functionInstanceStats = functionRuntimeManager.getFunctionInstanceStats(tenant, namespacePortion,
functionName, 0, null);

functionInstanceStatsAdmin = admin.functions().getFunctionStats(tenant, namespacePortion,
functionName, 0);

assertEquals(functionInstanceStats, functionInstanceStatsAdmin);
assertEquals(functionInstanceStats, functionStats.instances.get(0).getMetrics());
}

@Test(timeOut = 20000)
Expand Down
Expand Up @@ -18,17 +18,20 @@
*/
package org.apache.pulsar.common.policies.data;

import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonPropertyOrder;
import lombok.Data;
import org.apache.pulsar.common.util.ObjectMapperFactory;

import java.io.IOException;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

@Data
@JsonInclude(JsonInclude.Include.ALWAYS)
@JsonPropertyOrder({ "receivedTotal", "processedSuccessfullyTotal", "systemExceptionsTotal", "userExceptionsTotal", "avgProcessLatency", "1min", "lastInvocation", "instances" })
public class FunctionStats {

Expand All @@ -55,24 +58,26 @@ public class FunctionStats {
/**
* Average process latency for function
**/
public double avgProcessLatency;
public Double avgProcessLatency;

@JsonProperty("1min")
public FunctionInstanceStats.FunctionInstanceStatsDataBase oneMin = new FunctionInstanceStats.FunctionInstanceStatsDataBase();

/**
* Timestamp of when the function was last invoked by any instance
**/
public long lastInvocation;
public Long lastInvocation;

@Data
@JsonInclude(JsonInclude.Include.ALWAYS)
@JsonPropertyOrder({ "instanceId", "metrics" })
public static class FunctionInstanceStats {

/** Instance Id of function instance **/
public int instanceId;

@Data
@JsonInclude(JsonInclude.Include.ALWAYS)
@JsonPropertyOrder({ "receivedTotal", "processedSuccessfullyTotal", "systemExceptionsTotal", "userExceptionsTotal", "avgProcessLatency" })
public static class FunctionInstanceStatsDataBase {
/**
Expand All @@ -98,10 +103,11 @@ public static class FunctionInstanceStatsDataBase {
/**
* Average process latency for function for instance
**/
public double avgProcessLatency;
public Double avgProcessLatency;
}

@Data
@JsonInclude(JsonInclude.Include.ALWAYS)
@JsonPropertyOrder({ "receivedTotal", "processedSuccessfullyTotal", "systemExceptionsTotal", "userExceptionsTotal", "avgProcessLatency", "1min", "lastInvocation", "userMetrics" })
public static class FunctionInstanceStatsData extends FunctionInstanceStatsDataBase {

Expand All @@ -111,7 +117,7 @@ public static class FunctionInstanceStatsData extends FunctionInstanceStatsDataB
/**
* Timestamp of when the function was last invoked for instance
**/
public long lastInvocation;
public Long lastInvocation;

/**
* Map of user defined metrics
Expand All @@ -130,35 +136,59 @@ public void addInstance(FunctionInstanceStats functionInstanceStats) {

public FunctionStats calculateOverall() {

lastInvocation = 0;
instances.forEach(new Consumer<FunctionInstanceStats>() {
@Override
public void accept(FunctionInstanceStats functionInstanceStats) {
int nonNullInstances = 0;
int nonNullInstancesOneMin = 0;
for (FunctionInstanceStats functionInstanceStats : instances) {
FunctionInstanceStats.FunctionInstanceStatsData functionInstanceStatsData = functionInstanceStats.getMetrics();
receivedTotal += functionInstanceStatsData.receivedTotal;
processedSuccessfullyTotal += functionInstanceStatsData.processedSuccessfullyTotal;
systemExceptionsTotal += functionInstanceStatsData.systemExceptionsTotal;
userExceptionsTotal += functionInstanceStatsData.userExceptionsTotal;
avgProcessLatency += functionInstanceStatsData.avgProcessLatency;
if (functionInstanceStatsData.avgProcessLatency != null) {
if (avgProcessLatency == null) {
avgProcessLatency = 0.0;
}
avgProcessLatency += functionInstanceStatsData.avgProcessLatency;
nonNullInstances ++;
}

oneMin.receivedTotal += functionInstanceStatsData.oneMin.receivedTotal;
oneMin.processedSuccessfullyTotal += functionInstanceStatsData.oneMin.processedSuccessfullyTotal;
oneMin.systemExceptionsTotal += functionInstanceStatsData.oneMin.systemExceptionsTotal;
oneMin.userExceptionsTotal += functionInstanceStatsData.oneMin.userExceptionsTotal;
oneMin.avgProcessLatency += functionInstanceStatsData.oneMin.avgProcessLatency;

if (functionInstanceStatsData.lastInvocation > lastInvocation) {
lastInvocation = functionInstanceStatsData.lastInvocation;
if (functionInstanceStatsData.oneMin.avgProcessLatency != null) {
if (oneMin.avgProcessLatency == null) {
oneMin.avgProcessLatency = 0.0;
}
oneMin.avgProcessLatency += functionInstanceStatsData.oneMin.avgProcessLatency;
nonNullInstancesOneMin ++;
}

if (functionInstanceStatsData.lastInvocation != null) {
if (lastInvocation == null || functionInstanceStatsData.lastInvocation > lastInvocation) {
lastInvocation = functionInstanceStatsData.lastInvocation;
}
}
}
});

// calculate average from sum
avgProcessLatency = avgProcessLatency / instances.size();
if (nonNullInstances > 0) {
avgProcessLatency = avgProcessLatency / nonNullInstances;
} else {
avgProcessLatency = null;
}

// calculate 1min average from sum
oneMin.avgProcessLatency = oneMin.avgProcessLatency / instances.size();
if (nonNullInstancesOneMin > 0) {
oneMin.avgProcessLatency = oneMin.avgProcessLatency / nonNullInstancesOneMin;
} else {
oneMin.avgProcessLatency = null;
}

return this;
}

/**
 * Deserializes a JSON document into a {@link FunctionStats} instance.
 *
 * <p>Parsing is delegated to the mapper returned by
 * {@code ObjectMapperFactory.getThreadLocal()}.
 *
 * @param json the JSON text to parse
 * @return the {@link FunctionStats} read from {@code json}
 * @throws IOException propagated from the underlying {@code readValue} call
 */
public static FunctionStats decode(String json) throws IOException {
    final FunctionStats parsedStats =
            ObjectMapperFactory.getThreadLocal().readValue(json, FunctionStats.class);
    return parsedStats;
}
}
Expand Up @@ -29,17 +29,8 @@
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.stub.StreamObserver;
import io.prometheus.client.Collector;
import io.prometheus.client.CollectorRegistry;
import io.prometheus.client.exporter.HTTPServer;
import io.prometheus.client.hotspot.BufferPoolsExports;
import io.prometheus.client.hotspot.ClassLoadingExports;
import io.prometheus.client.hotspot.DefaultExports;
import io.prometheus.client.hotspot.GarbageCollectorExports;
import io.prometheus.client.hotspot.MemoryPoolsExports;
import io.prometheus.client.hotspot.StandardExports;
import io.prometheus.client.hotspot.ThreadExports;
import io.prometheus.client.hotspot.VersionInfoExports;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.functions.instance.AuthenticationConfig;
Expand All @@ -59,7 +50,6 @@
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/**
* A function container implemented using java thread.
Expand Down
Expand Up @@ -219,14 +219,14 @@ public static FunctionStats.FunctionInstanceStats getFunctionInstanceStats(Strin
functionInstanceStatsData.setProcessedSuccessfullyTotal(metricsData.getProcessedSuccessfullyTotal());
functionInstanceStatsData.setSystemExceptionsTotal(metricsData.getSystemExceptionsTotal());
functionInstanceStatsData.setUserExceptionsTotal(metricsData.getUserExceptionsTotal());
functionInstanceStatsData.setAvgProcessLatency(metricsData.getAvgProcessLatency());
functionInstanceStatsData.setLastInvocation(metricsData.getLastInvocation());
functionInstanceStatsData.setAvgProcessLatency(metricsData.getAvgProcessLatency() == 0.0 ? null : metricsData.getAvgProcessLatency());
functionInstanceStatsData.setLastInvocation(metricsData.getLastInvocation() == 0 ? null : metricsData.getLastInvocation());

functionInstanceStatsData.oneMin.setReceivedTotal(metricsData.getReceivedTotal1Min());
functionInstanceStatsData.oneMin.setProcessedSuccessfullyTotal(metricsData.getProcessedSuccessfullyTotal1Min());
functionInstanceStatsData.oneMin.setSystemExceptionsTotal(metricsData.getSystemExceptionsTotal1Min());
functionInstanceStatsData.oneMin.setUserExceptionsTotal(metricsData.getUserExceptionsTotal1Min());
functionInstanceStatsData.oneMin.setAvgProcessLatency(metricsData.getAvgProcessLatency1Min());
functionInstanceStatsData.oneMin.setAvgProcessLatency(metricsData.getAvgProcessLatency1Min() == 0.0 ? null : metricsData.getAvgProcessLatency1Min());

// Filter out values that are NaN
Map<String, Double> statsDataMap = metricsData.getUserMetricsMap().entrySet().stream()
Expand Down
Expand Up @@ -36,6 +36,7 @@
import javax.ws.rs.PUT;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
Expand Down Expand Up @@ -128,6 +129,7 @@ public Response getFunctionStatus(final @PathParam("tenant") String tenant,
@ApiResponse(code = 400, message = "Invalid request"),
@ApiResponse(code = 403, message = "The requester doesn't have admin permissions")
})
@Produces(MediaType.APPLICATION_JSON)
@Path("/{tenant}/{namespace}/{functionName}/stats")
public FunctionStats getFunctionStats(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace,
Expand All @@ -144,6 +146,7 @@ public FunctionStats getFunctionStats(final @PathParam("tenant") String tenant,
@ApiResponse(code = 400, message = "Invalid request"),
@ApiResponse(code = 403, message = "The requester doesn't have admin permissions")
})
@Produces(MediaType.APPLICATION_JSON)
@Path("/{tenant}/{namespace}/{functionName}/{instanceId}/stats")
public FunctionStats.FunctionInstanceStats.FunctionInstanceStatsData getFunctionInstanceStats(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace,
Expand Down

0 comments on commit b721ae3

Please sign in to comment.