Skip to content

Commit

Permalink
[BEAM-8575] Test DoFn context params
Browse files Browse the repository at this point in the history
  • Loading branch information
liumomo315 committed Nov 15, 2019
1 parent 8c3af8a commit 5cccf66
Showing 1 changed file with 24 additions and 0 deletions.
24 changes: 24 additions & 0 deletions sdks/python/apache_beam/pipeline_test.py
Expand Up @@ -25,6 +25,8 @@
from builtins import object
from builtins import range

from nose.plugins.attrib import attr

import mock

import apache_beam as beam
Expand Down Expand Up @@ -641,6 +643,26 @@ def process(self, element, prefix, suffix=DoFn.SideInputParam):
assert_that(result, equal_to(['zyx-%s-xyz' % x for x in words_list]))
pipeline.run()

@attr('ValidatesRunner')
def test_element_param(self):
pipeline = TestPipeline()
input = [1, 2]
pcoll = (pipeline
| 'Create' >> Create(input)
| 'Ele param' >> Map(lambda element=DoFn.ElementParam: element))
assert_that(pcoll, equal_to(input))
pipeline.run()

@attr('ValidatesRunner')
def test_key_param(self):
pipeline = TestPipeline()
pcoll = (pipeline
| 'Create' >> Create([('a', 1), ('b', 2)])
| 'Key param' >> Map(lambda _, key=DoFn.KeyParam: key))
assert_that(pcoll, equal_to(['a', 'b']))
pipeline.run()

@attr('ValidatesRunner')
def test_window_param(self):
class TestDoFn(DoFn):
def process(self, element, window=DoFn.WindowParam):
Expand All @@ -663,6 +685,7 @@ def process(self, element, window=DoFn.WindowParam):
label='doubled windows')
pipeline.run()

@attr('ValidatesRunner')
def test_timestamp_param(self):
class TestDoFn(DoFn):
def process(self, element, timestamp=DoFn.TimestampParam):
Expand All @@ -679,6 +702,7 @@ def test_timestamp_param_map(self):
p | Create([1, 2]) | beam.Map(lambda _, t=DoFn.TimestampParam: t),
equal_to([MIN_TIMESTAMP, MIN_TIMESTAMP]))

@attr('ValidatesRunner')
def test_pane_info_param(self):
with TestPipeline() as p:
pc = p | Create([(None, None)])
Expand Down

0 comments on commit 5cccf66

Please sign in to comment.