
[BEAM-6374] Emit PCollection metrics from GoSDK #10942

Merged 4 commits into apache:master on Mar 4, 2020

Conversation

@lostluck (Contributor) commented Feb 23, 2020

This adds PCollection metrics to the Go SDK, in particular Element Count and Sampled Size.

New exec.PCollection nodes are added between every processing node in the bundle execution graph.

  • The new metrics are only added as MonitoringInfos, not the legacy protos.
  • There's roughly 10ns added per element per PCollection node due to the atomic addition for every element (a rough sketch of this hot path follows the list).
  • Elements for sizes are selected randomly, then encoded to count their bytes (w/o window headers).
    • An initial index is selected from the first three elements ([0, 1, 2]) at bundle start-up, and then the next index is pre-selected from somewhere further along, proportional to the number of elements seen in the bundle so far.
    • As currently set up, it will take around 200-300 samples for the first 1M elements, so encoding overhead is limited.
  • PCollections from a DataSource do 100% "sampling", since they're reading the bytes directly anyway. The PCollection node that would have been added after the DataSource is elided from the graph during construction, but re-used to avoid duplicating the logic for concurrently manipulating the size distribution.
    • DataSources can properly handle CoGBKs as well, counting non-header bytes for iterables and state-backed iterables.
    • This still involves a mutex Lock for every update, so we may want to find a lighter-weight mechanism to handle the distribution samples from DataSources, or simply opt for the same random sampling (a sketch of such a guarded update follows below).
    • A similar method could be used for DataSinks as well, but not handled in this PR.
    • It's important to note that the runner is already aware of the number of bytes sent and received from the SDK side, so we may opt to remove this entirely.
  • Counts and Samples are not yet made for SideInputs, which would better account for data consumed by DoFns.
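
To make the per-element bookkeeping above concrete, here is a minimal, self-contained Go sketch of the counting/sampling hot path. It is not the SDK's actual exec.PCollection node; the names (pcolCounter, observe, recordSize) and the exact sample spacing are illustrative assumptions, but the shape is the same: one atomic add per element, plus an occasional re-encode when a pre-selected index comes up, with indices spaced proportionally to the elements seen so far.

```go
package main

import (
	"fmt"
	"math/rand"
	"sync/atomic"
)

// pcolCounter sketches the per-element bookkeeping described above: an atomic
// element count on every element, plus a size sample taken only when a
// pre-selected element index comes up. Names and the exact sample spacing are
// illustrative; this is not the SDK's exec.PCollection implementation.
type pcolCounter struct {
	count      int64 // total elements seen; updated atomically on the hot path
	nextSample int64 // 1-based index of the next element to size-sample
	rng        *rand.Rand
	recordSize func(bytes int64) // e.g. feeds a size distribution metric
}

func newPColCounter(rng *rand.Rand, recordSize func(int64)) *pcolCounter {
	return &pcolCounter{
		nextSample: int64(rng.Intn(3)) + 1, // start by sampling one of the first three elements
		rng:        rng,
		recordSize: recordSize,
	}
}

// observe is called once per element; encodedLen re-encodes the element and
// returns its byte length, and is only invoked for sampled elements.
func (p *pcolCounter) observe(encodedLen func() int64) {
	c := atomic.AddInt64(&p.count, 1)
	if c != atomic.LoadInt64(&p.nextSample) {
		return // common case: just the atomic add
	}
	p.recordSize(encodedLen())
	// Pre-select the next index further along, proportional to the elements
	// seen so far, so sampling (and re-encoding) thins out as the bundle grows.
	atomic.StoreInt64(&p.nextSample, c+1+p.rng.Int63n(c/10+1))
}

func main() {
	rng := rand.New(rand.NewSource(1))
	samples := 0
	p := newPColCounter(rng, func(int64) { samples++ })
	for i := 0; i < 1000000; i++ {
		p.observe(func() int64 { return 8 }) // pretend every element encodes to 8 bytes
	}
	fmt.Printf("elements=%d size samples=%d\n", atomic.LoadInt64(&p.count), samples)
}
```

With the spacing used here (next index within roughly 10% of the current count), the sample indices grow geometrically, which should yield on the order of a couple hundred size samples per million elements, consistent with the 200-300 figure above; the spacing constant is chosen to reproduce that behavior and is not taken from the SDK.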

Thank you @ajamato for reminding me of the pre-select method for sampling, and @lukecwik for pointing out the DataSource can avoid separate additional encoding costs when measuring elements.
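
On the DataSource path described above, every element's byte size feeds the distribution, so the update sits behind a lock rather than behind a sampling check. Below is a minimal sketch of such a guarded min/sum/count/max accumulator; the type and field names are illustrative, not the SDK's actual implementation.

```go
package main

import (
	"fmt"
	"sync"
)

// sizeDistribution is an illustrative min/sum/count/max accumulator for byte
// sizes. On a DataSource, which already reads the encoded bytes, update would
// run for every element, hence the per-update mutex cost mentioned above.
type sizeDistribution struct {
	mu                   sync.Mutex
	count, sum, min, max int64
}

func (d *sizeDistribution) update(size int64) {
	d.mu.Lock()
	defer d.mu.Unlock()
	if d.count == 0 || size < d.min {
		d.min = size
	}
	if size > d.max {
		d.max = size
	}
	d.count++
	d.sum += size
}

func main() {
	var d sizeDistribution
	for _, s := range []int64{120, 80, 300} { // e.g. byte lengths read for three elements
		d.update(s)
	}
	fmt.Printf("count=%d sum=%d min=%d max=%d\n", d.count, d.sum, d.min, d.max)
}
```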

Performance impact:
I have two jobs I use for benchmarking this: Pipeline A uses int64s as elements and does simple passthroughs and sums, and Pipeline B uses large protocol buffers as elements and spends a fair amount of CPU time decoding them.

For small "fast" elements, the overhead is about ~19.5% of the Go side processing (which makes sense if elements are just being passed around or incremented).
For large "heavy" elements, the overhead is about ~0.125% of the Go side of processing.

Specifically, this is only taking into account the Go SDK worker, and not any runner side costs. This feels acceptable for the time being, though it's possible we can improve this later, especially for "lighter" jobs.
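
The numbers above come from the Pipeline A/B jobs, which aren't reproduced here. As a hedged sketch only, the micro-benchmark below shows one way to sanity-check the per-element atomic-add cost (the ~10ns class figure quoted above) on a given machine using the standard testing package; it is not the actual benchmark used for this PR.

```go
package main // place in a *_test.go file and run: go test -bench=.

import (
	"sync/atomic"
	"testing"
)

var sink int64 // prevents the compiler from optimizing the loops away

// BenchmarkPassthrough models the bare per-element work of a trivial stage.
func BenchmarkPassthrough(b *testing.B) {
	var v int64
	for i := 0; i < b.N; i++ {
		v++
	}
	sink = v
}

// BenchmarkPassthroughWithCount adds the one atomic increment a PCollection
// node performs per element; the delta approximates the counting overhead.
func BenchmarkPassthroughWithCount(b *testing.B) {
	var v, count int64
	for i := 0; i < b.N; i++ {
		v++
		atomic.AddInt64(&count, 1)
	}
	sink = v + count
}
```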


Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

  • Choose reviewer(s) and mention them in a comment (R: @username).
  • Format the pull request title like [BEAM-XXX] Fixes bug in ApproximateQuantiles, where you replace BEAM-XXX with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
  • Update CHANGES.md with noteworthy changes.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

See the Contributor Guide for more tips on how to make review process smoother.


@lostluck (Contributor Author):

R: @youngoli
cc: @ajamato @lukecwik

@youngoli (Contributor) left a comment:


Looks good. I have a clarification question and a comment nit, but nothing worth blocking approval over.

Review comment on sdks/go/pkg/beam/core/runtime/exec/plan.go (outdated, resolved).
@lostluck lostluck merged commit ded686a into apache:master Mar 4, 2020
lostluck added a commit that referenced this pull request Mar 6, 2020
lostluck added a commit that referenced this pull request Mar 6, 2020
lostluck added a commit that referenced this pull request Aug 12, 2021
Restoring #10942 to narrow down where the post-submits failed previously. Expands should also not have a PCollection node after them, since the CoGBK coder is handled by the DataSource.

calvinleungyk pushed a commit to calvinleungyk/beam that referenced this pull request Sep 22, 2021