[BEAM-4374] Implementing a subset of the new metrics framework in python. #6205
Conversation
from apache_beam.metrics.cells import GaugeData
from google.protobuf import timestamp_pb2

import time
I am curious about your overall thoughts on the use of monitoring_infos and metrics in comments and naming.
Note: A Metric is a specific type of MonitoringInfo.
See: https://s.apache.org/beam-fn-api-metrics
self._user_metrics = None

def wait_until_finish(self, duration=None):
  return self._state

def metrics(self):
def metrics(self, use_monitoring_infos=False, user_metrics_only=True):
Is this type of change reasonable? It adds these extra options for pulling out the metrics. This is currently the only way we have to test metrics, and I added user_metrics_only to allow testing the system metrics.
Note: We can update this code to only use use_monitoring_infos and delete all the legacy metrics once we are happy with this. But I have made the use of Metrics/MonitoringInfos optional for now.
This is a fine change; however, I would prefer to remove the legacy stuff and switch to the new metrics at the same time. Is there a reason to keep both for now?
This call is meant to be for user metrics, not for all general metrics. I think I'd rather not change this call, because it's user-facing. Maybe we can provide the parameter in the constructor? For a test runner, I'm okay with having these parameters, but for a user-facing runner, I'd rather not.
That being said, this is backwards compatible, so it's strictly possible to add it.
Also, since RunnerResult.metrics is meant for user metrics, I think we should think about having a different function to return all job metrics. Something like RunnerResult.monitoring_metrics, or RunnerResult.counters, RunnerResult.monitoring_infos... or something like that.
And perhaps, file metrics to return all sorts of metrics in Beam 3.
Based on the few tests I have done, I have matched the existing functionality, so we could go ahead and delete the old code if you would like. I just thought of this as a precautionary step before flipping it on.
I think it's okay to rename to RunnerResult.monitoring_infos.
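To make the proposed split concrete, here is a minimal, illustrative-only sketch of keeping the user-facing metrics() unchanged while adding a separate monitoring_infos() accessor. The class and URN strings are assumptions for illustration, not the actual Beam PipelineResult API.

```python
class FakeRunnerResult:
    """Stand-in for a runner result holding (urn, value) pairs."""

    def __init__(self, infos):
        self._infos = infos  # list of (urn, value) pairs

    def metrics(self):
        # User-facing call: keeps returning only user metrics.
        return [(urn, v) for urn, v in self._infos
                if urn.startswith('beam:metric:user:')]

    def monitoring_infos(self):
        # Separate accessor for everything the runner collected.
        return list(self._infos)

result = FakeRunnerResult([
    ('beam:metric:user:ns:my_counter', 7),
    ('beam:metric:element_count:v1', 7),
])
assert len(result.metrics()) == 1
assert len(result.monitoring_infos()) == 2
```

This keeps the user-facing contract of metrics() intact while system metrics get their own entry point.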
# TODO(ajamato): Implement the remaining types, i.e. Double types,
# Extrema types, etc. See:
# https://s.apache.org/beam-fn-api-metrics
SUM_INT64_TYPE = 'beam:metrics:SumInt64'
Can we stick with the existing case of snake_case instead of CamelCase for all URNs?
Also, Eugene was able to move the URNs into proto form (#4672) to share across languages. Can we adopt the same now?
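As a sketch of the snake_case suggestion, the type URN constants could look like the following. The exact strings are illustrative assumptions; the PR as written uses CamelCase suffixes such as 'beam:metrics:SumInt64'.

```python
# Hypothetical snake_case variants of the type URNs in this PR.
SUM_INT64_TYPE = 'beam:metrics:sum_int64'
DISTRIBUTION_INT64_TYPE = 'beam:metrics:distribution_int64'
LATEST_INT64_TYPE = 'beam:metrics:latest_int64'

ALL_TYPES = (SUM_INT64_TYPE, DISTRIBUTION_INT64_TYPE, LATEST_INT64_TYPE)

# Every type URN keeps the shared prefix; only the suffix casing changes.
assert all(urn.startswith('beam:metrics:') for urn in ALL_TYPES)
assert all(urn == urn.lower() for urn in ALL_TYPES)
```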
Done
(Force-pushed from 1f62c9b to 2e4330f)
Looks generally fine. I did not do an in-depth review.
@pabloem any comments?
I'm doing a review. I hope to finish by today.
I have added a few questions and comments. It looks quite good to me in terms of implementation.
The only thing I think we need to reconsider is the use of the metrics call to also return non-user metrics. This call is used by all SDKs, so I'm not on board with changing its behavior. Let's look into adding a different call with all job info.
@@ -161,6 +161,14 @@ def get_cumulative(self):
    with self._lock:
      return self.value

  def to_runner_api_monitoring_info(self):
This function is being added to the CounterCell class for counters, but added to DistributionData and GaugeData instead of DistributionCell/GaugeCell. Perhaps we want to be consistent here (adding a CounterData class? or making the Cell classes be the main data-holding unit?). WDYT?
Adding a CounterData class would make more sense, but I would say this is out of scope for this change.
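For the record, a CounterData holder symmetric with DistributionData/GaugeData might look roughly like this. The class and method names are hypothetical sketches, not the actual apache_beam.metrics.cells API; the MonitoringInfo payload is stubbed out as a dict.

```python
class CounterData:
    """Hypothetical data-holding unit for a counter, mirroring
    DistributionData / GaugeData."""

    def __init__(self, value=0):
        self.value = value

    def combine(self, other):
        # Counters aggregate by summation.
        return CounterData(self.value + other.value)

    def to_runner_api_monitoring_info(self):
        # Stand-in for building a MonitoringInfo proto payload.
        return {'type': 'beam:metrics:SumInt64', 'payload': self.value}

merged = CounterData(3).combine(CounterData(4))
assert merged.value == 7
assert merged.to_runner_api_monitoring_info()['payload'] == 7
```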
I also added a way to get the non-user metrics from the MetricResult, and I am using that in a test.
Can you add a TODO and a small explanation of this?
all_user_metrics = []
for k, v in self.counters.items():
  all_user_metrics.append(int64_counter(
      user_metric_urn(k.namespace, k.name),
If I understand correctly, the kind of aggregation is not part of the urn of a counter?
Alex has explained this to me offline, btw.
The type_urn defines the type of aggregation
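A minimal sketch of that idea: the type URN (not the metric's own URN) selects the aggregation applied across updates. The URN strings follow the ones used elsewhere in this PR; the aggregator mapping is an illustrative assumption.

```python
SUM_INT64_TYPE = 'beam:metrics:SumInt64'
LATEST_INT64_TYPE = 'beam:metrics:LatestInt64'

# Map each type URN to the aggregation applied across metric updates.
AGGREGATORS = {
    SUM_INT64_TYPE: lambda updates: sum(updates),
    LATEST_INT64_TYPE: lambda updates: updates[-1],  # gauge: keep latest
}

updates = [3, 5, 2]
assert AGGREGATORS[SUM_INT64_TYPE](updates) == 10
assert AGGREGATORS[LATEST_INT64_TYPE](updates) == 2
```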
def to_runner_api_monitoring_info(self):
  """Returns a Metric with this value for use in a MonitoringInfo."""
  return beam_fn_api_pb2.Metric(
      counter_data=beam_fn_api_pb2.CounterData(
The Metrics API does have a GaugeData proto. Should we use that here?
No, because as a monitoring_info it should be encoded as a counter, since they use the same data format with a different aggregation method (gauges are just "latest" counters). This is described in the proposal doc.
def is_counter(monitoring_info_proto):
  """Returns true if the monitoring info is a counter metric."""
  return monitoring_info_proto.type in [SUM_INT64_TYPE]
Perf optimization: Maybe make the list a constant / make it a set? (also other places in this file)
Done
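The suggested optimization looks roughly like this: hoist the type lists into module-level frozensets so membership checks avoid rebuilding a list on every call. The URN strings match those used in this PR; for self-containment, this sketch checks the type string directly rather than a MonitoringInfo proto.

```python
COUNTER_TYPES = frozenset(['beam:metrics:SumInt64'])
DISTRIBUTION_TYPES = frozenset(['beam:metrics:DistributionInt64'])

def is_counter(monitoring_info_type):
    """Returns True if the type URN denotes a counter metric."""
    # Set membership is O(1) and the constant is built once at import time.
    return monitoring_info_type in COUNTER_TYPES

assert is_counter('beam:metrics:SumInt64')
assert not is_counter('beam:metrics:DistributionInt64')
```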
if use_monitoring_infos:
  self._init_metrics_from_monitoring_infos(step_monitoring_infos)
else:
  self._init_metrics_from_legacy_metrics(step_metrics)
If I understand correctly, we can't just get rid of the if use_monitoring_infos == False path because these are used by the Dataflow worker?
We can get rid of it. It's only there as a precaution before flipping it on. The monitoring_infos match the functionality of the legacy metrics, which is what one of my tests verifies.
"""Returns the list of MonitoringInfos collected by this operation."""
return (self.execution_time_metrics(transform_id) +
        self.element_count_metrics(transform_id) +
        self.user_metrics(transform_id))
Perf optimization: Maybe return generators instead of creating lists here, but you can iterate on that later.
Ack
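The generator-based variant suggested above could be sketched like this: chain the three metric sources lazily instead of concatenating lists. The three producer functions here are illustrative stand-ins for the operation's real metric producers.

```python
import itertools

def execution_time_metrics():
    yield 'process_bundle_msecs'

def element_count_metrics():
    yield 'element_count'

def user_metrics():
    yield 'user:ns:my_counter'

def monitoring_infos():
    # No intermediate lists are built; callers iterate once lazily.
    return itertools.chain(
        execution_time_metrics(), element_count_metrics(), user_metrics())

assert list(monitoring_infos()) == [
    'process_bundle_msecs', 'element_count', 'user:ns:my_counter']
```

The trade-off is that a chained result can only be consumed once, so callers that need multiple passes must materialize it themselves.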
I have given FnApiResults a metrics() and a monitoring_metrics(), as you suggested. Please see the updated FnApiMetrics as well.
labels:
  - PTRANSFORM
type: beam:metrics:SumInt64
urn: beam:metric:element_count:v1
Can you add examples of other metrics? e.g. distribution / gauge types
This file is for system-defined ones, which we don't seem to have, as they are only used in user counters. Unless I am missing something; happy to add one if we have an example in another SDK perhaps.
Right now this is just a catalog; it's not actually loaded or used anywhere.
The doc covers how to add more.
@@ -223,6 +223,7 @@ def register(self, request, instruction_id):
        instruction_id=instruction_id,
        register=beam_fn_api_pb2.RegisterResponse())

  # Are these methods dead code? No. invoked via do_instruction
These methods are called here: https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/worker/sdk_worker.py#L214
Removing. That was just meant to be a note for me.
# Are these methods dead code?
ditto
(Force-pushed from 2e4330f to cf5d89d)
Ready for another round of reviews. I'll investigate the presubmit in the meantime. Thank you all for the reviews; I have addressed all the comments. For some reason I can't reply to some of the threads in the UI, but I have addressed everything I saw. Please let me know if I missed something.
retest this please
Run Python PreCommit
Lint issue is breaking Python precommits ^^
@ajamato What are the deprecated metrics you mention in the PR description? I do not see them in the design doc.
@ajamato A more general question for my knowledge: the communication between the runner harness and SDK harness is done on a bundle basis. When the runner harness sends data to the SDK harness to execute a transform that contains metrics, does it:
My bet is option 2. But can you confirm?
(Force-pushed from 8d26eb4 to fea51e1)
Option 2: the metric is calculated for the bundle. That is, we send a single metric update for the bundle. The design expects that upstream systems will handle that aggregation; they may sum all the metrics together, outside of the SDK. The runner should forward these metrics to a metric aggregation system to aggregate them to a final value across all the workers.
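The per-bundle reporting described above can be sketched as follows: each bundle reports one update, and a runner-side accumulator sums updates across bundles and workers. All names here are illustrative assumptions, not the actual runner implementation.

```python
from collections import defaultdict

class MetricAggregator:
    """Hypothetical runner-side accumulator for per-bundle counter updates."""

    def __init__(self):
        self._totals = defaultdict(int)

    def report_bundle_update(self, urn, value):
        # One update arrives per bundle; aggregation happens outside the SDK.
        self._totals[urn] += value

    def final_value(self, urn):
        return self._totals[urn]

agg = MetricAggregator()
# Two bundles (possibly on different workers) touch the same counter.
agg.report_bundle_update('beam:metric:element_count:v1', 100)
agg.report_bundle_update('beam:metric:element_count:v1', 250)
assert agg.final_value('beam:metric:element_count:v1') == 350
```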
I don't mean that specific metrics are deprecated, but that the technique for passing metrics across the Fn API is deprecated. This PR replaces how the metrics are inserted into the ProcessBundleResponse.
I think the test failures that you see in the Python precommit should be solved by rebasing.
(Force-pushed from 39cd5a9 to d4a8d70)
I have fixed up all the test issues in this PR now; it's ready for more review.
I think I do not have further comments on the implementation.
I wonder: if we merge this change, will it break Python streaming metrics on Dataflow at head, since the runner harness expects the old API? You can try running a streaming job that uses this change. And if not, what's the plan to get it all to work together?
(Force-pushed from c464739 to b0cc761)
Yes, we definitely need to keep the legacy reporting in place until all downstream systems have migrated to use the new variation.
(Force-pushed from 6af0942 to a5ea26b)
Squashed all the commits. @robertwb, happy to iterate more on your suggestions, but we would like to submit this PR and finish up this iteration.
(Force-pushed from a5ea26b to 5c4a2e9)
Okay, as this looks good, I'll go ahead and merge.
https://s.apache.org/beam-fn-api-metrics

The following types have been implemented:
'beam:metrics:SumInt64'
'beam:metrics:DistributionInt64'
'beam:metrics:LatestInt64'

User counters and the existing Metrics returned on
ProcessBundle[Progress]Response are now being returned
as monitoring_infos in ProcessBundle[Progress]Response.
Note: The deprecated metrics are still being returned in
the ProcessBundle[Progress]Response.

Additionally, a parameter has been added to MetricResults
allowing metrics to be returned via use_monitoring_infos.
fn_api_runner_test.test_metrics has been updated to
verify that the queried metrics are equivalent whether
or not use_monitoring_infos has been specified.

Additionally, MetricResults has been updated to also return
non-user metrics, by setting user_metrics_only=False.
This is used in a new test:
fn_api_runner_test.test_non_user_metrics