-
Notifications
You must be signed in to change notification settings - Fork 4.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[BEAM-4775] Clean up metric protos; support integer distributions, gauges #7876
Conversation
ccf32fc
to
7366084
Compare
Run Java PreCommit |
double sum = 2; | ||
double min = 3; | ||
double max = 4; | ||
message IntGaugeData { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there a use case for this at the moment? Are you implementing these right now?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yea, in #7823 I support all the types here (counters, distributions, gauges) everywhere in Java; I think Python already supports them as well, but need to dig in to that further.
@@ -29,7 +30,7 @@ | |||
/** | |||
* Create a metric that can be incremented and decremented, and is aggregated by taking the sum. | |||
*/ | |||
public static Counter counter(MonitoringInfoMetricName metricName) { | |||
public static Counter counter(MetricName metricName) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's keep it as MonitoringInfoMetricName, since this is meant to be an implementation only used by SDK and RunnerHarness authors. For creating system style metrics with URN+labels,
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Quick explanation for why this changed:
Before this PR, MetricName.named
allowed creating (user-)metrics from {namespace,name}, while MonitoringInfoMetricName.named
created metrics from {URN, labels}.
The latter can create user- or system-metrics, but naively creating MonitoringInfoMetricName
s for user-metrics is dangerous: they don't hash/equal the "same" metric created via MetricName.named
, which I ran into in tests.
Meanwhile, user code wants to make metrics from arbitrary MonitoringInfos, so a single API for that makes sense, but it should return a MetricName
(AutoValue) for user-URNs, and a MonitoringInfoMetricName
otherwise; I've changed the MonitoringInfoMetricName
constructor to do that in this PR. That means that it returns a MetricName
, not a MonitoringInfoMetricName
.
Ultimately (#7823), I merge these two into something that looks like MonitoringInfoMetricName
but is called MetricName
. In the meantime, there's little cost to using MetricName
as the super-type of both.
So, re: your comment here, I could create an additional MonitoringInfoMetricName
constructor that's only to be used for system metric URNs, and have ElementCountFnDataReceiver
call that here instead, and then pass the MonitoringInfoMetricName
it got to LabeledMetrics.Counter
here, but I don't think that is worth it.
That new constructor would feel unsafe if it didn't validate that the passed URN was in fact a system-metric, and that would start to be a lot of overhead, when LabeledMetrics.Counter
(and other callers) don't actually care whether they are dealing with a system- or user-metric.
So that's the whole story 😄 ! lmk what you think.
oneof distribution { | ||
IntDistributionData int_distribution_data = 1; | ||
DoubleDistributionData double_distribution_data = 2; | ||
int64 counter = 1; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was long debate when, and the existing protos were what we could agree on when we introduced this design. While I am fine with this change you are proposing personally. I'd like you to get buy in on the dev list.
https://s.apache.org/beam-fn-api-metrics
https://lists.apache.org/list.html?dev@beam.apache.org:2018-2
I think most of it was in the comments of the doc. So unfortunately, I cannot point you toward too much.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't have strong opinion about removing CounterData, but I'm concerned that we make counter int64. In my opinion, double will be much more flexible.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the info and pointers!
I was just going off of what seemed to be already supported in the Java and Python SDKs.
In particular, existing Java metrics are all in terms of long
, and I thought I discussed keeping that and dropping these other types with someone (@robertwb?) in the course of planning this work.
I'll read those links and we can discuss further.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't remember discussing this, it must have been with someone else.
I agree that supporting double counters (and especially distributions) would be useful. Also, we haven't really nailed down what the global semantics are for gauge metrics.
My position was, and remains, that reifying the exact set of supported metric types via a set of nested set of protos is not a stable, long-term solution. It took a long time to come up with this list, and here we are 6 months later changing it. (And change will become harder the more we actually use these protos everywhere.) Instead, data should be either (1) a bytes field or (2) something like
message data {
repeated int64 int_data = 1;
repeated double double_data = 2;
repeated string string_data = 3;
...?
}
with the type specifying its encoding into this data attribute. We can then support everything from simple counters to complex distributions without changing the protos, only introducing new types, where the set of types are defined by URNs, and runners simply pass through types they do not understand. This is consistent with the rest of the way the FnAPI works as well (e.g. coders, transforms, ...)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks Robert. It seems we're stuck between 3 data models:
- existing protos
- {int,double,string} counters
- {int,double} x {distribution,extrema}
- unused in SDKs
- deprecated protos
- opaque bytes-payloads
- Robert's suggestion above
- not implemented anywhere afaik
In this PR, I changed (1)'s protos to match the SDK support that is modeled on (2), which is probably wrong!
(3) sgtm but seems like it is not the existing consensus / spec.
I think the larger goal (sending metrics over the job API) requires converting between SDK and proto metrics structures (in both directions), so they need to match.
Maybe there is a way I can side-step that, or maybe we can just get to the real long-term solution here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@robertwb I'd like to push back a bit about a significant change, where we use opaque bytes payloads.
I am okay with removing the layers in this proto and using the design Ryan has proposed here. But not an extensive rewrite at this stage of the game, which will slow down progress significantly.
We have protos for the MonitoringInfoTable. This solution is the extensible format, where you can store basically anything and the producer and consumer of this proto can be the only ones who need to understand it (and it could be passed through the RunnerHarness)
message MonitoringTableData { |
But we chose to have separate protos for the common well known metric formats (Metrics being a well defined concept shared by many systems, i.e. a timeseries of data): counter, gauge, distribution, etc.
Additionally, if you wish to modify these formats significantly at a future stage, you can always
- add to these protos without deleting
- Change the URN version number of a metric
- Upgrade all SDKs to package the metric with a format for the previous and new version number of the URN
- Upgrade all Runners that wish to support it.
Upgrading the URN version is the safe way to do this without introducing a breaking change, as consumers of MonitoringInfos can freely chose which URNs+versions they support.
builder.append(this.urn.toString()); | ||
builder.append(" "); | ||
builder.append(this.labels.toString()); | ||
if (labels.containsKey(PCOLLECTION_LABEL)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
just iterate over the keyas and values and add them to the return string, instead of inspecting the labels for specific keys.
Then there is no need to update this code for new labels.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wrote things that way in several places initially, but ran into interesting problems that we'll need to discuss eventually, so I'll describe them below.
Here's what happens in this PR if I naively append the label kv-pairs: ryan-williams@5576286.
Note that Flink receives a metric with the unseemly name step.PTRANSFORM.step.PCOLLECTION.pcoll.beam.metric.element_count.v1
(which goes e.g. to the Flink web UI, which we've taken pains to make more human-readable).
The first step
there is added by MetricKey.toString
, which calls this MetricName.toString
routine, expecting info about the metric's "name" (URN) only.
MetricKey vs MetricName
The issue, In my view, is that MonitoringInfoMetricName
is actually more of a MetricKey
implementation:
MetricKey
≈MonitoringInfoMetricName
({URN, labels})stepName
≈ [labels map with just aPTRANSFORM
entry]MetricName
≈ URN
MetricResult
≈MonitoringInfo
({key, metric value})
I ultimately lay things out in this manner in #7823, folding MIMN into MetricKey.
Special-casing single-label ptransform/pcollection-scoped metrics
A bigger issue is that I think runners and endpoints are going to want to hard-code [the only 2 cases that we will have for the foreseeable future]: <ptransform>.<namespace>.<name>
and <pcollection>.<urn>
(<ptransform>.<urn>
folds into this nicely as well).
They'll do this for backwards-compat reasons, and/or to make things look nice in e.g. web UIs. I've touched code like this in at least the Samza, Flink, and Spark runners (and that MetricsHttpSink
we were looking at).
I think it's great that the wire format is extensible to many+new labels, but ended up feeling that the SDK should provide the APIs that deal specifically with the shapes all calling code will care about atm.
Sorry that's a lot! I don't care much what we do in this case, but I think we'll have some harder similar choices shortly 😲 lmk what you think.
/** | ||
* Update this container with metrics from the passed {@link MonitoringInfo}s, and send updates | ||
* along to Flink's internal metrics framework. | ||
*/ | ||
public void updateMetrics(String stepName, List<MonitoringInfo> monitoringInfos) { | ||
MetricsContainer metricsContainer = getMetricsContainer(stepName); | ||
public void updateMetrics(List<MonitoringInfo> monitoringInfos) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Question about the overall design here. Much of this class doesn't seem flink specific. If its just aggregating the values in memory, perhaps it should just aggregate the MonitoringInfos. Then have some method to extract them in a flink specific format (possibly in another class)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Interesting point. To paraphrase: ingesting MonitoringInfos into a MetricContainerStepMap (which other runners also use) will be a common path, and code for that should get factored out.
I experimented with keeping fn-api metrics natively in MonitoringInfos in MCSM, alongside traditional Java-SDK-formatted ones, but ended up needing to do a lot of other cleanup in there (#7890) before I could see straight 😄.
ExtremaData extremaData = metric.getExtremaData(); | ||
LOG.warn("Extrema metric unsupported: {}", extremaData); | ||
} | ||
if (!monitoringInfo.hasMetric()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Whitelisting the URNs you support would be the preferred method of doing this.
What you've done here will allow various SDK system metrics which get added later on, without the devs of this runner getting any sort of heads up. The intention of the design is to allow the runner to whitelist the specific URNs it wishes to support. You may just wish to support the user metric prefixes. Doing that would drop the element count, execution time, etc. metrics.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Interesting. It seems in this case like it's easier and better to just handle all the metrics that come through conforming to the standard format? Or maybe I'm not understanding what you mean.
@Ardagan You might want to take a look at this as well. To see the proto changes. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
phew, there's a lot here, thanks for the comments
double sum = 2; | ||
double min = 3; | ||
double max = 4; | ||
message IntGaugeData { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yea, in #7823 I support all the types here (counters, distributions, gauges) everywhere in Java; I think Python already supports them as well, but need to dig in to that further.
@@ -29,7 +30,7 @@ | |||
/** | |||
* Create a metric that can be incremented and decremented, and is aggregated by taking the sum. | |||
*/ | |||
public static Counter counter(MonitoringInfoMetricName metricName) { | |||
public static Counter counter(MetricName metricName) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Quick explanation for why this changed:
Before this PR, MetricName.named
allowed creating (user-)metrics from {namespace,name}, while MonitoringInfoMetricName.named
created metrics from {URN, labels}.
The latter can create user- or system-metrics, but naively creating MonitoringInfoMetricName
s for user-metrics is dangerous: they don't hash/equal the "same" metric created via MetricName.named
, which I ran into in tests.
Meanwhile, user code wants to make metrics from arbitrary MonitoringInfos, so a single API for that makes sense, but it should return a MetricName
(AutoValue) for user-URNs, and a MonitoringInfoMetricName
otherwise; I've changed the MonitoringInfoMetricName
constructor to do that in this PR. That means that it returns a MetricName
, not a MonitoringInfoMetricName
.
Ultimately (#7823), I merge these two into something that looks like MonitoringInfoMetricName
but is called MetricName
. In the meantime, there's little cost to using MetricName
as the super-type of both.
So, re: your comment here, I could create an additional MonitoringInfoMetricName
constructor that's only to be used for system metric URNs, and have ElementCountFnDataReceiver
call that here instead, and then pass the MonitoringInfoMetricName
it got to LabeledMetrics.Counter
here, but I don't think that is worth it.
That new constructor would feel unsafe if it didn't validate that the passed URN was in fact a system-metric, and that would start to be a lot of overhead, when LabeledMetrics.Counter
(and other callers) don't actually care whether they are dealing with a system- or user-metric.
So that's the whole story 😄 ! lmk what you think.
oneof distribution { | ||
IntDistributionData int_distribution_data = 1; | ||
DoubleDistributionData double_distribution_data = 2; | ||
int64 counter = 1; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the info and pointers!
I was just going off of what seemed to be already supported in the Java and Python SDKs.
In particular, existing Java metrics are all in terms of long
, and I thought I discussed keeping that and dropping these other types with someone (@robertwb?) in the course of planning this work.
I'll read those links and we can discuss further.
builder.append(this.urn.toString()); | ||
builder.append(" "); | ||
builder.append(this.labels.toString()); | ||
if (labels.containsKey(PCOLLECTION_LABEL)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wrote things that way in several places initially, but ran into interesting problems that we'll need to discuss eventually, so I'll describe them below.
Here's what happens in this PR if I naively append the label kv-pairs: ryan-williams@5576286.
Note that Flink receives a metric with the unseemly name step.PTRANSFORM.step.PCOLLECTION.pcoll.beam.metric.element_count.v1
(which goes e.g. to the Flink web UI, which we've taken pains to make more human-readable).
The first step
there is added by MetricKey.toString
, which calls this MetricName.toString
routine, expecting info about the metric's "name" (URN) only.
MetricKey vs MetricName
The issue, In my view, is that MonitoringInfoMetricName
is actually more of a MetricKey
implementation:
MetricKey
≈MonitoringInfoMetricName
({URN, labels})stepName
≈ [labels map with just aPTRANSFORM
entry]MetricName
≈ URN
MetricResult
≈MonitoringInfo
({key, metric value})
I ultimately lay things out in this manner in #7823, folding MIMN into MetricKey.
Special-casing single-label ptransform/pcollection-scoped metrics
A bigger issue is that I think runners and endpoints are going to want to hard-code [the only 2 cases that we will have for the foreseeable future]: <ptransform>.<namespace>.<name>
and <pcollection>.<urn>
(<ptransform>.<urn>
folds into this nicely as well).
They'll do this for backwards-compat reasons, and/or to make things look nice in e.g. web UIs. I've touched code like this in at least the Samza, Flink, and Spark runners (and that MetricsHttpSink
we were looking at).
I think it's great that the wire format is extensible to many+new labels, but ended up feeling that the SDK should provide the APIs that deal specifically with the shapes all calling code will care about atm.
Sorry that's a lot! I don't care much what we do in this case, but I think we'll have some harder similar choices shortly 😲 lmk what you think.
/** | ||
* Update this container with metrics from the passed {@link MonitoringInfo}s, and send updates | ||
* along to Flink's internal metrics framework. | ||
*/ | ||
public void updateMetrics(String stepName, List<MonitoringInfo> monitoringInfos) { | ||
MetricsContainer metricsContainer = getMetricsContainer(stepName); | ||
public void updateMetrics(List<MonitoringInfo> monitoringInfos) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Interesting point. To paraphrase: ingesting MonitoringInfos into a MetricContainerStepMap (which other runners also use) will be a common path, and code for that should get factored out.
I experimented with keeping fn-api metrics natively in MonitoringInfos in MCSM, alongside traditional Java-SDK-formatted ones, but ended up needing to do a lot of other cleanup in there (#7890) before I could see straight 😄.
ExtremaData extremaData = metric.getExtremaData(); | ||
LOG.warn("Extrema metric unsupported: {}", extremaData); | ||
} | ||
if (!monitoringInfo.hasMetric()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Interesting. It seems in this case like it's easier and better to just handle all the metrics that come through conforming to the standard format? Or maybe I'm not understanding what you mean.
double max = 4; | ||
message IntGaugeData { | ||
int64 value = 1; | ||
int64 timestamp_ms = 2; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we keep this on the top level of the MonitoringInfo
DoubleDistributionData double_distribution_data = 2; | ||
int64 counter = 1; | ||
IntDistributionData distribution = 2; | ||
IntGaugeData gauge = 3; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The one thing worth pointing out here, is that we never introduced this before because this only holds an int64, which is the same as a counter, which holds an int64 (Note you don't need the timestamp inside IntGaugeData), as its already in the MonitoringInfo proto.
So we didn't include both as separate fields in the oneof. You would just specify which way it is aggregated using the MonitoringInfo type field.
Thanks for the feedback, these discussions were very useful for me! As I mentioned here, these proto changes are basically backwards from the direction they are supposed to be evolving in. I'm going to close this out; I think I see how to move the other #7823-associated PRs forward orthogonally, and probably more simply for not needing to deal with new proto structures. |
(factored out of #7823)
Also starts tying together
MetricName
andMonitoringInfoMetricName
:MonitoringInfoMetricName.create
) and creates aMetricName
AutoValue
instance instead, which preserves existing behavior most straightforwardly.R: @robertwb, @ajamato
Post-Commit Tests Status (on master branch)
See .test-infra/jenkins/README for trigger phrase, status and link of all Jenkins jobs.