Skip to content

Commit

Permalink
Schedule task to update function stats separately (#2128)
Browse files Browse the repository at this point in the history
keep previous stats sample

configurable update metrics task

fix test
  • Loading branch information
rdhabalia committed Jul 12, 2018
1 parent 2f38b89 commit 14fc5d9
Show file tree
Hide file tree
Showing 20 changed files with 336 additions and 36 deletions.
1 change: 1 addition & 0 deletions conf/functions_worker.yml
Expand Up @@ -44,3 +44,4 @@ rescheduleTimeoutMs: 60000
initialBrokerReconnectMaxRetries: 60
assignmentWriteMaxRetries: 60
instanceLivenessCheckFreqMs: 30000
metricsSamplingPeriodSec: 60
Expand Up @@ -91,6 +91,7 @@ public void update(double value) {
}
}

private ConcurrentMap<String, AccumulatedMetricDatum> currentAccumulatedMetrics;
private ConcurrentMap<String, AccumulatedMetricDatum> accumulatedMetrics;

private Map<String, Producer> publishProducers;
Expand All @@ -110,6 +111,7 @@ public ContextImpl(InstanceConfig config, Logger logger, PulsarClient client,
this.logger = logger;
this.pulsarClient = client;
this.classLoader = classLoader;
this.currentAccumulatedMetrics = new ConcurrentHashMap<>();
this.accumulatedMetrics = new ConcurrentHashMap<>();
this.publishProducers = new HashMap<>();
this.publishSerializers = new HashMap<>();
Expand Down Expand Up @@ -324,11 +326,23 @@ public CompletableFuture<Void> ack(byte[] messageId) {

@Override
public void recordMetric(String metricName, double value) {
accumulatedMetrics.putIfAbsent(metricName, new AccumulatedMetricDatum());
accumulatedMetrics.get(metricName).update(value);
currentAccumulatedMetrics.putIfAbsent(metricName, new AccumulatedMetricDatum());
currentAccumulatedMetrics.get(metricName).update(value);
}

public MetricsData getAndResetMetrics() {
MetricsData retval = getMetrics();
resetMetrics();
return retval;
}

public void resetMetrics() {
this.accumulatedMetrics.clear();
this.accumulatedMetrics.putAll(currentAccumulatedMetrics);
this.currentAccumulatedMetrics.clear();
}

public MetricsData getMetrics() {
MetricsData.Builder metricsDataBuilder = MetricsData.newBuilder();
for (String metricName : accumulatedMetrics.keySet()) {
MetricsData.DataDigest.Builder bldr = MetricsData.DataDigest.newBuilder();
Expand All @@ -339,7 +353,6 @@ public MetricsData getAndResetMetrics() {
metricsDataBuilder.putMetrics(metricName, bldr.build());
}
MetricsData retval = metricsDataBuilder.build();
accumulatedMetrics.clear();
return retval;
}
}
Expand Up @@ -102,13 +102,35 @@ public double computeLatency() {
return totalLatencyMs / totalSuccessfullyProcessed;
}
}

public void update(Stats stats) {
if (stats == null) {
return;
}
this.totalProcessed = stats.totalProcessed;
this.totalSuccessfullyProcessed = stats.totalSuccessfullyProcessed;
this.totalUserExceptions = stats.totalUserExceptions;
this.latestUserExceptions.clear();
this.latestSystemExceptions.clear();
this.totalDeserializationExceptions.clear();
this.latestUserExceptions.addAll(stats.latestUserExceptions);
this.latestSystemExceptions.addAll(stats.latestSystemExceptions);
this.totalDeserializationExceptions.putAll(stats.totalDeserializationExceptions);
this.totalSystemExceptions = stats.totalSystemExceptions;
this.latestSystemExceptions = stats.latestSystemExceptions;
this.totalSerializationExceptions = stats.totalSerializationExceptions;
this.totalLatencyMs = stats.totalLatencyMs;
this.lastInvocationTime = stats.lastInvocationTime;
}
}

private Stats currentStats;
private Stats totalStats;

private Stats stats;

public FunctionStats() {
currentStats = new Stats();
stats = new Stats();
totalStats = new Stats();
}

Expand Down Expand Up @@ -138,6 +160,7 @@ public void incrementSerializationExceptions() {
totalStats.incrementSerializationExceptions();
}
public void resetCurrent() {
stats.update(currentStats);
currentStats.reset();
}
}
Expand Up @@ -76,4 +76,12 @@ public void close() {
public InstanceCommunication.MetricsData getAndResetMetrics() {
return context.getAndResetMetrics();
}

public void resetMetrics() {
context.resetMetrics();
}

public InstanceCommunication.MetricsData getMetrics() {
return context.getMetrics();
}
}
Expand Up @@ -52,9 +52,9 @@
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.nar.NarClassLoader;
import org.apache.pulsar.functions.api.Function;
import org.apache.pulsar.functions.proto.InstanceCommunication;
import org.apache.pulsar.functions.proto.InstanceCommunication.MetricsData.Builder;
import org.apache.pulsar.functions.proto.Function.SourceSpec;
import org.apache.pulsar.functions.proto.Function.SinkSpec;
import org.apache.pulsar.functions.sink.PulsarSink;
Expand Down Expand Up @@ -389,16 +389,7 @@ public void close() {
}

public InstanceCommunication.MetricsData getAndResetMetrics() {
InstanceCommunication.MetricsData.Builder bldr = InstanceCommunication.MetricsData.newBuilder();
addSystemMetrics("__total_processed__", stats.getCurrentStats().getTotalProcessed(), bldr);
addSystemMetrics("__total_successfully_processed__", stats.getCurrentStats().getTotalSuccessfullyProcessed(), bldr);
addSystemMetrics("__total_system_exceptions__", stats.getCurrentStats().getTotalSystemExceptions(), bldr);
addSystemMetrics("__total_user_exceptions__", stats.getCurrentStats().getTotalUserExceptions(), bldr);
stats.getCurrentStats().getTotalDeserializationExceptions().forEach((topic, count) -> {
addSystemMetrics("__total_deserialization_exceptions__" + topic, count, bldr);
});
addSystemMetrics("__total_serialization_exceptions__", stats.getCurrentStats().getTotalSerializationExceptions(), bldr);
addSystemMetrics("__avg_latency_ms__", stats.getCurrentStats().computeLatency(), bldr);
InstanceCommunication.MetricsData.Builder bldr = createMetricsDataBuilder();
stats.resetCurrent();
if (javaInstance != null) {
InstanceCommunication.MetricsData userMetrics = javaInstance.getAndResetMetrics();
Expand All @@ -409,6 +400,38 @@ public InstanceCommunication.MetricsData getAndResetMetrics() {
return bldr.build();
}

public InstanceCommunication.MetricsData getMetrics() {
InstanceCommunication.MetricsData.Builder bldr = createMetricsDataBuilder();
if (javaInstance != null) {
InstanceCommunication.MetricsData userMetrics = javaInstance.getMetrics();
if (userMetrics != null) {
bldr.putAllMetrics(userMetrics.getMetricsMap());
}
}
return bldr.build();
}

public void resetMetrics() {
stats.resetCurrent();
javaInstance.resetMetrics();
}

private Builder createMetricsDataBuilder() {
InstanceCommunication.MetricsData.Builder bldr = InstanceCommunication.MetricsData.newBuilder();
addSystemMetrics("__total_processed__", stats.getStats().getTotalProcessed(), bldr);
addSystemMetrics("__total_successfully_processed__", stats.getStats().getTotalSuccessfullyProcessed(),
bldr);
addSystemMetrics("__total_system_exceptions__", stats.getStats().getTotalSystemExceptions(), bldr);
addSystemMetrics("__total_user_exceptions__", stats.getStats().getTotalUserExceptions(), bldr);
stats.getStats().getTotalDeserializationExceptions().forEach((topic, count) -> {
addSystemMetrics("__total_deserialization_exceptions__" + topic, count, bldr);
});
addSystemMetrics("__total_serialization_exceptions__",
stats.getStats().getTotalSerializationExceptions(), bldr);
addSystemMetrics("__avg_latency_ms__", stats.getStats().computeLatency(), bldr);
return bldr;
}

public InstanceCommunication.FunctionStatus.Builder getFunctionStatus() {
InstanceCommunication.FunctionStatus.Builder functionStatusBuilder = InstanceCommunication.FunctionStatus.newBuilder();
functionStatusBuilder.setNumProcessed(stats.getTotalStats().getTotalProcessed());
Expand Down
Expand Up @@ -39,7 +39,7 @@
name='InstanceCommunication.proto',
package='proto',
syntax='proto3',
serialized_pb=_b('\n\x1bInstanceCommunication.proto\x12\x05proto\x1a\x1bgoogle/protobuf/empty.proto\"\xa1\x05\n\x0e\x46unctionStatus\x12\x0f\n\x07running\x18\x01 \x01(\x08\x12\x18\n\x10\x66\x61ilureException\x18\x02 \x01(\t\x12\x13\n\x0bnumRestarts\x18\x03 \x01(\x03\x12\x14\n\x0cnumProcessed\x18\x04 \x01(\x03\x12 \n\x18numSuccessfullyProcessed\x18\x05 \x01(\x03\x12\x19\n\x11numUserExceptions\x18\x06 \x01(\x03\x12H\n\x14latestUserExceptions\x18\x07 \x03(\x0b\x32*.proto.FunctionStatus.ExceptionInformation\x12\x1b\n\x13numSystemExceptions\x18\x08 \x01(\x03\x12J\n\x16latestSystemExceptions\x18\t \x03(\x0b\x32*.proto.FunctionStatus.ExceptionInformation\x12W\n\x19\x64\x65serializationExceptions\x18\n \x03(\x0b\x32\x34.proto.FunctionStatus.DeserializationExceptionsEntry\x12\x1f\n\x17serializationExceptions\x18\x0b \x01(\x03\x12\x16\n\x0e\x61verageLatency\x18\x0c \x01(\x01\x12\x1a\n\x12lastInvocationTime\x18\r \x01(\x03\x12\x12\n\ninstanceId\x18\x0e \x01(\t\x1a\x45\n\x14\x45xceptionInformation\x12\x17\n\x0f\x65xceptionString\x18\x01 \x01(\t\x12\x14\n\x0cmsSinceEpoch\x18\x02 \x01(\x03\x1a@\n\x1e\x44\x65serializationExceptionsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\x03:\x02\x38\x01\"G\n\x12\x46unctionStatusList\x12\x31\n\x12\x66unctionStatusList\x18\x01 \x03(\x0b\x32\x15.proto.FunctionStatus\"\xd2\x01\n\x0bMetricsData\x12\x30\n\x07metrics\x18\x01 \x03(\x0b\x32\x1f.proto.MetricsData.MetricsEntry\x1a\x42\n\nDataDigest\x12\r\n\x05\x63ount\x18\x01 \x01(\x01\x12\x0b\n\x03sum\x18\x02 \x01(\x01\x12\x0b\n\x03max\x18\x03 \x01(\x01\x12\x0b\n\x03min\x18\x04 \x01(\x01\x1aM\n\x0cMetricsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12,\n\x05value\x18\x02 \x01(\x0b\x32\x1d.proto.MetricsData.DataDigest:\x02\x38\x01\"$\n\x11HealthCheckResult\x12\x0f\n\x07success\x18\x01 \x01(\x08\x32\xde\x01\n\x0fInstanceControl\x12\x44\n\x11GetFunctionStatus\x12\x16.google.protobuf.Empty\x1a\x15.proto.FunctionStatus\"\x00\x12\x42\n\x12GetAndResetMetrics\x12\x16.google.protobuf.Empty\x1a\x12.proto.MetricsData\"\x00\x12\x41\n\x0bHealthCheck\x12\x16.google.protobuf.Empty\x1a\x18.proto.HealthCheckResult\"\x00\x42:\n!org.apache.pulsar.functions.protoB\x15InstanceCommunicationb\x06proto3')
serialized_pb=_b('\n\x1bInstanceCommunication.proto\x12\x05proto\x1a\x1bgoogle/protobuf/empty.proto\"\xa1\x05\n\x0e\x46unctionStatus\x12\x0f\n\x07running\x18\x01 \x01(\x08\x12\x18\n\x10\x66\x61ilureException\x18\x02 \x01(\t\x12\x13\n\x0bnumRestarts\x18\x03 \x01(\x03\x12\x14\n\x0cnumProcessed\x18\x04 \x01(\x03\x12 \n\x18numSuccessfullyProcessed\x18\x05 \x01(\x03\x12\x19\n\x11numUserExceptions\x18\x06 \x01(\x03\x12H\n\x14latestUserExceptions\x18\x07 \x03(\x0b\x32*.proto.FunctionStatus.ExceptionInformation\x12\x1b\n\x13numSystemExceptions\x18\x08 \x01(\x03\x12J\n\x16latestSystemExceptions\x18\t \x03(\x0b\x32*.proto.FunctionStatus.ExceptionInformation\x12W\n\x19\x64\x65serializationExceptions\x18\n \x03(\x0b\x32\x34.proto.FunctionStatus.DeserializationExceptionsEntry\x12\x1f\n\x17serializationExceptions\x18\x0b \x01(\x03\x12\x16\n\x0e\x61verageLatency\x18\x0c \x01(\x01\x12\x1a\n\x12lastInvocationTime\x18\r \x01(\x03\x12\x12\n\ninstanceId\x18\x0e \x01(\t\x1a\x45\n\x14\x45xceptionInformation\x12\x17\n\x0f\x65xceptionString\x18\x01 \x01(\t\x12\x14\n\x0cmsSinceEpoch\x18\x02 \x01(\x03\x1a@\n\x1e\x44\x65serializationExceptionsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\x03:\x02\x38\x01\"G\n\x12\x46unctionStatusList\x12\x31\n\x12\x66unctionStatusList\x18\x01 \x03(\x0b\x32\x15.proto.FunctionStatus\"\xd2\x01\n\x0bMetricsData\x12\x30\n\x07metrics\x18\x01 \x03(\x0b\x32\x1f.proto.MetricsData.MetricsEntry\x1a\x42\n\nDataDigest\x12\r\n\x05\x63ount\x18\x01 \x01(\x01\x12\x0b\n\x03sum\x18\x02 \x01(\x01\x12\x0b\n\x03max\x18\x03 \x01(\x01\x12\x0b\n\x03min\x18\x04 \x01(\x01\x1aM\n\x0cMetricsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12,\n\x05value\x18\x02 \x01(\x0b\x32\x1d.proto.MetricsData.DataDigest:\x02\x38\x01\"$\n\x11HealthCheckResult\x12\x0f\n\x07success\x18\x01 \x01(\x08\x32\xdc\x02\n\x0fInstanceControl\x12\x44\n\x11GetFunctionStatus\x12\x16.google.protobuf.Empty\x1a\x15.proto.FunctionStatus\"\x00\x12\x42\n\x12GetAndResetMetrics\x12\x16.google.protobuf.Empty\x1a\x12.proto.MetricsData\"\x00\x12@\n\x0cResetMetrics\x12\x16.google.protobuf.Empty\x1a\x16.google.protobuf.Empty\"\x00\x12:\n\nGetMetrics\x12\x16.google.protobuf.Empty\x1a\x12.proto.MetricsData\"\x00\x12\x41\n\x0bHealthCheck\x12\x16.google.protobuf.Empty\x1a\x18.proto.HealthCheckResult\"\x00\x42:\n!org.apache.pulsar.functions.protoB\x15InstanceCommunicationb\x06proto3')
,
dependencies=[google_dot_protobuf_dot_empty__pb2.DESCRIPTOR,])

Expand Down Expand Up @@ -513,7 +513,7 @@
index=0,
options=None,
serialized_start=1068,
serialized_end=1290,
serialized_end=1416,
methods=[
_descriptor.MethodDescriptor(
name='GetFunctionStatus',
Expand All @@ -533,10 +533,28 @@
output_type=_METRICSDATA,
options=None,
),
_descriptor.MethodDescriptor(
name='ResetMetrics',
full_name='proto.InstanceControl.ResetMetrics',
index=2,
containing_service=None,
input_type=google_dot_protobuf_dot_empty__pb2._EMPTY,
output_type=google_dot_protobuf_dot_empty__pb2._EMPTY,
options=None,
),
_descriptor.MethodDescriptor(
name='GetMetrics',
full_name='proto.InstanceControl.GetMetrics',
index=3,
containing_service=None,
input_type=google_dot_protobuf_dot_empty__pb2._EMPTY,
output_type=_METRICSDATA,
options=None,
),
_descriptor.MethodDescriptor(
name='HealthCheck',
full_name='proto.InstanceControl.HealthCheck',
index=2,
index=4,
containing_service=None,
input_type=google_dot_protobuf_dot_empty__pb2._EMPTY,
output_type=_HEALTHCHECKRESULT,
Expand Down
Expand Up @@ -44,6 +44,16 @@ def __init__(self, channel):
request_serializer=google_dot_protobuf_dot_empty__pb2.Empty.SerializeToString,
response_deserializer=InstanceCommunication__pb2.MetricsData.FromString,
)
self.ResetMetrics = channel.unary_unary(
'/proto.InstanceControl/ResetMetrics',
request_serializer=google_dot_protobuf_dot_empty__pb2.Empty.SerializeToString,
response_deserializer=google_dot_protobuf_dot_empty__pb2.Empty.FromString,
)
self.GetMetrics = channel.unary_unary(
'/proto.InstanceControl/GetMetrics',
request_serializer=google_dot_protobuf_dot_empty__pb2.Empty.SerializeToString,
response_deserializer=InstanceCommunication__pb2.MetricsData.FromString,
)
self.HealthCheck = channel.unary_unary(
'/proto.InstanceControl/HealthCheck',
request_serializer=google_dot_protobuf_dot_empty__pb2.Empty.SerializeToString,
Expand All @@ -69,6 +79,20 @@ def GetAndResetMetrics(self, request, context):
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')

def ResetMetrics(self, request, context):
# missing associated documentation comment in .proto file
pass
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')

def GetMetrics(self, request, context):
# missing associated documentation comment in .proto file
pass
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')

def HealthCheck(self, request, context):
# missing associated documentation comment in .proto file
pass
Expand All @@ -89,6 +113,16 @@ def add_InstanceControlServicer_to_server(servicer, server):
request_deserializer=google_dot_protobuf_dot_empty__pb2.Empty.FromString,
response_serializer=InstanceCommunication__pb2.MetricsData.SerializeToString,
),
'ResetMetrics': grpc.unary_unary_rpc_method_handler(
servicer.ResetMetrics,
request_deserializer=google_dot_protobuf_dot_empty__pb2.Empty.FromString,
response_serializer=google_dot_protobuf_dot_empty__pb2.Empty.SerializeToString,
),
'GetMetrics': grpc.unary_unary_rpc_method_handler(
servicer.GetMetrics,
request_deserializer=google_dot_protobuf_dot_empty__pb2.Empty.FromString,
response_serializer=InstanceCommunication__pb2.MetricsData.SerializeToString,
),
'HealthCheck': grpc.unary_unary_rpc_method_handler(
servicer.HealthCheck,
request_deserializer=google_dot_protobuf_dot_empty__pb2.Empty.FromString,
Expand Down
23 changes: 17 additions & 6 deletions pulsar-functions/instance/src/main/python/contextimpl.py
Expand Up @@ -54,6 +54,7 @@ def __init__(self, instance_config, logger, pulsar_client, user_code, consumers)
self.pulsar_client = pulsar_client
self.user_code_dir = os.path.dirname(user_code)
self.consumers = consumers
self.current_accumulated_metrics = {}
self.accumulated_metrics = {}
self.publish_producers = {}
self.publish_serializers = {}
Expand Down Expand Up @@ -107,9 +108,9 @@ def get_user_config_map(self):
return self.user_config

def record_metric(self, metric_name, metric_value):
if not metric_name in self.accumulated_metrics:
self.accumulated_metrics[metric_name] = AccumulatedMetricDatum()
self.accumulated_metrics[metric_name].update(metric_value)
if not metric_name in self.current_accumulated_metrics:
self.current_accumulated_metrics[metric_name] = AccumulatedMetricDatum()
self.current_accumulated_metrics[metric_name].update(metric_value)

def get_output_topic(self):
return self.instance_config.function_details.output
Expand Down Expand Up @@ -143,6 +144,18 @@ def ack(self, msgid, topic):
self.consumers[topic].acknowledge(msgid)

def get_and_reset_metrics(self):
metrics = self.get_metrics()
# TODO(sanjeev):- Make this thread safe
self.reset_metrics()
return metrics

def reset_metrics(self):
# TODO: Make it thread safe
self.accumulated_metrics.clear()
self.accumulated_metrics.update(self.current_accumulated_metrics)
self.current_accumulated_metrics.clear()

def get_metrics(self):
metrics = InstanceCommunication_pb2.MetricsData()
for metric_name, accumulated_metric in self.accumulated_metrics.items():
m = InstanceCommunication_pb2.MetricsData.DataDigest()
Expand All @@ -151,6 +164,4 @@ def get_and_reset_metrics(self):
m.max = accumulated_metric.max
m.min = accumulated_metric.min
metrics.metrics[metric_name] = m
# TODO(sanjeev):- Make this thread safe
self.accumulated_metrics.clear()
return metrics
return metrics

0 comments on commit 14fc5d9

Please sign in to comment.