Skip to content

Commit

Permalink
Add function metrics with function-stats to get metrics on-demand (#2130
Browse files Browse the repository at this point in the history
)
  • Loading branch information
rdhabalia committed Jul 16, 2018
1 parent 7af6374 commit 5f779b4
Show file tree
Hide file tree
Showing 6 changed files with 133 additions and 32 deletions.
Expand Up @@ -48,22 +48,26 @@
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.client.impl.auth.AuthenticationTls;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.SubscriptionStats;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.functions.api.utils.DefaultSerDe;
import org.apache.pulsar.functions.api.utils.IdentityFunction;
import org.apache.pulsar.functions.instance.JavaInstanceRunnable;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.Function.FunctionDetails;
import org.apache.pulsar.functions.proto.Function.SinkSpec;
import org.apache.pulsar.functions.proto.Function.SourceSpec;
import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus;
import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatusList;
import org.apache.pulsar.functions.proto.InstanceCommunication.MetricsData;
import org.apache.pulsar.functions.proto.InstanceCommunication.MetricsData.DataDigest;
import org.apache.pulsar.functions.sink.PulsarSink;
import org.apache.pulsar.functions.utils.Reflections;
import org.apache.pulsar.functions.utils.Utils;
import org.apache.pulsar.functions.worker.FunctionRuntimeManager;
import org.apache.pulsar.functions.worker.WorkerConfig;
import org.apache.pulsar.functions.worker.WorkerService;
import org.apache.pulsar.functions.worker.rest.WorkerServer;
Expand Down Expand Up @@ -244,6 +248,7 @@ public void testE2EPulsarSink() throws Exception {
final String sinkTopic = "persistent://" + replNamespace + "/output";
final String propertyKey = "key";
final String propertyValue = "value";
final String functionName = "PulsarSink-test";
admin.namespaces().createNamespace(replNamespace);
Set<String> clusters = Sets.newHashSet(Lists.newArrayList("use"));
admin.namespaces().setNamespaceReplicationClusters(replNamespace, clusters);
Expand All @@ -254,7 +259,7 @@ public void testE2EPulsarSink() throws Exception {

String jarFilePathUrl = Utils.FILE + ":"
+ PulsarSink.class.getProtectionDomain().getCodeSource().getLocation().getPath();
FunctionDetails functionDetails = createSinkConfig(jarFilePathUrl, tenant, namespacePortion, "PulsarSink-test",
FunctionDetails functionDetails = createSinkConfig(jarFilePathUrl, tenant, namespacePortion, functionName,
sinkTopic);
admin.functions().createFunctionWithUrl(functionDetails, jarFilePathUrl);

Expand Down Expand Up @@ -298,7 +303,76 @@ public void testE2EPulsarSink() throws Exception {

}

protected FunctionDetails createSinkConfig(String jarFile, String tenant, String namespace, String sinkName, String sinkTopic) {

@Test(timeOut = 20000)
public void testPulsarSinkStats() throws Exception {

final String namespacePortion = "io";
final String replNamespace = tenant + "/" + namespacePortion;
final String sourceTopic = "persistent://" + replNamespace + "/my-topic1";
final String sinkTopic = "persistent://" + replNamespace + "/output";
final String propertyKey = "key";
final String propertyValue = "value";
final String functionName = "PulsarSink-test";
admin.namespaces().createNamespace(replNamespace);
Set<String> clusters = Sets.newHashSet(Lists.newArrayList("use"));
admin.namespaces().setNamespaceReplicationClusters(replNamespace, clusters);

// create a producer that creates a topic at broker
Producer<byte[]> producer = pulsarClient.newProducer().topic(sourceTopic).create();

String jarFilePathUrl = Utils.FILE + ":"
+ PulsarSink.class.getProtectionDomain().getCodeSource().getLocation().getPath();
FunctionDetails functionDetails = createSinkConfig(jarFilePathUrl, tenant, namespacePortion, functionName,
sinkTopic);
admin.functions().createFunctionWithUrl(functionDetails, jarFilePathUrl);

// try to update function to test: update-function functionality
admin.functions().updateFunctionWithUrl(functionDetails, jarFilePathUrl);

retryStrategically((test) -> {
try {
return admin.topics().getStats(sourceTopic).subscriptions.size() == 1;
} catch (PulsarAdminException e) {
return false;
}
}, 5, 150);
// validate pulsar sink consumer has started on the topic
Assert.assertEquals(admin.topics().getStats(sourceTopic).subscriptions.size(), 1);

int totalMsgs = 10;
for (int i = 0; i < totalMsgs; i++) {
String data = "my-message-" + i;
producer.newMessage().property(propertyKey, propertyValue).value(data.getBytes()).send();
}
retryStrategically((test) -> {
try {
SubscriptionStats subStats = admin.topics().getStats(sourceTopic).subscriptions.values().iterator()
.next();
return subStats.unackedMessages == 0;
} catch (PulsarAdminException e) {
return false;
}
}, 5, 500);

FunctionRuntimeManager functionRuntimeManager = functionsWorkerService.getFunctionRuntimeManager();
functionRuntimeManager.updateRates();
FunctionStatusList functionStats = functionRuntimeManager.getAllFunctionStatus(tenant, namespacePortion,
functionName);

int numInstances = functionStats.getFunctionStatusListCount();
Assert.assertEquals(numInstances, 1);

FunctionStatus stats = functionStats.getFunctionStatusListList().get(0);
Map<String, DataDigest> metricsData = stats.getMetrics().getMetricsMap();

double count = metricsData.get(JavaInstanceRunnable.METRICS_TOTAL_PROCESSED).getCount();
double success = metricsData.get(JavaInstanceRunnable.METRICS_TOTAL_SUCCESS).getCount();
Assert.assertEquals((int) count, totalMsgs);
Assert.assertEquals((int) success, totalMsgs);
}

protected FunctionDetails createSinkConfig(String jarFile, String tenant, String namespace, String functionName, String sinkTopic) {

File file = new File(jarFile);
try {
Expand All @@ -312,7 +386,7 @@ protected FunctionDetails createSinkConfig(String jarFile, String tenant, String
FunctionDetails.Builder functionDetailsBuilder = FunctionDetails.newBuilder();
functionDetailsBuilder.setTenant(tenant);
functionDetailsBuilder.setNamespace(namespace);
functionDetailsBuilder.setName(sinkName);
functionDetailsBuilder.setName(functionName);
functionDetailsBuilder.setRuntime(FunctionDetails.Runtime.JAVA);
functionDetailsBuilder.setParallelism(1);
functionDetailsBuilder.setClassName(IdentityFunction.class.getName());
Expand Down
Expand Up @@ -108,6 +108,14 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {

private Source source;
private Sink sink;

public static final String METRICS_TOTAL_PROCESSED = "__total_processed__";
public static final String METRICS_TOTAL_SUCCESS = "__total_successfully_processed__";
public static final String METRICS_TOTAL_SYS_EXCEPTION = "__total_system_exceptions__";
public static final String METRICS_TOTAL_USER_EXCEPTION = "__total_user_exceptions__";
public static final String METRICS_TOTAL_DESERIALIZATION_EXCEPTION = "__total_deserialization_exceptions__";
public static final String METRICS_TOTAL_SERIALIZATION_EXCEPTION = "__total_serialization_exceptions__";
public static final String METRICS_AVG_LATENCY = "__avg_latency_ms__";

public JavaInstanceRunnable(InstanceConfig instanceConfig,
FunctionCacheManager fnCache,
Expand Down Expand Up @@ -418,17 +426,17 @@ public void resetMetrics() {

private Builder createMetricsDataBuilder() {
InstanceCommunication.MetricsData.Builder bldr = InstanceCommunication.MetricsData.newBuilder();
addSystemMetrics("__total_processed__", stats.getStats().getTotalProcessed(), bldr);
addSystemMetrics("__total_successfully_processed__", stats.getStats().getTotalSuccessfullyProcessed(),
addSystemMetrics(METRICS_TOTAL_PROCESSED, stats.getStats().getTotalProcessed(), bldr);
addSystemMetrics(METRICS_TOTAL_SUCCESS, stats.getStats().getTotalSuccessfullyProcessed(),
bldr);
addSystemMetrics("__total_system_exceptions__", stats.getStats().getTotalSystemExceptions(), bldr);
addSystemMetrics("__total_user_exceptions__", stats.getStats().getTotalUserExceptions(), bldr);
addSystemMetrics(METRICS_TOTAL_SYS_EXCEPTION, stats.getStats().getTotalSystemExceptions(), bldr);
addSystemMetrics(METRICS_TOTAL_USER_EXCEPTION, stats.getStats().getTotalUserExceptions(), bldr);
stats.getStats().getTotalDeserializationExceptions().forEach((topic, count) -> {
addSystemMetrics("__total_deserialization_exceptions__" + topic, count, bldr);
addSystemMetrics(METRICS_TOTAL_DESERIALIZATION_EXCEPTION + topic, count, bldr);
});
addSystemMetrics("__total_serialization_exceptions__",
addSystemMetrics(METRICS_TOTAL_SERIALIZATION_EXCEPTION,
stats.getStats().getTotalSerializationExceptions(), bldr);
addSystemMetrics("__avg_latency_ms__", stats.getStats().computeLatency(), bldr);
addSystemMetrics(METRICS_AVG_LATENCY, stats.getStats().computeLatency(), bldr);
return bldr;
}

Expand Down
Expand Up @@ -39,7 +39,7 @@
name='InstanceCommunication.proto',
package='proto',
syntax='proto3',
serialized_pb=_b('\n\x1bInstanceCommunication.proto\x12\x05proto\x1a\x1bgoogle/protobuf/empty.proto\"\xa1\x05\n\x0e\x46unctionStatus\x12\x0f\n\x07running\x18\x01 \x01(\x08\x12\x18\n\x10\x66\x61ilureException\x18\x02 \x01(\t\x12\x13\n\x0bnumRestarts\x18\x03 \x01(\x03\x12\x14\n\x0cnumProcessed\x18\x04 \x01(\x03\x12 \n\x18numSuccessfullyProcessed\x18\x05 \x01(\x03\x12\x19\n\x11numUserExceptions\x18\x06 \x01(\x03\x12H\n\x14latestUserExceptions\x18\x07 \x03(\x0b\x32*.proto.FunctionStatus.ExceptionInformation\x12\x1b\n\x13numSystemExceptions\x18\x08 \x01(\x03\x12J\n\x16latestSystemExceptions\x18\t \x03(\x0b\x32*.proto.FunctionStatus.ExceptionInformation\x12W\n\x19\x64\x65serializationExceptions\x18\n \x03(\x0b\x32\x34.proto.FunctionStatus.DeserializationExceptionsEntry\x12\x1f\n\x17serializationExceptions\x18\x0b \x01(\x03\x12\x16\n\x0e\x61verageLatency\x18\x0c \x01(\x01\x12\x1a\n\x12lastInvocationTime\x18\r \x01(\x03\x12\x12\n\ninstanceId\x18\x0e \x01(\t\x1a\x45\n\x14\x45xceptionInformation\x12\x17\n\x0f\x65xceptionString\x18\x01 \x01(\t\x12\x14\n\x0cmsSinceEpoch\x18\x02 \x01(\x03\x1a@\n\x1e\x44\x65serializationExceptionsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\x03:\x02\x38\x01\"G\n\x12\x46unctionStatusList\x12\x31\n\x12\x66unctionStatusList\x18\x01 \x03(\x0b\x32\x15.proto.FunctionStatus\"\xd2\x01\n\x0bMetricsData\x12\x30\n\x07metrics\x18\x01 \x03(\x0b\x32\x1f.proto.MetricsData.MetricsEntry\x1a\x42\n\nDataDigest\x12\r\n\x05\x63ount\x18\x01 \x01(\x01\x12\x0b\n\x03sum\x18\x02 \x01(\x01\x12\x0b\n\x03max\x18\x03 \x01(\x01\x12\x0b\n\x03min\x18\x04 \x01(\x01\x1aM\n\x0cMetricsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12,\n\x05value\x18\x02 \x01(\x0b\x32\x1d.proto.MetricsData.DataDigest:\x02\x38\x01\"$\n\x11HealthCheckResult\x12\x0f\n\x07success\x18\x01 \x01(\x08\x32\xdc\x02\n\x0fInstanceControl\x12\x44\n\x11GetFunctionStatus\x12\x16.google.protobuf.Empty\x1a\x15.proto.FunctionStatus\"\x00\x12\x42\n\x12GetAndResetMetrics\x12\x16.google.protobuf.Empty\x1a\x12.proto.MetricsData\"\x00\x12@\n\x0cResetMetrics\x12\x16.google.protobuf.Empty\x1a\x16.google.protobuf.Empty\"\x00\x12:\n\nGetMetrics\x12\x16.google.protobuf.Empty\x1a\x12.proto.MetricsData\"\x00\x12\x41\n\x0bHealthCheck\x12\x16.google.protobuf.Empty\x1a\x18.proto.HealthCheckResult\"\x00\x42:\n!org.apache.pulsar.functions.protoB\x15InstanceCommunicationb\x06proto3')
serialized_pb=_b('\n\x1bInstanceCommunication.proto\x12\x05proto\x1a\x1bgoogle/protobuf/empty.proto\"\xc6\x05\n\x0e\x46unctionStatus\x12\x0f\n\x07running\x18\x01 \x01(\x08\x12\x18\n\x10\x66\x61ilureException\x18\x02 \x01(\t\x12\x13\n\x0bnumRestarts\x18\x03 \x01(\x03\x12\x14\n\x0cnumProcessed\x18\x04 \x01(\x03\x12 \n\x18numSuccessfullyProcessed\x18\x05 \x01(\x03\x12\x19\n\x11numUserExceptions\x18\x06 \x01(\x03\x12H\n\x14latestUserExceptions\x18\x07 \x03(\x0b\x32*.proto.FunctionStatus.ExceptionInformation\x12\x1b\n\x13numSystemExceptions\x18\x08 \x01(\x03\x12J\n\x16latestSystemExceptions\x18\t \x03(\x0b\x32*.proto.FunctionStatus.ExceptionInformation\x12W\n\x19\x64\x65serializationExceptions\x18\n \x03(\x0b\x32\x34.proto.FunctionStatus.DeserializationExceptionsEntry\x12\x1f\n\x17serializationExceptions\x18\x0b \x01(\x03\x12\x16\n\x0e\x61verageLatency\x18\x0c \x01(\x01\x12\x1a\n\x12lastInvocationTime\x18\r \x01(\x03\x12\x12\n\ninstanceId\x18\x0e \x01(\t\x12#\n\x07metrics\x18\x0f \x01(\x0b\x32\x12.proto.MetricsData\x1a\x45\n\x14\x45xceptionInformation\x12\x17\n\x0f\x65xceptionString\x18\x01 \x01(\t\x12\x14\n\x0cmsSinceEpoch\x18\x02 \x01(\x03\x1a@\n\x1e\x44\x65serializationExceptionsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\x03:\x02\x38\x01\"G\n\x12\x46unctionStatusList\x12\x31\n\x12\x66unctionStatusList\x18\x01 \x03(\x0b\x32\x15.proto.FunctionStatus\"\xd2\x01\n\x0bMetricsData\x12\x30\n\x07metrics\x18\x01 \x03(\x0b\x32\x1f.proto.MetricsData.MetricsEntry\x1a\x42\n\nDataDigest\x12\r\n\x05\x63ount\x18\x01 \x01(\x01\x12\x0b\n\x03sum\x18\x02 \x01(\x01\x12\x0b\n\x03max\x18\x03 \x01(\x01\x12\x0b\n\x03min\x18\x04 \x01(\x01\x1aM\n\x0cMetricsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12,\n\x05value\x18\x02 \x01(\x0b\x32\x1d.proto.MetricsData.DataDigest:\x02\x38\x01\"$\n\x11HealthCheckResult\x12\x0f\n\x07success\x18\x01 \x01(\x08\x32\xdc\x02\n\x0fInstanceControl\x12\x44\n\x11GetFunctionStatus\x12\x16.google.protobuf.Empty\x1a\x15.proto.FunctionStatus\"\x00\x12\x42\n\x12GetAndResetMetrics\x12\x16.google.protobuf.Empty\x1a\x12.proto.MetricsData\"\x00\x12@\n\x0cResetMetrics\x12\x16.google.protobuf.Empty\x1a\x16.google.protobuf.Empty\"\x00\x12:\n\nGetMetrics\x12\x16.google.protobuf.Empty\x1a\x12.proto.MetricsData\"\x00\x12\x41\n\x0bHealthCheck\x12\x16.google.protobuf.Empty\x1a\x18.proto.HealthCheckResult\"\x00\x42:\n!org.apache.pulsar.functions.protoB\x15InstanceCommunicationb\x06proto3')
,
dependencies=[google_dot_protobuf_dot_empty__pb2.DESCRIPTOR,])

Expand Down Expand Up @@ -79,8 +79,8 @@
extension_ranges=[],
oneofs=[
],
serialized_start=606,
serialized_end=675,
serialized_start=643,
serialized_end=712,
)

_FUNCTIONSTATUS_DESERIALIZATIONEXCEPTIONSENTRY = _descriptor.Descriptor(
Expand Down Expand Up @@ -116,8 +116,8 @@
extension_ranges=[],
oneofs=[
],
serialized_start=677,
serialized_end=741,
serialized_start=714,
serialized_end=778,
)

_FUNCTIONSTATUS = _descriptor.Descriptor(
Expand Down Expand Up @@ -225,6 +225,13 @@
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='metrics', full_name='proto.FunctionStatus.metrics', index=14,
number=15, type=11, cpp_type=10, label=1,
has_default_value=False, default_value=None,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None, file=DESCRIPTOR),
],
extensions=[
],
Expand All @@ -238,7 +245,7 @@
oneofs=[
],
serialized_start=68,
serialized_end=741,
serialized_end=778,
)


Expand Down Expand Up @@ -268,8 +275,8 @@
extension_ranges=[],
oneofs=[
],
serialized_start=743,
serialized_end=814,
serialized_start=780,
serialized_end=851,
)


Expand Down Expand Up @@ -320,8 +327,8 @@
extension_ranges=[],
oneofs=[
],
serialized_start=882,
serialized_end=948,
serialized_start=919,
serialized_end=985,
)

_METRICSDATA_METRICSENTRY = _descriptor.Descriptor(
Expand Down Expand Up @@ -357,8 +364,8 @@
extension_ranges=[],
oneofs=[
],
serialized_start=950,
serialized_end=1027,
serialized_start=987,
serialized_end=1064,
)

_METRICSDATA = _descriptor.Descriptor(
Expand Down Expand Up @@ -387,8 +394,8 @@
extension_ranges=[],
oneofs=[
],
serialized_start=817,
serialized_end=1027,
serialized_start=854,
serialized_end=1064,
)


Expand Down Expand Up @@ -418,15 +425,16 @@
extension_ranges=[],
oneofs=[
],
serialized_start=1029,
serialized_end=1065,
serialized_start=1066,
serialized_end=1102,
)

_FUNCTIONSTATUS_EXCEPTIONINFORMATION.containing_type = _FUNCTIONSTATUS
_FUNCTIONSTATUS_DESERIALIZATIONEXCEPTIONSENTRY.containing_type = _FUNCTIONSTATUS
_FUNCTIONSTATUS.fields_by_name['latestUserExceptions'].message_type = _FUNCTIONSTATUS_EXCEPTIONINFORMATION
_FUNCTIONSTATUS.fields_by_name['latestSystemExceptions'].message_type = _FUNCTIONSTATUS_EXCEPTIONINFORMATION
_FUNCTIONSTATUS.fields_by_name['deserializationExceptions'].message_type = _FUNCTIONSTATUS_DESERIALIZATIONEXCEPTIONSENTRY
_FUNCTIONSTATUS.fields_by_name['metrics'].message_type = _METRICSDATA
_FUNCTIONSTATUSLIST.fields_by_name['functionStatusList'].message_type = _FUNCTIONSTATUS
_METRICSDATA_DATADIGEST.containing_type = _METRICSDATA
_METRICSDATA_METRICSENTRY.fields_by_name['value'].message_type = _METRICSDATA_DATADIGEST
Expand Down Expand Up @@ -512,8 +520,8 @@
file=DESCRIPTOR,
index=0,
options=None,
serialized_start=1068,
serialized_end=1416,
serialized_start=1105,
serialized_end=1453,
methods=[
_descriptor.MethodDescriptor(
name='GetFunctionStatus',
Expand Down
Expand Up @@ -346,6 +346,7 @@ def get_function_status(self):
status.serializationExceptions = self.total_stats.nserialization_exceptions
status.averageLatency = self.total_stats.compute_latency()
status.lastInvocationTime = self.total_stats.lastinvocationtime
status.metrics.CopyFrom(self.get_metrics())
return status

def join(self):
Expand Down
Expand Up @@ -49,6 +49,7 @@ message FunctionStatus {
// expressed in ms since epoch
int64 lastInvocationTime = 13;
string instanceId = 14;
MetricsData metrics = 15;
}

message FunctionStatusList {
Expand Down
Expand Up @@ -98,15 +98,24 @@ public void stop() {

@Override
public CompletableFuture<FunctionStatus> getFunctionStatus() {
CompletableFuture<FunctionStatus> statsFuture = new CompletableFuture<>();
if (!isAlive()) {
FunctionStatus.Builder functionStatusBuilder = FunctionStatus.newBuilder();
functionStatusBuilder.setRunning(false);
functionStatusBuilder.setFailureException(getDeathException().getMessage());
return CompletableFuture.completedFuture(functionStatusBuilder.build());
statsFuture.complete(functionStatusBuilder.build());
return statsFuture;
}
FunctionStatus.Builder functionStatusBuilder = javaInstanceRunnable.getFunctionStatus();
functionStatusBuilder.setRunning(true);
return CompletableFuture.completedFuture(functionStatusBuilder.build());
getMetrics().handle((metrics, e) -> {
if (e == null) {
functionStatusBuilder.setMetrics(metrics);
}
statsFuture.complete(functionStatusBuilder.build());
return null;
});
return statsFuture;
}

@Override
Expand Down

0 comments on commit 5f779b4

Please sign in to comment.