Skip to content

Commit

Permalink
Implementing a subset of the new metrics framework in python.
Browse files Browse the repository at this point in the history
https://s.apache.org/beam-fn-api-metrics

The follow types have been implemented
'beam:metrics:SumInt64'
'beam:metrics:DistributionInt64'
'beam:metrics:LatestInt64'
User counters and the existing Metrics returned on
ProcessBundle[Progress]Response are now being returned
as monitoring_infos in ProcessBundle[Progress]Response.

Note: The deprecated metrics are still being returned in
the ProcessBundle[Progress]Response.

Addtionally, a parameter has been added to MetricResults
allowing to return metrics via use_monitoring_infos.

fn_api_runner_test.test_metrics has been updated to
verifies that the queried metrics are equivalent, whether
or not use_monitoring_infos has been specified.

Additionally, MetricResults has been updated to also return
non user metrics, by setting user_metrics_only=True.
This has been using in a new test:
fn_api_runner_test.test_non_user_metrics
  • Loading branch information
Alex Amato committed Aug 13, 2018
1 parent b822633 commit dde4ee5
Show file tree
Hide file tree
Showing 13 changed files with 562 additions and 15 deletions.
21 changes: 17 additions & 4 deletions model/fn-execution/src/main/proto/beam_fn_api.proto
Original file line number Diff line number Diff line change
Expand Up @@ -250,11 +250,16 @@ message ProcessBundleRequest {
message ProcessBundleResponse {
// (Optional) If metrics reporting is supported by the SDK, this represents
// the final metrics to record for this bundle.
// DEPRECATED
Metrics metrics = 1;

// (Optional) Specifies that the bundle has been split since the last
// ProcessBundleProgressResponse was sent.
BundleSplit split = 2;

// (Required) The list of metrics or other MonitoredState
// collected while processing this bundle.
repeated MonitoringInfo monitoring_infos = 3;
}

// A request to report progress information for a given bundle.
Expand All @@ -275,9 +280,9 @@ message MonitoringInfo {
// Sub types like field formats - int64, double, string.
// Aggregation methods - SUM, LATEST, TOP-N, BOTTOM-N, DISTRIBUTION
// valid values are:
// beam:metrics:[SumInt64|LatestInt64|Top-NInt64|Bottom-NInt64|
// SumDouble|LatestDouble|Top-NDouble|Bottom-NDouble|DistributionInt64|
// DistributionDouble|MonitoringDataTable]
// beam:metrics:[sum_int_64|latest_int_64|top_n_int_64|bottom_n_int_64|
// sum_double|latest_double|top_n_double|bottom_n_double|
// distribution_int_64|distribution_double|monitoring_data_table
string type = 2;

// The Metric or monitored state.
Expand All @@ -302,6 +307,10 @@ message MonitoringInfo {
// Some systems such as Stackdriver will be able to aggregate the metrics
// using a subset of the provided labels
map<string, string> labels = 5;

// The walltime of the most recent update.
// Useful for aggregation for Latest types such as LatestInt64.
google.protobuf.Timestamp timestamp = 6;
}

message Metric {
Expand Down Expand Up @@ -525,12 +534,16 @@ message Metrics {
}

message ProcessBundleProgressResponse {
// (Required)
// DEPRECATED (Required)
Metrics metrics = 1;

// (Optional) Specifies that the bundle has been split since the last
// ProcessBundleProgressResponse was sent.
BundleSplit split = 2;

// (Required) The list of metrics or other MonitoredState
// collected while processing this bundle.
repeated MonitoringInfo monitoring_infos = 3;
}

message ProcessBundleSplitRequest {
Expand Down
56 changes: 56 additions & 0 deletions model/fn-execution/src/main/proto/metric_definitions.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
[
{
"annotations": {
"description": "The total estimated execution time of the ptransform",
"unit": "msecs"
},
"labels": [
"PTRANSFORM"
],
"type": "beam:metrics:SumInt64",
"urn": "beam:metric:ptransform_execution_time:total_msecs:v1"
},
{
"annotations": {
"description": "The total estimated execution time of the start bundle function in a pardo",
"unit": "msecs"
},
"labels": [
"PTRANSFORM"
],
"type": "beam:metrics:SumInt64",
"urn": "beam:metric:pardo_execution_time:start_bundle_msecs:v1"
},
{
"annotations": {
"description": "The total estimated execution time of the process bundle function in a pardo",
"unit": "msecs"
},
"labels": [
"PTRANSFORM"
],
"type": "beam:metrics:SumInt64",
"urn": "beam:metric:pardo_execution_time:process_bundle_msecs:v1"
},
{
"annotations": {
"description": "The total estimated execution time of the finish bundle function in a pardo",
"unit": "msecs"
},
"labels": [
"PTRANSFORM"
],
"type": "beam:metrics:SumInt64",
"urn": "beam:metric:pardo_execution_time:finish_bundle_msecs:v1"
},
{
"annotations": {
"description": "The total elements counted for a metric."
},
"labels": [
"PTRANSFORM"
],
"type": "beam:metrics:SumInt64",
"urn": "beam:metric:element_count:v1"
}
]
23 changes: 23 additions & 0 deletions sdks/python/apache_beam/metrics/cells.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,14 @@ def get_cumulative(self):
with self._lock:
return self.value

def to_runner_api_monitoring_info(self):
"""Returns a Metric with this value for use in a MonitoringInfo."""
return beam_fn_api_pb2.Metric(
counter_data=beam_fn_api_pb2.CounterData(
int64_value=self.get_cumulative()
)
)


class DistributionCell(Distribution, MetricCell):
"""For internal use only; no backwards-compatibility guarantees.
Expand Down Expand Up @@ -375,6 +383,14 @@ def from_runner_api(proto):
float(proto.timestamp.nanos) / 10**9)
return GaugeData(proto.value, timestamp=gauge_timestamp)

def to_runner_api_monitoring_info(self):
"""Returns a Metric with this value for use in a MonitoringInfo."""
return beam_fn_api_pb2.Metric(
counter_data=beam_fn_api_pb2.CounterData(
int64_value=self.value
)
)


class DistributionData(object):
"""For internal use only; no backwards-compatibility guarantees.
Expand Down Expand Up @@ -440,6 +456,13 @@ def to_runner_api(self):
def from_runner_api(proto):
return DistributionData(proto.sum, proto.count, proto.min, proto.max)

def to_runner_api_monitoring_info(self):
"""Returns a Metric with this value for use in a MonitoringInfo."""
return beam_fn_api_pb2.Metric(
distribution_data=beam_fn_api_pb2.DistributionData(
int_distribution_data=beam_fn_api_pb2.IntDistributionData(
count=self.count, sum=self.sum, min=self.min, max=self.max)))


class MetricAggregator(object):
"""For internal use only; no backwards-compatibility guarantees.
Expand Down
29 changes: 29 additions & 0 deletions sdks/python/apache_beam/metrics/execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@
from apache_beam.metrics.cells import CounterCell
from apache_beam.metrics.cells import DistributionCell
from apache_beam.metrics.cells import GaugeCell
from apache_beam.metrics.monitoring_infos import int64_counter
from apache_beam.metrics.monitoring_infos import int64_distribution
from apache_beam.metrics.monitoring_infos import int64_gauge
from apache_beam.metrics.monitoring_infos import user_metric_urn
from apache_beam.portability.api import beam_fn_api_pb2
from apache_beam.runners.worker import statesampler

Expand Down Expand Up @@ -211,6 +215,31 @@ def to_runner_api(self):
for k, v in self.gauges.items()]
)

def to_runner_api_monitoring_infos(self, transform_id):
"""Returns a list of MonitoringInfos for the metrics in this container."""
all_user_metrics = []
for k, v in self.counters.items():
all_user_metrics.append(int64_counter(
user_metric_urn(k.namespace, k.name),
v.to_runner_api_monitoring_info(),
ptransform=transform_id
))

for k, v in self.distributions.items():
all_user_metrics.append(int64_distribution(
user_metric_urn(k.namespace, k.name),
v.get_cumulative().to_runner_api_monitoring_info(),
ptransform=transform_id
))

for k, v in self.gauges.items():
all_user_metrics.append(int64_gauge(
user_metric_urn(k.namespace, k.name),
v.get_cumulative().to_runner_api_monitoring_info(),
ptransform=transform_id
))
return all_user_metrics


class MetricUpdates(object):
"""Contains updates for several metrics.
Expand Down

0 comments on commit dde4ee5

Please sign in to comment.