Skip to content

Commit

Permalink
Overhaul stats in both Java and Python instances (#148)
Browse files Browse the repository at this point in the history
* Overhaul stats in both Java and Python instances

* Added licence header
  • Loading branch information
srkukarni authored and sijie committed Mar 4, 2018
1 parent f77cd00 commit 1ebc04d
Show file tree
Hide file tree
Showing 6 changed files with 288 additions and 103 deletions.
Expand Up @@ -32,6 +32,12 @@ message FunctionStatus {
int64 numTimeouts = 5;
int64 numUserExceptions = 6;
int64 numSystemExceptions = 7;
// map from topic name to number of deserialization exceptions
map<string, int64> deserializationExceptions = 8;
// number of serialization exceptions on the output
int64 serializationExceptions = 9;
// average latency
double averageLatency = 10;
}

message MetricsData {
Expand Down
Expand Up @@ -22,22 +22,100 @@
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;

import java.util.HashMap;
import java.util.Map;

/**
* Function stats.
*/
@Slf4j
@Getter
@Setter
public class FunctionStats {
private long totalProcessed;
private long totalSuccessfullyProcessed;
private long totalUserExceptions;
private long totalSystemExceptions;
private long totalTimeoutExceptions;

public void incrementProcessed() { totalProcessed++; }
public void incrementSuccessfullyProcessed() { totalSuccessfullyProcessed++; }
public void incrementUserExceptions() { totalUserExceptions++; }
public void incrementSystemExceptions() { totalSystemExceptions++; }
public void incrementTimeoutExceptions() { totalTimeoutExceptions++; }
@Getter
@Setter
class Stats {
private long totalProcessed;
private long totalSuccessfullyProcessed;
private long totalUserExceptions;
private long totalSystemExceptions;
private long totalTimeoutExceptions;
private Map<String, Long> totalDeserializationExceptions = new HashMap<>();
private long totalSerializationExceptions;
private long totalLatencyMs;

public void incrementProcessed() { totalProcessed++; }
public void incrementSuccessfullyProcessed(long latency) {
totalSuccessfullyProcessed++;
totalLatencyMs += latency;
}
public void incrementUserExceptions() { totalUserExceptions++; }
public void incrementSystemExceptions() { totalSystemExceptions++; }
public void incrementTimeoutExceptions() { totalTimeoutExceptions++; }
public void incrementDeserializationExceptions(String topic) {
if (!totalDeserializationExceptions.containsKey(topic)) {
totalDeserializationExceptions.put(topic, 0l);
}
totalDeserializationExceptions.put(topic, totalDeserializationExceptions.get(topic) + 1);
}
public void incrementSerializationExceptions() { totalSerializationExceptions++; }
public void reset() {
totalProcessed = 0;
totalSuccessfullyProcessed = 0;
totalUserExceptions = 0;
totalSystemExceptions = 0;
totalTimeoutExceptions = 0;
totalDeserializationExceptions.clear();
totalSerializationExceptions = 0;
totalLatencyMs = 0;
}
public double computeLatency() {
if (totalSuccessfullyProcessed <= 0) {
return 0;
} else {
return totalLatencyMs / totalSuccessfullyProcessed;
}
}
}

private Stats currentStats;
private Stats totalStats;

public FunctionStats() {
currentStats = new Stats();
totalStats = new Stats();
}

public void incrementProcessed() {
currentStats.incrementProcessed();
totalStats.incrementProcessed();
}

public void incrementSuccessfullyProcessed(long latency) {
currentStats.incrementSuccessfullyProcessed(latency);
totalStats.incrementSuccessfullyProcessed(latency);
}
public void incrementUserExceptions() {
currentStats.incrementUserExceptions();
totalStats.incrementUserExceptions();
}
public void incrementSystemExceptions() {
currentStats.incrementSystemExceptions();
totalStats.incrementSystemExceptions();
}
public void incrementTimeoutExceptions() {
currentStats.incrementTimeoutExceptions();
totalStats.incrementTimeoutExceptions();
}
public void incrementDeserializationExceptions(String topic) {
currentStats.incrementDeserializationExceptions(topic);
totalStats.incrementDeserializationExceptions(topic);
}
public void incrementSerializationExceptions() {
currentStats.incrementSerializationExceptions();
totalStats.incrementSerializationExceptions();
}
public void resetCurrent() {
currentStats.reset();
}
}
Expand Up @@ -133,16 +133,22 @@ public void run() {
}

// process the message

long processAt = System.nanoTime();
stats.incrementProcessed();
Object input = msg.getInputSerDe().deserialize(msg.getActualMessage().getData());
Object input;
try {
input = msg.getInputSerDe().deserialize(msg.getActualMessage().getData());
} catch (Exception ex) {
stats.incrementDeserializationExceptions(msg.getTopicName());
continue;
}
long processAt = System.currentTimeMillis();
result = javaInstance.handleMessage(
msg.getActualMessage().getMessageId(),
msg.getTopicName(),
input);
long doneProcessing = System.currentTimeMillis();
log.debug("Got result: {}", result.getResult());
processResult(msg, result, processAt);
processResult(msg, result, processAt, doneProcessing);
}

javaInstance.close();
Expand Down Expand Up @@ -222,7 +228,7 @@ private void startSourceConsumers() throws Exception {
}
}

private void processResult(InputMessage msg, JavaExecutionResult result, long processAt) {
private void processResult(InputMessage msg, JavaExecutionResult result, long startTime, long endTime) {
if (result.getUserException() != null) {
log.info("Encountered user exception when processing message {}", msg, result.getUserException());
stats.incrementUserExceptions();
Expand All @@ -233,9 +239,14 @@ private void processResult(InputMessage msg, JavaExecutionResult result, long pr
log.info("Timedout when processing message {}", msg, result.getTimeoutException());
stats.incrementTimeoutExceptions();
} else {
stats.incrementSuccessfullyProcessed();
stats.incrementSuccessfullyProcessed(endTime - startTime);
if (result.getResult() != null && sinkProducer != null) {
byte[] output = outputSerDe.serialize(result.getResult());
byte[] output = null;
try {
output = outputSerDe.serialize(result.getResult());
} catch (Exception ex) {
stats.incrementSerializationExceptions();
}
if (output != null) {
sinkProducer.sendAsync(output)
.thenAccept(messageId -> {
Expand Down Expand Up @@ -296,11 +307,17 @@ public void close() {

public InstanceCommunication.MetricsData getAndResetMetrics() {
InstanceCommunication.MetricsData.Builder bldr = InstanceCommunication.MetricsData.newBuilder();
addSystemMetrics("__total_processed__", stats.getTotalProcessed(), bldr);
addSystemMetrics("__total_successfully_processed__", stats.getTotalSuccessfullyProcessed(), bldr);
addSystemMetrics("__total_system_exceptions__", stats.getTotalSystemExceptions(), bldr);
addSystemMetrics("__total_timeout_exceptions__", stats.getTotalTimeoutExceptions(), bldr);
addSystemMetrics("__total_user_exceptions__", stats.getTotalUserExceptions(), bldr);
addSystemMetrics("__total_processed__", stats.getCurrentStats().getTotalProcessed(), bldr);
addSystemMetrics("__total_successfully_processed__", stats.getCurrentStats().getTotalSuccessfullyProcessed(), bldr);
addSystemMetrics("__total_system_exceptions__", stats.getCurrentStats().getTotalSystemExceptions(), bldr);
addSystemMetrics("__total_timeout_exceptions__", stats.getCurrentStats().getTotalTimeoutExceptions(), bldr);
addSystemMetrics("__total_user_exceptions__", stats.getCurrentStats().getTotalUserExceptions(), bldr);
stats.getCurrentStats().getTotalDeserializationExceptions().forEach((topic, count) -> {
addSystemMetrics("__total_deserialization_exceptions__" + topic, count, bldr);
});
addSystemMetrics("__total_serialization_exceptions__", stats.getCurrentStats().getTotalSerializationExceptions(), bldr);
addSystemMetrics("__avg_latency_ms__", stats.getCurrentStats().computeLatency(), bldr);
stats.resetCurrent();
if (javaInstance != null) {
InstanceCommunication.MetricsData userMetrics = javaInstance.getAndResetMetrics();
if (userMetrics != null) {
Expand All @@ -312,11 +329,14 @@ public InstanceCommunication.MetricsData getAndResetMetrics() {

public InstanceCommunication.FunctionStatus.Builder getFunctionStatus() {
InstanceCommunication.FunctionStatus.Builder functionStatusBuilder = InstanceCommunication.FunctionStatus.newBuilder();
functionStatusBuilder.setNumProcessed(stats.getTotalProcessed());
functionStatusBuilder.setNumSuccessfullyProcessed(stats.getTotalSuccessfullyProcessed());
functionStatusBuilder.setNumUserExceptions(stats.getTotalUserExceptions());
functionStatusBuilder.setNumSystemExceptions(stats.getTotalSystemExceptions());
functionStatusBuilder.setNumTimeouts(stats.getTotalTimeoutExceptions());
functionStatusBuilder.setNumProcessed(stats.getTotalStats().getTotalProcessed());
functionStatusBuilder.setNumSuccessfullyProcessed(stats.getTotalStats().getTotalSuccessfullyProcessed());
functionStatusBuilder.setNumUserExceptions(stats.getTotalStats().getTotalUserExceptions());
functionStatusBuilder.setNumSystemExceptions(stats.getTotalStats().getTotalSystemExceptions());
functionStatusBuilder.setNumTimeouts(stats.getTotalStats().getTotalTimeoutExceptions());
functionStatusBuilder.putAllDeserializationExceptions(stats.getTotalStats().getTotalDeserializationExceptions());
functionStatusBuilder.setSerializationExceptions(stats.getTotalStats().getTotalSerializationExceptions());
functionStatusBuilder.setAverageLatency(stats.getTotalStats().computeLatency());
return functionStatusBuilder;
}

Expand Down

0 comments on commit 1ebc04d

Please sign in to comment.