Skip to content

Conversation

@robertwb
Copy link
Contributor

@robertwb robertwb commented May 9, 2020


Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

  • Choose reviewer(s) and mention them in a comment (R: @username).
  • Format the pull request title like [BEAM-XXX] Fixes bug in ApproximateQuantiles, where you replace BEAM-XXX with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
  • Update CHANGES.md with noteworthy changes.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

See the Contributor Guide for more tips on how to make review process smoother.

Post-Commit Tests Status (on master branch)

Lang SDK Apex Dataflow Flink Gearpump Samza Spark
Go Build Status --- --- Build Status --- --- Build Status
Java Build Status Build Status Build Status
Build Status
Build Status
Build Status
Build Status
Build Status
Build Status Build Status Build Status
Build Status
Build Status
Python Build Status
Build Status
Build Status
Build Status
--- Build Status
Build Status
Build Status
Build Status
Build Status
--- --- Build Status
XLang --- --- --- Build Status --- --- Build Status

Pre-Commit Tests Status (on master branch)

--- Java Python Go Website
Non-portable Build Status Build Status
Build Status
Build Status Build Status
Portable --- Build Status --- ---

See .test-infra/jenkins/README for trigger phrase, status and link of all Jenkins jobs.

@robertwb
Copy link
Contributor Author

robertwb commented May 9, 2020

R: @lukecwik

keep_of_element_remainder = keep / (1 - current_element_progress)
# If it's less than what's left of the current element,
# try splitting at the current element.
if (keep_of_element_remainder < 1 and is_valid_split_point(index) and
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

allowed_split_points definition is too vague in the case of multiple active elements and we need to scope it down to mean the set of allowed first_residual_element indices.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed. I've added clarification to the proto.

# pylint: disable=round-builtin
stop_index = index + max(1, int(round(current_element_progress + keep)))
if allowed_split_points and stop_index not in allowed_split_points:
allowed_split_points = sorted(allowed_split_points)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should make it an error to have duplicate split points in allowed_split_points.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree that there's never a reason to have duplicates, but that wouldn't impact this code here (and I don't think should result in failure).

else:
prev = allowed_split_points[closest - 1]
next = allowed_split_points[closest]
if index < prev and stop_index - prev < next - stop_index:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you should comment that your choosing the closer of the two points here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

self.assertEqual(self.split(2, 0, 0.5, 16), simple_split(9))
self.assertEqual(self.split(6, 0, 0.5, 16), simple_split(11))

def test_split_with_element_progres(self):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
def test_split_with_element_progres(self):
def test_split_with_element_progress(self):

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

self.assertEqual(
self.split(0, 0, 0.25, 16, allowed=(2, 3, 6)), simple_split(3))

self.assertEqual(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be good to either add comments or break out the tests to separate methods to describe the different scenarios such as round to closest.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I kept them in the same methods, because it's easier to understand the values relative to the prior examples.

Comment on lines 98 to 99
self.assertEqual(
self.sdf_split(0, 0, 0.12, 4), (-1, 'Primary(0.5)', 'Residual(0.5)', 1))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
self.assertEqual(
self.sdf_split(0, 0, 0.12, 4), (-1, 'Primary(0.5)', 'Residual(0.5)', 1))
self.assertEqual(
self.sdf_split(0, 0, 0.125, 4), (-1, 'Primary(0.5)', 'Residual(0.5)', 1))

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had this originally, but it bumped the formatting, and .12 was close enough. I can change this back.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I found it confusing that the rounding dropped the fractional part when I was running through the scenarios.

// empty, there are no constraints on where to split.
// Specifically, the first_residual_element of a split result must be an
// allowed split point, and the last_primary_element must immediately
// preceded an allowed split point.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// preceded an allowed split point.
// precede an allowed split point.

@lukecwik
Copy link
Member

CC: @lostluck @youngoli @boyuanzz We'll want to duplicate this logic within Java/Go for allowed split point selection.

@robertwb
Copy link
Contributor Author

Run Java PreCommit

@lostluck
Copy link
Contributor

Ack.

@youngoli
Copy link
Contributor

Just wanna confirm, the allowed_split_points dictate splits at element boundaries only right?

@lukecwik lukecwik merged commit cad0333 into apache:master May 11, 2020
@lukecwik
Copy link
Member

The allowed split points tell you what is the valid set of return values for first_residual_index and last_primary_index+1 are so it applies to splittable DoFns as in if the runner tells you it can't split at position Y and your currently processing Y, there is no point of forwarding the split call to the SDF from the gRPC read port.

@lukecwik
Copy link
Member

@ibzib, can you cherry pick this into the 2.21 branch. This would make the associated BEAM-9935 a non-blocking issue for 2.21 afterwards.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants