Skip to content

Commit

Permalink
Merge pull request #14446 from [BEAM-10854] Fix PeriodicImpulse for d…
Browse files Browse the repository at this point in the history
…efault values

* [BEAM-10854] Fix PeriodicImpulse

* [BEAM-10854] Fix PeriodicImpulse for default values

* [BEAM-10854] Fix PeriodicImpulse for default values
  • Loading branch information
InigoSJ committed Apr 8, 2021
1 parent dada0f9 commit a696836
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 0 deletions.
8 changes: 8 additions & 0 deletions sdks/python/apache_beam/transforms/periodicsequence.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,11 @@
class ImpulseSeqGenRestrictionProvider(core.RestrictionProvider):
def initial_restriction(self, element):
start, end, interval = element
if isinstance(start, Timestamp):
start = start.micros / 1000000
if isinstance(end, Timestamp):
end = end.micros / 1000000

assert start <= end
assert interval > 0
total_outputs = math.ceil((end - start) / interval)
Expand Down Expand Up @@ -81,6 +86,9 @@ def process(
'''
start, _, interval = element

if isinstance(start, Timestamp):
start = start.micros / 1000000

assert isinstance(restriction_tracker, sdf_utils.RestrictionTrackerView)

current_output_index = restriction_tracker.current_restriction().start
Expand Down
20 changes: 20 additions & 0 deletions sdks/python/apache_beam/transforms/periodicsequence_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from __future__ import division
from __future__ import print_function

import inspect
import time
import unittest
from builtins import range
Expand Down Expand Up @@ -81,6 +82,25 @@ def test_periodicimpulse_windowing_on_si(self):
for x in range(0, int(duration / interval), 1)]
assert_that(actual, equal_to(k))

def test_periodicimpulse_default_start(self):
default_parameters = inspect.signature(PeriodicImpulse).parameters
it = default_parameters["start_timestamp"].default
duration = 1
et = it + duration
interval = 0.5

# Check default `stop_timestamp` is the same type `start_timestamp`
is_same_type = isinstance(
it, type(default_parameters["stop_timestamp"].default))
error = "'start_timestamp' and 'stop_timestamp' have different type"
assert is_same_type, error

with TestPipeline() as p:
result = p | 'PeriodicImpulse' >> PeriodicImpulse(it, et, interval)

k = [it + x * interval for x in range(0, int(duration / interval))]
assert_that(result, equal_to(k))

def test_periodicsequence_outputs_valid_sequence_in_past(self):
start_offset = -10000
it = time.time() + start_offset
Expand Down

0 comments on commit a696836

Please sign in to comment.