Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BEAM-4776] Add metrics support to Java PortableRunner #10105

Merged
merged 1 commit into from Nov 29, 2019

Conversation

mwalenia
Copy link
Member

This PR adds conversion of portable MonitoringInfos to MetricResults in Java's PortableRunner.

R: @lgajowy @mxm @angoenka @iemejia
Can you take a look, guys? Thanks!


Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

  • Choose reviewer(s) and mention them in a comment (R: @username).
  • Format the pull request title like [BEAM-XXX] Fixes bug in ApproximateQuantiles, where you replace BEAM-XXX with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

See the Contributor Guide for more tips on how to make review process smoother.

Post-Commit Tests Status (on master branch)

Lang SDK Apex Dataflow Flink Gearpump Samza Spark
Go Build Status --- --- Build Status --- --- Build Status
Java Build Status Build Status Build Status Build Status
Build Status
Build Status
Build Status Build Status Build Status
Build Status
Python Build Status
Build Status
Build Status
Build Status
--- Build Status
Build Status
Build Status
Build Status
--- --- Build Status
XLang --- --- --- Build Status --- --- ---

Pre-Commit Tests Status (on master branch)

--- Java Python Go Website
Non-portable Build Status Build Status
Build Status
Build Status Build Status
Portable --- Build Status --- ---

See .test-infra/jenkins/README for trigger phrase, status and link of all Jenkins jobs.

@mwalenia mwalenia force-pushed the BEAM-4776-metrics-portableRunner-java branch from 9f683e7 to 5d0dd59 Compare November 14, 2019 14:23
@mwalenia
Copy link
Member Author

Run Java PreCommit

Copy link
Contributor

@mxm mxm left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @mwalenia. Looks good. Do we have any integration tests that we can enable for end-to-end testing this?

@mwalenia
Copy link
Member Author

@mxm I don't think so, we don't run portable e2e tests in Java yet

@mxm
Copy link
Contributor

mxm commented Nov 15, 2019

I think we do through the Portable ValidatesRunner tests, we might want to enable these:

excludeCategories 'org.apache.beam.sdk.testing.UsesAttemptedMetrics'

@mwalenia
Copy link
Member Author

You're right, thanks for pointing it out. I'll delete the exclusions and run the ValidatesRunner.

@mwalenia
Copy link
Member Author

Run Java Flink PortableValidatesRunner Batch

@mwalenia
Copy link
Member Author

@mxm I'm going to fix these failures and get back to you. Thanks again for pointing out those tests :)

@mxm
Copy link
Contributor

mxm commented Nov 18, 2019

Sounds good. Thanks!

@mxm
Copy link
Contributor

mxm commented Nov 18, 2019

Run Java Flink PortableValidatesRunner Streaming

1 similar comment
@mwalenia
Copy link
Member Author

Run Java Flink PortableValidatesRunner Streaming

@mwalenia
Copy link
Member Author

Run Java Flink PortableValidatesRunner Batch

@mwalenia
Copy link
Member Author

@mxm I excluded tests regarding committed metrics, as they are not supported.
I also excluded gauge metric tests, since it seems that they aren't supported on portable Flink either - I checked in the accumulators and there was no trace of gauges. Do you know anything about it?
Thanks!

@mwalenia
Copy link
Member Author

Run Java PreCommit

1 similar comment
@mwalenia
Copy link
Member Author

Run Java PreCommit

@mwalenia
Copy link
Member Author

Run Java Flink PortableValidatesRunner Batch

@mwalenia
Copy link
Member Author

Run Java Flink PortableValidatesRunner Streaming

@mxm
Copy link
Contributor

mxm commented Nov 21, 2019

Gauges should be supported. I'm using them on a production system. Beam by default doesn't expose any gauges though, so you might have to add some manually.

@mwalenia
Copy link
Member Author

How would I go about that? I'm not sure how exposing the metrics is done. Can you point me in a right direction?

@mwalenia
Copy link
Member Author

@echauchot Hi, I've stumbled upon a MetricsPusherTest failure in this PR.
I know why it happens:

the runner reports more than just the user metric defined in the test. TestMetricSink returns just the first metric from the list to the test. Since there's no guarantee that it will be the user metric, the assert is likely to catch a wrong value and fail.

Do you think this is a good reason to make the test account for such a situation?

I hope you're the person to reach out to in this case - MetricsPusherTest seems to be your creation :)

@@ -139,12 +139,9 @@ def portableValidatesRunnerTask(String name, Boolean streaming) {
includeCategories 'org.apache.beam.sdk.testing.ValidatesRunner'
excludeCategories 'org.apache.beam.sdk.testing.FlattenWithHeterogeneousCoders'
excludeCategories 'org.apache.beam.sdk.testing.LargeKeys$Above100MB'
excludeCategories 'org.apache.beam.sdk.testing.UsesAttemptedMetrics'
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not a blocker for this PR but out of curiosity, do enabling these in the Portable Spark Runner pass? It would be a good idea to enable it to if so, or report the errors so they can be fixed if not.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure. I can create a PR to check this, that's a topic worth investigating.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#10198 it's here

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, now that I think of it, you probably wanted to check the impact of my changes on the Spark runner, right?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@iemejia Enabling the tests on Portable Spark runner fails. I'd have to investigate further in order to pinpoint the areas that fail

@mwalenia
Copy link
Member Author

@mxm How can I go about manually adding gauges? Does that mean changing the FlinkRunner to publish gauge metrics?

@mwalenia
Copy link
Member Author

Run Java Spark PortableValidatesRunner Batch

@mxm
Copy link
Contributor

mxm commented Nov 25, 2019

Gauges are reported here:

private void updateGauge(Iterable<MetricResult<GaugeResult>> gauges) {

Also they are added to the accumulator here:
MetricResults metricResults = asAttemptedOnlyMetricResults(metricsAccumulator.getLocalValue());

I don't know why the tests are not passing, but we can also fix gauges in a follow-up.

Copy link
Contributor

@echauchot echauchot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Comment about enhancement of MetricsPusher

private Iterable<MetricResult<DistributionResult>> distributions;
private Iterable<MetricResult<GaugeResult>> gauges;

private PortableMetrics(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mwalenia to answer your question, I'm indeed the correct person for MetricsPusher related questions.
Regarding MetricsPusher, the problem goes beyond the test itself. The whole MetricsPusher feature reports for now only user metrics (that is why the test sink that is tailored for it only reads user metrics). But the aim since the beginning of the architectural design (pull vs push essentially) was to allow in the future to support system metrics. Here is the design I did at the time: https://s.apache.org/runner_independent_metrics_extraction.
Long story short, the good thing to do IMHO is to enhance MetricsPusher to support system metrics as well and, of course, update the test/sink.

@mwalenia
Copy link
Member Author

@mxm you're right, it will be simpler to figure out gauges in another PR.


@Category({ValidatesRunner.class, UsesAttemptedMetrics.class, UsesCounterMetrics.class})
@Test
public void pushesSystemMetrics() throws InterruptedException {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@echauchot I added a test that checks if system metrics are supported by MetricPusher. It seems that they work :)

I also fixed the TestMetricsSink to account for this fact.

@mwalenia
Copy link
Member Author

Run Java PreCommit

@mwalenia mwalenia force-pushed the BEAM-4776-metrics-portableRunner-java branch from e5c819f to 7517573 Compare November 28, 2019 11:43
@mwalenia
Copy link
Member Author

@mxm It seems that there is no support for gauges in portability - I didn't find a proper MonitoringInfo type in metrics.proto.

@mwalenia
Copy link
Member Author

Run Java PreCommit

@mwalenia
Copy link
Member Author

@mxm the tests are green :) I think we need to take the gauge issue elsewhere, as it seems the gauges aren't portable at all.
If everything looks good to you, let me know, I'll clean up the commits and get them ready for merging.

@mxm
Copy link
Contributor

mxm commented Nov 28, 2019

Gauges are portable. The type is beam:metrics:latest_int_64. We can take care of the gauge tests separately of this PR.

@mxm
Copy link
Contributor

mxm commented Nov 28, 2019

Could you squash the commits?

@iemejia
Copy link
Member

iemejia commented Nov 28, 2019

For Portable Spark runner the issue tracking passing metrics from SDK harness to Spark is this https://issues.apache.org/jira/browse/BEAM-7219
Great this is done now at the Portable Runner side, this will allow Nexmark to be run too, great work @mwalenia !

@mwalenia mwalenia force-pushed the BEAM-4776-metrics-portableRunner-java branch from 7517573 to 4575e1c Compare November 29, 2019 09:05
@mwalenia
Copy link
Member Author

@mxm the commits are squashed.
As for gauges, my bad - I meant user gauge metrics.
Thanks!

@mxm mxm merged commit 1f64ba3 into apache:master Nov 29, 2019
@kennknowles
Copy link
Member

This has broken the Flink runner, it seems: https://issues.apache.org/jira/browse/BEAM-8869

It is also failing in some of Google's internal testing. I am still investigating that but will try to summarize and repro externally.

pipeline.run();
// give metrics pusher time to push
Thread.sleep(
(pipeline.getOptions().as(MetricsOptions.class).getMetricsPushPeriod() + 1L) * 1000);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should probably lower this interval and build in a retry logic. Otherwise this is prone to breaking.

@mwalenia mwalenia deleted the BEAM-4776-metrics-portableRunner-java branch January 24, 2020 10:41
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

5 participants