diff --git a/sdks/python/apache_beam/pipeline_test.py b/sdks/python/apache_beam/pipeline_test.py index b1b2e68173533..0e11a9a1b75f4 100644 --- a/sdks/python/apache_beam/pipeline_test.py +++ b/sdks/python/apache_beam/pipeline_test.py @@ -26,6 +26,7 @@ from builtins import range import mock +from nose.plugins.attrib import attr import apache_beam as beam from apache_beam import typehints @@ -641,6 +642,25 @@ def process(self, element, prefix, suffix=DoFn.SideInputParam): assert_that(result, equal_to(['zyx-%s-xyz' % x for x in words_list])) pipeline.run() + @attr('ValidatesRunner') + def test_element_param(self): + pipeline = TestPipeline() + input = [1, 2] + pcoll = (pipeline + | 'Create' >> Create(input) + | 'Ele param' >> Map(lambda element=DoFn.ElementParam: element)) + assert_that(pcoll, equal_to(input)) + pipeline.run() + + @attr('ValidatesRunner') + def test_key_param(self): + pipeline = TestPipeline() + pcoll = (pipeline + | 'Create' >> Create([('a', 1), ('b', 2)]) + | 'Key param' >> Map(lambda _, key=DoFn.KeyParam: key)) + assert_that(pcoll, equal_to(['a', 'b'])) + pipeline.run() + def test_window_param(self): class TestDoFn(DoFn): def process(self, element, window=DoFn.WindowParam):