Skip to content

[BEAM-7972] Always use Global window in reshuffle and then apply wind…#9334

Merged
angoenka merged 1 commit intoapache:masterfrom
angoenka:fix_reshuffle
Aug 31, 2019
Merged

[BEAM-7972] Always use Global window in reshuffle and then apply wind…#9334
angoenka merged 1 commit intoapache:masterfrom
angoenka:fix_reshuffle

Conversation

@angoenka
Copy link
Contributor

…ow again.

Please add a meaningful description for your change here


Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

  • Choose reviewer(s) and mention them in a comment (R: @username).
  • Format the pull request title like [BEAM-XXX] Fixes bug in ApproximateQuantiles, where you replace BEAM-XXX with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

Post-Commit Tests Status (on master branch)

Lang SDK Apex Dataflow Flink Gearpump Samza Spark
Go Build Status --- --- Build Status --- --- Build Status
Java Build Status Build Status Build Status Build Status
Build Status
Build Status
Build Status Build Status Build Status
Build Status
Python Build Status
Build Status
Build Status
Build Status
--- Build Status
Build Status
Build Status --- --- Build Status
XLang --- --- --- Build Status --- --- ---

Pre-Commit Tests Status (on master branch)

--- Java Python Go Website
Non-portable Build Status Build Status Build Status Build Status
Portable --- Build Status --- ---

See .test-infra/jenkins/README for trigger phrase, status and link of all Jenkins jobs.

@angoenka
Copy link
Contributor Author

R: @robertwb @y1chi @tweise

@angoenka angoenka changed the title [BEAM-7972] Always use Global window in reshuffle and then apply wind… NOT FOR REVIEW [BEAM-7972] Always use Global window in reshuffle and then apply wind… Aug 14, 2019
@angoenka angoenka changed the title NOT FOR REVIEW [BEAM-7972] Always use Global window in reshuffle and then apply wind… [BEAM-7972] Always use Global window in reshuffle and then apply wind… Aug 14, 2019
@angoenka
Copy link
Contributor Author

R: @lukecwik

@y1chi
Copy link
Contributor

y1chi commented Aug 19, 2019

LGTM.
thanks for looking into this.

@angoenka
Copy link
Contributor Author

Gentle reminder for the review.
@robertwb @lukecwik

Copy link
Contributor

@robertwb robertwb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is just needed to work around a Dataflow JRH bug, right?

key, value = element
return key, TimestampedValue(value, timestamp)
# Transport the window as part of the value and restore it later.
return key, TimestampedValue((value, window), timestamp)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any reason not to use a WindowedValue here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not really. I will make it windowed values.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually while making the change i realized that the timestamp will be duplicated when using the windowed values without any benefit. Hence dropping the use of windowed value.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why would the timestamp be duplicated? The (window, timestamp, value) tuple seems best represented by a WindowedValue. (I would be OK with a plain old 3-tuple as well, but only going half way seems odd).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The defaulting to global window should be deleted since the Python SDK now does send a proper windowing strategy (same as Go SDK). The code was added as a migration path to allow for differences in where the Python/Go/Java SDKs were when submitting jobs to Dataflow.

So we should update the reshuffle code to not pass the non standard window from python.

We shouldn't have to, but if the alternative is significant JRH refactoring, then this code should be OK and we can add a comment that we're working around bugs in the Dataflow JRH.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From a quick look, JRH refactoring seems significant.
I would like to keep it simple for now and will continue with changes in Reshuffle transform with additional comment.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why would the timestamp be duplicated? The (window, timestamp, value) tuple seems best represented by a WindowedValue. (I would be OK with a plain old 3-tuple as well, but only going half way seems odd).

Ohh, I see what you mean. Updated the code.

Copy link
Contributor Author

@angoenka angoenka left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this will be needed irrespective of the JRH bug as we don't want to introduce a new windowing function which should be interpreted by the runner.
Here is the code which defaults to globalWindow when its not able to deserialize the windowing strategy. https://github.com/apache/beam/blob/master/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/GroupAlsoByWindowParDoFnFactory.java#L106

key, value = element
return key, TimestampedValue(value, timestamp)
# Transport the window as part of the value and restore it later.
return key, TimestampedValue((value, window), timestamp)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not really. I will make it windowed values.

@angoenka
Copy link
Contributor Author

Gentle reminder for the review.

@robertwb
Copy link
Contributor

OK, yuck, that JRH code is really bad (and possibly buggy). The windowing function need not be interpreted by the runner, it should just note it's non-merging and pass things through.

@angoenka
Copy link
Contributor Author

That piece of JRH seems to be very intricate and I would don't think it has an easy fix without a lot of refactoring.
Do you have any better solution in mind?

@lukecwik
Copy link
Member

That piece of JRH seems to be very intricate and I would don't think it has an easy fix without a lot of refactoring.
Do you have any better solution in mind?

The defaulting to global window should be deleted since the Python SDK now does send a proper windowing strategy (same as Go SDK). The code was added as a migration path to allow for differences in where the Python/Go/Java SDKs were when submitting jobs to Dataflow.

@angoenka
Copy link
Contributor Author

The defaulting to global window should be deleted since the Python SDK now does send a proper windowing strategy (same as Go SDK). The code was added as a migration path to allow for differences in where the Python/Go/Java SDKs were when submitting jobs to Dataflow.

So we should update the reshuffle code to not pass the non standard window from python.

@angoenka
Copy link
Contributor Author

Request for another pass for the review.

return [
windowed_value.WindowedValue(
(key, value.value), value.timestamp, [window])
(key, value.value), value.timestamp, value.windows)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You could do

key, windowed_values = element
return [wv.with_value((key, wv.value)) for wv in windowed_values]

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

ungrouped = pcoll | Map(reify_timestamps)

# TODO(BEAM-8104) Using global window as one of the standard window.
# This is to mitigate the Java Runner Harness limitation to
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/Java Runner Harness/Dataflow Java Runner Harness/

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

@robertwb
Copy link
Contributor

robertwb commented Aug 30, 2019 via email

@angoenka angoenka merged commit d8c1146 into apache:master Aug 31, 2019
@angoenka
Copy link
Contributor Author

Thanks!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants