[BEAM-662] Fix for allowing floating point periods in windows #2600

Closed

10 changes: 6 additions & 4 deletions sdks/python/apache_beam/transforms/window.py
@@ -388,13 +388,15 @@ def __init__(self, size, period, offset=0):
       raise ValueError('The size parameter must be strictly positive.')
     self.size = Duration.of(size)
     self.period = Duration.of(period)
-    self.offset = Timestamp.of(offset) % size
+    self.offset = Timestamp.of(offset) % period

   def assign(self, context):
     timestamp = context.timestamp
-    start = timestamp - (timestamp - self.offset) % self.period
-    return [IntervalWindow(Timestamp.of(s), Timestamp.of(s) + self.size)
-            for s in range(start, start - self.size, -self.period)]
+    start = timestamp - ((timestamp - self.offset) % self.period)
+    return [
+        IntervalWindow(Timestamp(micros=s), Timestamp(micros=s) + self.size)
Contributor

Should we remove the truncating Timestamp.__int__, which allowed this bug?

Contributor Author

We could, but that would change the return value from seconds to microseconds. Do you know whether anything else depends on it? I wasn't sure about that part.

Contributor

I wouldn't change the return type; I'd just remove it altogether. That only means one could no longer write int(timestamp) to get an (implicitly truncated) timestamp, including for use in range.

I don't think anyone is using it (and if they are, there's likely a bug like the one above).

Contributor Author

Done.

+        for s in range(start.micros, timestamp.micros - self.size.micros,
+                       -self.period.micros)]

   def __eq__(self, other):
     if type(self) == type(other) == SlidingWindows:
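
The fixed assignment logic above can be approximated outside Beam. The following is a minimal standalone sketch, assuming plain integer microseconds in place of Beam's Timestamp and Duration and returning (start, end) tuples in seconds instead of IntervalWindow objects; the names `to_micros` and `sliding_window_bounds` are hypothetical and not part of the SDK.

```python
MICROS_PER_SECOND = 1_000_000


def to_micros(seconds):
    """Convert seconds (int or float) to integer microseconds."""
    return int(round(seconds * MICROS_PER_SECOND))


def sliding_window_bounds(timestamp, size, period, offset=0):
    """Return (start, end) pairs, in seconds, of the windows containing timestamp.

    Hypothetical helper mirroring the shape of the patched assign():
    the offset is reduced modulo the period, and range() iterates over
    integer microseconds so fractional sizes and periods are not truncated.
    """
    ts = to_micros(timestamp)
    size_us = to_micros(size)
    period_us = to_micros(period)
    offset_us = to_micros(offset) % period_us

    # Start of the latest window that begins at or before the timestamp.
    start = ts - (ts - offset_us) % period_us

    # Step backwards one period at a time while the window still covers ts.
    return [(s / MICROS_PER_SECOND, (s + size_us) / MICROS_PER_SECOND)
            for s in range(start, ts - size_us, -period_us)]


if __name__ == "__main__":
    # Same inputs as the fractional test cases added in this PR.
    print(sliding_window_bounds(1.7, size=3.5, period=2.5, offset=1.5))
    print(sliding_window_bounds(4.5, size=3.5, period=2.5, offset=4.0))
```

With the inputs from the new tests, this sketch yields the same window boundaries the tests expect, including when the offset (4.0) is larger than the period.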
14 changes: 14 additions & 0 deletions sdks/python/apache_beam/transforms/window_test.py
@@ -108,6 +108,20 @@ def test_sliding_windows_assignment(self):
     self.assertEqual(expected, windowfn.assign(context('v', 8)))
     self.assertEqual(expected, windowfn.assign(context('v', 11)))

+  def test_sliding_windows_assignment_fraction(self):
+    windowfn = SlidingWindows(size=3.5, period=2.5, offset=1.5)
+    self.assertEqual([IntervalWindow(1.5, 5.0), IntervalWindow(-1.0, 2.5)],
+                     windowfn.assign(context('v', 1.7)))
+    self.assertEqual([IntervalWindow(1.5, 5.0)],
+                     windowfn.assign(context('v', 3)))
+
+  def test_sliding_windows_assignment_fraction_large_offset(self):
+    windowfn = SlidingWindows(size=3.5, period=2.5, offset=4.0)
+    self.assertEqual([IntervalWindow(1.5, 5.0), IntervalWindow(-1.0, 2.5)],
+                     windowfn.assign(context('v', 1.7)))
+    self.assertEqual([IntervalWindow(4.0, 7.5), IntervalWindow(1.5, 5.0)],
+                     windowfn.assign(context('v', 4.5)))
+
   def test_sessions_merging(self):
     windowfn = Sessions(10)

4 changes: 0 additions & 4 deletions sdks/python/apache_beam/utils/timestamp.py
@@ -167,10 +167,6 @@ def __float__(self):
     # Note that the returned value may have lost precision.
     return float(self.micros) / 1000000

-  def __int__(self):
-    # Note that the returned value may have lost precision.
-    return self.micros / 1000000
-
   def __cmp__(self, other):
     # Allow comparisons between Duration and Timestamp values.
     if not isinstance(other, Timestamp):
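
To illustrate why the review thread settled on removing the truncating conversion, here is a small sketch of the arithmetic. It assumes that the pre-fix code path effectively passed each range() argument through int(); `TruncatingMicros`, `truncated_starts`, and `exact_starts` are hypothetical names used only for this illustration, not Beam classes or functions.

```python
MICROS_PER_SECOND = 1_000_000


class TruncatingMicros:
    """Hypothetical stand-in for a microsecond-backed value with a truncating __int__."""

    def __init__(self, micros):
        self.micros = micros

    def __int__(self):
        # Floors to whole seconds, silently dropping the fractional part.
        return self.micros // MICROS_PER_SECOND


def truncated_starts(start, size, period):
    """Window starts in whole seconds when every range() argument goes through int().

    Assumption: this mirrors the shape of the pre-fix
    range(start, start - size, -period) call.
    """
    return list(range(int(TruncatingMicros(start)),
                      int(TruncatingMicros(start - size)),
                      -int(TruncatingMicros(period))))


def exact_starts(start, timestamp, size, period):
    """Window starts in microseconds when range() iterates over raw micros."""
    return list(range(start, timestamp - size, -period))


if __name__ == "__main__":
    # size=3.5s, period=2.5s, first window start=1.5s, element timestamp=1.7s.
    start, timestamp = 1_500_000, 1_700_000
    size, period = 3_500_000, 2_500_000
    print(truncated_starts(start, size, period))         # whole seconds
    print(exact_starts(start, timestamp, size, period))  # microseconds
```

In this sketch the truncated version steps by 2 seconds from second 1, while the microsecond version steps by 2.5 seconds from 1.5 seconds, which is the kind of silent mismatch the removed `__int__` made possible.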