Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BEAM-10854] Fix PeriodicImpulse for default values #14446

Merged
merged 3 commits into from Apr 8, 2021

Conversation

InigoSJ
Copy link
Contributor

@InigoSJ InigoSJ commented Apr 6, 2021

Fixing issue from BEAM-10854 and BEAM-11620, PeriodicImpulse would fail with default values.

Some notes:

  • Lines [40-44] from periodicsequence.py could go directly in PeriodicImpulse (after line 162). But this would only fix PeriodicImpulse and would fail if other users decide to use Timestamp instead of Float for the times.
  • lines [92-96] of periodicsequence_test.py, adding this line would work to check if the default values would work both for start and end. This would avoid new errors in case the transform is modified. Those lines could be removed if needed

Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

  • Choose reviewer(s) and mention them in a comment (R: @username).
  • Format the pull request title like [BEAM-XXX] Fixes bug in ApproximateQuantiles, where you replace BEAM-XXX with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
  • Update CHANGES.md with noteworthy changes.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

See the Contributor Guide for more tips on how to make review process smoother.

Post-Commit Tests Status (on master branch)

Lang SDK ULR Dataflow Flink Samza Spark Twister2
Go --- --- Build Status --- Build Status ---
Java Build Status Build Status Build Status
Build Status
Build Status
Build Status
Build Status Build Status
Build Status
Build Status
Build Status
Build Status
Build Status
Build Status
Build Status Build Status
Build Status
Build Status
Build Status
Python Build Status
Build Status
Build Status
--- Build Status
Build Status
Build Status
Build Status
Build Status
--- Build Status ---
XLang Build Status --- Build Status Build Status --- Build Status ---

Pre-Commit Tests Status (on master branch)

--- Java Python Go Website Whitespace Typescript
Non-portable Build Status
Build Status
Build Status
Build Status
Build Status
Build Status Build Status Build Status Build Status
Portable --- Build Status --- --- --- ---

See .test-infra/jenkins/README for trigger phrase, status and link of all Jenkins jobs.

GitHub Actions Tests Status (on master branch)

Build python source distribution and wheels
Python tests
Java tests

See CI.md for more information about GitHub Actions CI.

@@ -66,6 +66,7 @@ def __init__(self, seconds=0, micros=0):
raise TypeError(
'Cannot interpret %s %s as micros.' % (micros, type(micros)))
self.micros = int(seconds * 1000000) + int(micros)
self.seconds = seconds + micros / 1000000
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do not believe we want to add another field to keep track of the same information. A few options:

  • In periodicsequence.py read micros and convert to seconds there.
  • hava an attribute on Timestamp here to return seconds from the internally stored micros.

(same for the Duration change below)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point, I was considering using the @property and a _get_seconds(). What do you think?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have coded both options and I think the first approach looks better and more compact. I will push the version that changes to seconds directly in periodicsequence

@@ -37,6 +37,11 @@
class ImpulseSeqGenRestrictionProvider(core.RestrictionProvider):
def initial_restriction(self, element):
start, end, interval = element
if isinstance(start, Timestamp):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should they be always Timestamps to avoid isinstance checks?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess that would also work, I just didn't want to take the option of using other types, since some users may already be using them.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewing the code, our current tests use time.time() which return a float, I think we should not change it to only allow Timestamps. What do you think?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good. Let's keep it as is.

@aaltay aaltay requested a review from pabloem April 6, 2021 17:31
@codecov
Copy link

codecov bot commented Apr 7, 2021

Codecov Report

Merging #14446 (08b34f0) into master (91382ad) will increase coverage by 0.08%.
The diff coverage is n/a.

Impacted file tree graph

@@            Coverage Diff             @@
##           master   #14446      +/-   ##
==========================================
+ Coverage   83.39%   83.47%   +0.08%     
==========================================
  Files         469      450      -19     
  Lines       58727    58812      +85     
==========================================
+ Hits        48977    49095     +118     
+ Misses       9750     9717      -33     
Impacted Files Coverage Δ
...ks/python/apache_beam/examples/cookbook/filters.py
...cp/internal/clients/storage/storage_v1_messages.py
...s/python/apache_beam/io/gcp/bigquery_avro_tools.py
...am/examples/snippets/transforms/aggregation/sum.py
..._beam/testing/benchmarks/nexmark/queries/query5.py
...build/srcs/sdks/python/apache_beam/testing/util.py
...gcp/datastore/v1new/datastore_write_it_pipeline.py
...sdks/python/apache_beam/runners/direct/executor.py
...amples/snippets/transforms/aggregation/__init__.py
...d/srcs/sdks/python/apache_beam/io/concat_source.py
... and 909 more

Continue to review full report at Codecov.

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update 91382ad...08b34f0. Read the comment docs.

@@ -81,6 +86,9 @@ def process(
'''
start, _, interval = element

if isinstance(start, Timestamp):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not directly related to your change. But this code has some other issues.

current_output_timestamp is not a Timestamp and needs to be converted later.
mixed use of Timestamp and timestamp.Timestamp - we can pick either.

@aaltay
Copy link
Member

aaltay commented Apr 7, 2021

It LGTM. I will leave it to @pabloem to merge, in case there will be issues after the merge I won't be available.

@pabloem pabloem merged commit a696836 into apache:master Apr 8, 2021
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

3 participants