New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[BEAM-2855] nexmark python suite implement implement query 3, 4, 5, 6, 7, 8, 11 #12580
Conversation
sdks/python/apache_beam/testing/benchmarks/nexmark/models/auction_count.py
Outdated
Show resolved
Hide resolved
sdks/python/apache_beam/testing/benchmarks/nexmark/models/bids_per_session.py
Outdated
Show resolved
Hide resolved
sdks/python/apache_beam/testing/benchmarks/nexmark/models/bids_per_session.py
Outdated
Show resolved
Hide resolved
sdks/python/apache_beam/testing/benchmarks/nexmark/models/auction_count.py
Outdated
Show resolved
Hide resolved
sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query3.py
Outdated
Show resolved
Hide resolved
sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query3.py
Outdated
Show resolved
Hide resolved
sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query6.py
Outdated
Show resolved
Hide resolved
sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query6.py
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, just some minor nits.
sdks/python/apache_beam/testing/benchmarks/nexmark/models/result_name.py
Outdated
Show resolved
Hide resolved
sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query3.py
Outdated
Show resolved
Hide resolved
sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query4.py
Outdated
Show resolved
Hide resolved
sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query3.py
Outdated
Show resolved
Hide resolved
Run Python PreCommit |
return ( | ||
sliding_bids | ||
| 'select_bids' >> beam.ParDo( | ||
SelectMaxBidFn(), beam.pvalue.AsSingleton(max_prices))) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this how this is implemented in Java? I am wondering if we should make bids comparable. If they were comparable, then you would be able to just return max_prices
e.g.:
@functools.total_ordering
class ComparableBidByPrice(object):
def __init__(self, bid):
self.bid = bid
def __eq__(self, other):
return self.bid == other.bid
def __lt__(self, other):
return self.bid.price < other.bid.price
And then you'd do:
max_bids = (
sliding_bids
| beam.Map(ComparableBidByPrice)
| beam.CombineGlobally(max).without_defaults())
thoughts? The main thing here is having one fewer stage, thus higher performance - but I think the best option is to do whatever Java does.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think one of the purpose would be to benchmark the performance of side inputs, thus some pipelines are choosing certain beam semantics that may not be the best way.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
that's reasonable. Than can you just add a comment @leiyiz ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, I read the code and the reason to not use combiner, which is more efficient, is to utilize the side-input functionality
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can you add a comment on the file in a follow up PR please?
sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query6.py
Outdated
Show resolved
Hide resolved
| beam.WindowInto( | ||
window.GlobalWindows(), | ||
trigger=trigger.Repeatedly(trigger.AfterCount(1)), | ||
accumulation_mode=trigger.AccumulationMode.ACCUMULATING, | ||
allowed_lateness=0) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This trigger is a little hard to wrap my head around : ) can you help me understand it? So I guess we simply accumulate fired panes and fire everything every time? (let's say that the stream contains a new element every second. Would we fire 1000 elements after 1000 seconds?)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes, so every time an event arrives, it should fire and calculate
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
because it is calculating the mean, every time something arrives it calculates the mean 1 more time
|
||
@on_timer(person_timer_spec) | ||
def expiry(self, person_state=beam.DoFn.StateParam(person_spec)): | ||
person_state.clear() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can it happen that the same person creates new auctions after > max_auction_waiting_time
? Will there be a new person event? If we get new auctions after the person is expired, then we'll just keep adding them to auction_state forevetr, no?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it would, then the new auction for person is just added to the state and eventually dropped when the pipeline ends. but it is also specified in the nexmark spec as "clear the state after TTL" also the default timer is like 600 seconds long which is way longer than the test duration so I think it is less of a issue?
… implement implement query 3, 4, 5, 6, 7, 8, 11 * implement query 3, 4, 5, 6, 7, 8, 11 * forgot to run pylint2_3 * using dict instead of object for results of query * added to_type_hint for coders, fixed issues brought up in code review * reversed the sorting to not remove from front of list Co-authored-by: Leiyi Zhang <leiyiz@google.com>
… implement implement query 3, 4, 5, 6, 7, 8, 11 * implement query 3, 4, 5, 6, 7, 8, 11 * forgot to run pylint2_3 * using dict instead of object for results of query * added to_type_hint for coders, fixed issues brought up in code review * reversed the sorting to not remove from front of list Co-authored-by: Leiyi Zhang <leiyiz@google.com>
implemented querys
made a little change to nexmark Launcher to fix a bug where the error is not passed out correctly
Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
R: @username
).[BEAM-XXX] Fixes bug in ApproximateQuantiles
, where you replaceBEAM-XXX
with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.CHANGES.md
with noteworthy changes.See the Contributor Guide for more tips on how to make review process smoother.
Post-Commit Tests Status (on master branch)
Pre-Commit Tests Status (on master branch)
See .test-infra/jenkins/README for trigger phrase, status and link of all Jenkins jobs.
GitHub Actions Tests Status (on master branch)
See CI.md for more information about GitHub Actions CI.