From 4e7f870d7bdeba3fda27f4e43c0aec8341d282ba Mon Sep 17 00:00:00 2001 From: venbijjam Date: Fri, 14 Oct 2016 13:27:38 -0500 Subject: [PATCH 01/39] BEAM-520 --- .../cookbook/multiple_output_pardo_test.py | 55 ++++++------------- 1 file changed, 18 insertions(+), 37 deletions(-) diff --git a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py index 3ddd668599d0..10dc18a89f04 100644 --- a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py +++ b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py @@ -18,55 +18,36 @@ """Test for the multiple_output_pardo example.""" import logging -import re -import tempfile import unittest +import apache_beam as beam from apache_beam.examples.cookbook import multiple_output_pardo +from apache_beam.transforms.util import assert_that +from apache_beam.transforms.util import DataflowAssertException -class MultipleOutputParDo(unittest.TestCase): +class MultipleOutputParDoTest(unittest.TestCase): SAMPLE_TEXT = 'A whole new world\nA new fantastic point of view' EXPECTED_SHORT_WORDS = [('A', 2), ('new', 2), ('of', 1)] EXPECTED_WORDS = [ ('whole', 1), ('world', 1), ('fantastic', 1), ('point', 1), ('view', 1)] - def create_temp_file(self, contents): - with tempfile.NamedTemporaryFile(delete=False) as f: - f.write(contents) - return f.name - - def get_wordcount_results(self, temp_path): - results = [] - with open(temp_path) as result_file: - for line in result_file: - match = re.search(r'([A-Za-z]+): ([0-9]+)', line) - if match is not None: - results.append((match.group(1), int(match.group(2)))) - return results - def test_multiple_output_pardo(self): - temp_path = self.create_temp_file(self.SAMPLE_TEXT) - result_prefix = temp_path + '.result' - - multiple_output_pardo.run([ - '--input=%s*' % temp_path, - '--output=%s' % result_prefix]) - - expected_char_count = len(''.join(self.SAMPLE_TEXT.split('\n'))) - with 
open(result_prefix + '-chars-00000-of-00001') as f: - contents = f.read() - self.assertEqual(expected_char_count, int(contents)) - - short_words = self.get_wordcount_results( - result_prefix + '-short-words-00000-of-00001') - self.assertEqual(sorted(short_words), sorted(self.EXPECTED_SHORT_WORDS)) - - words = self.get_wordcount_results(result_prefix + '-words-00000-of-00001') - self.assertEqual(sorted(words), sorted(self.EXPECTED_WORDS)) - - + p = beam.Pipeline('BlockingDataflowPipelineRunner') + sample_text = p | beam.Create(self.SAMPLE_TEXT) + results = sample_text | beam.ParDo(multiple_output_pardo.SplitLinesToWordsFn()).with_outputs('tag_short_words', 'tag_character_count', main='words') + result_count = (results.tag_character_count + | 'pair_with_key' >> beam.Map(lambda x: ('chars_temp_key', x)) + | beam.GroupByKey() + | 'count chars' >> beam.Map(lambda (_, counts): sum(counts))) + result_words = results.words | 'count words' >> multiple_output_pardo.CountWords() + result_short_words = results.tag_short_words | 'count short words' >> multiple_output_pardo.CountWords() + assert_that(sorted(result_words), equal_to(sorted(self.EXPECTED_WORDS))) + assert_that(sorted(result_short_words), equal_to(sorted(self.EXPECTED_SHORT_WORDS)), label='assert:tag_short_words') + assert_that(result_count,len(''.join(self.SAMPLE_TEXT.split('\n'))), label='assert:tag_character_count') + p.run() + if __name__ == '__main__': logging.getLogger().setLevel(logging.INFO) unittest.main() From 5ca79cd40558ea46b6725862f4fe3b6cbef01c44 Mon Sep 17 00:00:00 2001 From: venbijjam Date: Fri, 14 Oct 2016 13:29:48 -0500 Subject: [PATCH 02/39] Update multiple_output_pardo_test.py --- .../examples/cookbook/multiple_output_pardo_test.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py index 10dc18a89f04..75915cf04f0f 100644 --- 
a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py +++ b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py @@ -42,10 +42,12 @@ def test_multiple_output_pardo(self): | beam.GroupByKey() | 'count chars' >> beam.Map(lambda (_, counts): sum(counts))) result_words = results.words | 'count words' >> multiple_output_pardo.CountWords() - result_short_words = results.tag_short_words | 'count short words' >> multiple_output_pardo.CountWords() + result_short_words = results.tag_short_words | 'count short words' >> multiple_output_pardo.CountWords() + assert_that(sorted(result_words), equal_to(sorted(self.EXPECTED_WORDS))) assert_that(sorted(result_short_words), equal_to(sorted(self.EXPECTED_SHORT_WORDS)), label='assert:tag_short_words') - assert_that(result_count,len(''.join(self.SAMPLE_TEXT.split('\n'))), label='assert:tag_character_count') + assert_that(result_count,len(''.join(self.SAMPLE_TEXT.split('\n'))), label='assert:tag_character_count') + p.run() if __name__ == '__main__': From de6619b2201824cf8cc5c98d9f5fee2a2d41d9c9 Mon Sep 17 00:00:00 2001 From: venbijjam Date: Fri, 14 Oct 2016 13:30:56 -0500 Subject: [PATCH 03/39] Update multiple_output_pardo_test.py --- .../apache_beam/examples/cookbook/multiple_output_pardo_test.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py index 75915cf04f0f..8d69bfe65c75 100644 --- a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py +++ b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py @@ -43,11 +43,9 @@ def test_multiple_output_pardo(self): | 'count chars' >> beam.Map(lambda (_, counts): sum(counts))) result_words = results.words | 'count words' >> multiple_output_pardo.CountWords() result_short_words = results.tag_short_words | 'count short words' >> multiple_output_pardo.CountWords() - 
assert_that(sorted(result_words), equal_to(sorted(self.EXPECTED_WORDS))) assert_that(sorted(result_short_words), equal_to(sorted(self.EXPECTED_SHORT_WORDS)), label='assert:tag_short_words') assert_that(result_count,len(''.join(self.SAMPLE_TEXT.split('\n'))), label='assert:tag_character_count') - p.run() if __name__ == '__main__': From 7f711605a8a79acbde09e2090a85da757d922345 Mon Sep 17 00:00:00 2001 From: Geetha Bijjam Date: Mon, 17 Oct 2016 10:51:46 -0500 Subject: [PATCH 04/39] Update multiple_output_pardo_test.py --- .../apache_beam/examples/cookbook/multiple_output_pardo_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py index 8d69bfe65c75..2002a9410194 100644 --- a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py +++ b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py @@ -45,7 +45,7 @@ def test_multiple_output_pardo(self): result_short_words = results.tag_short_words | 'count short words' >> multiple_output_pardo.CountWords() assert_that(sorted(result_words), equal_to(sorted(self.EXPECTED_WORDS))) assert_that(sorted(result_short_words), equal_to(sorted(self.EXPECTED_SHORT_WORDS)), label='assert:tag_short_words') - assert_that(result_count,len(''.join(self.SAMPLE_TEXT.split('\n'))), label='assert:tag_character_count') + assert_that(result_count, equal_to(len(''.join(self.SAMPLE_TEXT.split('\n')))), label='assert:tag_character_count') p.run() if __name__ == '__main__': From bd282119c88b49a985eb14f0b62f3ca1f2499352 Mon Sep 17 00:00:00 2001 From: Geetha Bijjam Date: Mon, 17 Oct 2016 11:03:44 -0500 Subject: [PATCH 05/39] updated to use DirectPipelineRunner --- .../apache_beam/examples/cookbook/multiple_output_pardo_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git 
a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py index 2002a9410194..fe42ace63d34 100644 --- a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py +++ b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py @@ -34,7 +34,7 @@ class MultipleOutputParDoTest(unittest.TestCase): ('whole', 1), ('world', 1), ('fantastic', 1), ('point', 1), ('view', 1)] def test_multiple_output_pardo(self): - p = beam.Pipeline('BlockingDataflowPipelineRunner') + p = beam.Pipeline('DirectPipelineRunner') sample_text = p | beam.Create(self.SAMPLE_TEXT) results = sample_text | beam.ParDo(multiple_output_pardo.SplitLinesToWordsFn()).with_outputs('tag_short_words', 'tag_character_count', main='words') result_count = (results.tag_character_count From 722a3ed0f85cdfe95f1239c14987d13229066ffb Mon Sep 17 00:00:00 2001 From: Geetha Bijjam Date: Mon, 17 Oct 2016 12:04:13 -0500 Subject: [PATCH 06/39] making input iterable --- .../apache_beam/examples/cookbook/multiple_output_pardo_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py index fe42ace63d34..f6d9427ca0c2 100644 --- a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py +++ b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py @@ -28,7 +28,7 @@ class MultipleOutputParDoTest(unittest.TestCase): - SAMPLE_TEXT = 'A whole new world\nA new fantastic point of view' + SAMPLE_TEXT = ['A whole new world\nA new fantastic point of view'] EXPECTED_SHORT_WORDS = [('A', 2), ('new', 2), ('of', 1)] EXPECTED_WORDS = [ ('whole', 1), ('world', 1), ('fantastic', 1), ('point', 1), ('view', 1)] From bc41da84a5d432eb961c8c624abc4dcd899cf6ae Mon Sep 17 00:00:00 2001 From: Geetha Bijjam Date: Mon, 17 Oct 2016 15:10:44 -0500 
Subject: [PATCH 07/39] Update multiple_output_pardo_test.py --- .../examples/cookbook/multiple_output_pardo_test.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py index f6d9427ca0c2..bcc6c5c622d9 100644 --- a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py +++ b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py @@ -43,8 +43,8 @@ def test_multiple_output_pardo(self): | 'count chars' >> beam.Map(lambda (_, counts): sum(counts))) result_words = results.words | 'count words' >> multiple_output_pardo.CountWords() result_short_words = results.tag_short_words | 'count short words' >> multiple_output_pardo.CountWords() - assert_that(sorted(result_words), equal_to(sorted(self.EXPECTED_WORDS))) - assert_that(sorted(result_short_words), equal_to(sorted(self.EXPECTED_SHORT_WORDS)), label='assert:tag_short_words') + assert_that(result_words, equal_to(self.EXPECTED_WORDS)) + assert_that(result_short_words, equal_to(self.EXPECTED_SHORT_WORDS), label='assert:tag_short_words') assert_that(result_count, equal_to(len(''.join(self.SAMPLE_TEXT.split('\n')))), label='assert:tag_character_count') p.run() From e4a4543bd1f3c9e9762da3265881d70e68110b1f Mon Sep 17 00:00:00 2001 From: Geetha Bijjam Date: Thu, 20 Oct 2016 09:23:04 -0500 Subject: [PATCH 08/39] Update multiple_output_pardo_test.py --- .../examples/cookbook/multiple_output_pardo_test.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py index bcc6c5c622d9..d0e10597aa7d 100644 --- a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py +++ b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py @@ -43,9 +43,9 @@ def 
test_multiple_output_pardo(self): | 'count chars' >> beam.Map(lambda (_, counts): sum(counts))) result_words = results.words | 'count words' >> multiple_output_pardo.CountWords() result_short_words = results.tag_short_words | 'count short words' >> multiple_output_pardo.CountWords() - assert_that(result_words, equal_to(self.EXPECTED_WORDS)) - assert_that(result_short_words, equal_to(self.EXPECTED_SHORT_WORDS), label='assert:tag_short_words') - assert_that(result_count, equal_to(len(''.join(self.SAMPLE_TEXT.split('\n')))), label='assert:tag_character_count') + beam.assert_that(result_words, beam.equal_to(self.EXPECTED_WORDS)) + beam.assert_that(result_short_words, beam.equal_to(self.EXPECTED_SHORT_WORDS), label='assert:tag_short_words') + beam.assert_that(result_count, beam.equal_to(len(''.join(self.SAMPLE_TEXT.split('\n')))), label='assert:tag_character_count') p.run() if __name__ == '__main__': From a761f0b7ee1574d717cbc470aee50c2400723daf Mon Sep 17 00:00:00 2001 From: Geetha Bijjam Date: Thu, 20 Oct 2016 13:57:32 -0500 Subject: [PATCH 09/39] Update multiple_output_pardo_test.py --- .../apache_beam/examples/cookbook/multiple_output_pardo_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py index d0e10597aa7d..ff28dd029c44 100644 --- a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py +++ b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py @@ -28,7 +28,7 @@ class MultipleOutputParDoTest(unittest.TestCase): - SAMPLE_TEXT = ['A whole new world\nA new fantastic point of view'] + SAMPLE_TEXT = 'A whole new world\nA new fantastic point of view' EXPECTED_SHORT_WORDS = [('A', 2), ('new', 2), ('of', 1)] EXPECTED_WORDS = [ ('whole', 1), ('world', 1), ('fantastic', 1), ('point', 1), ('view', 1)] From deb4bd0bc16677e62ac96892d8cd140ee4c852e3 Mon Sep 17 00:00:00 2001 
From: Geetha Bijjam Date: Thu, 20 Oct 2016 15:30:58 -0500 Subject: [PATCH 10/39] Update multiple_output_pardo_test.py --- .../examples/cookbook/multiple_output_pardo_test.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py index ff28dd029c44..2425c4704810 100644 --- a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py +++ b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py @@ -29,13 +29,15 @@ class MultipleOutputParDoTest(unittest.TestCase): SAMPLE_TEXT = 'A whole new world\nA new fantastic point of view' + text_len = len(''.join(SAMPLE_TEXT.split('\n'))) + SAMPLE_TEXT_Iterable = [SAMPLE_TEXT] EXPECTED_SHORT_WORDS = [('A', 2), ('new', 2), ('of', 1)] EXPECTED_WORDS = [ ('whole', 1), ('world', 1), ('fantastic', 1), ('point', 1), ('view', 1)] def test_multiple_output_pardo(self): p = beam.Pipeline('DirectPipelineRunner') - sample_text = p | beam.Create(self.SAMPLE_TEXT) + sample_text = p | beam.Create(self.SAMPLE_TEXT_Iterable) results = sample_text | beam.ParDo(multiple_output_pardo.SplitLinesToWordsFn()).with_outputs('tag_short_words', 'tag_character_count', main='words') result_count = (results.tag_character_count | 'pair_with_key' >> beam.Map(lambda x: ('chars_temp_key', x)) @@ -45,7 +47,7 @@ def test_multiple_output_pardo(self): result_short_words = results.tag_short_words | 'count short words' >> multiple_output_pardo.CountWords() beam.assert_that(result_words, beam.equal_to(self.EXPECTED_WORDS)) beam.assert_that(result_short_words, beam.equal_to(self.EXPECTED_SHORT_WORDS), label='assert:tag_short_words') - beam.assert_that(result_count, beam.equal_to(len(''.join(self.SAMPLE_TEXT.split('\n')))), label='assert:tag_character_count') + beam.assert_that(result_count, beam.equal_to(self.text_len), label='assert:tag_character_count') p.run() if __name__ == 
'__main__': From 827ecb8a86d815521ba1651f3dce88229a5ba0d3 Mon Sep 17 00:00:00 2001 From: Geetha Bijjam Date: Thu, 20 Oct 2016 15:49:44 -0500 Subject: [PATCH 11/39] Update multiple_output_pardo_test.py --- .../apache_beam/examples/cookbook/multiple_output_pardo_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py index 2425c4704810..e40ed74e152f 100644 --- a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py +++ b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py @@ -47,7 +47,7 @@ def test_multiple_output_pardo(self): result_short_words = results.tag_short_words | 'count short words' >> multiple_output_pardo.CountWords() beam.assert_that(result_words, beam.equal_to(self.EXPECTED_WORDS)) beam.assert_that(result_short_words, beam.equal_to(self.EXPECTED_SHORT_WORDS), label='assert:tag_short_words') - beam.assert_that(result_count, beam.equal_to(self.text_len), label='assert:tag_character_count') + beam.assert_that(result_count, beam.equal_to(list(self.text_len)), label='assert:tag_character_count') p.run() if __name__ == '__main__': From 5506234fdbf6cb4a2b1a00556dee7a46bec082bc Mon Sep 17 00:00:00 2001 From: Geetha Bijjam Date: Thu, 20 Oct 2016 16:05:27 -0500 Subject: [PATCH 12/39] Update multiple_output_pardo_test.py --- .../apache_beam/examples/cookbook/multiple_output_pardo_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py index e40ed74e152f..ea6fed838870 100644 --- a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py +++ b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py @@ -47,7 +47,7 @@ def test_multiple_output_pardo(self): result_short_words = 
results.tag_short_words | 'count short words' >> multiple_output_pardo.CountWords() beam.assert_that(result_words, beam.equal_to(self.EXPECTED_WORDS)) beam.assert_that(result_short_words, beam.equal_to(self.EXPECTED_SHORT_WORDS), label='assert:tag_short_words') - beam.assert_that(result_count, beam.equal_to(list(self.text_len)), label='assert:tag_character_count') + beam.assert_that(result_count, beam.equal_to([self.text_len]), label='assert:tag_character_count') p.run() if __name__ == '__main__': From 67b0ab630a542774335340ecfe20e4d0ea4573e3 Mon Sep 17 00:00:00 2001 From: Geetha Bijjam Date: Thu, 20 Oct 2016 16:23:33 -0500 Subject: [PATCH 13/39] Update multiple_output_pardo_test.py --- .../examples/cookbook/multiple_output_pardo_test.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py index ea6fed838870..4cfe7e82a30d 100644 --- a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py +++ b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py @@ -31,10 +31,11 @@ class MultipleOutputParDoTest(unittest.TestCase): SAMPLE_TEXT = 'A whole new world\nA new fantastic point of view' text_len = len(''.join(SAMPLE_TEXT.split('\n'))) SAMPLE_TEXT_Iterable = [SAMPLE_TEXT] - EXPECTED_SHORT_WORDS = [('A', 2), ('new', 2), ('of', 1)] - EXPECTED_WORDS = [ - ('whole', 1), ('world', 1), ('fantastic', 1), ('point', 1), ('view', 1)] - + #EXPECTED_SHORT_WORDS = [('A', 2), ('new', 2), ('of', 1)] + EXPECTED_SHORT_WORDS = ['A: 2', 'new: 2', 'of: 1'] + #EXPECTED_WORDS = [ + # ('whole', 1), ('world', 1), ('fantastic', 1), ('point', 1), ('view', 1)] + EXPECTED_WORDS = ['fantastic: 1', 'point: 1', 'view: 1', 'whole: 1', 'world: 1'] def test_multiple_output_pardo(self): p = beam.Pipeline('DirectPipelineRunner') sample_text = p | beam.Create(self.SAMPLE_TEXT_Iterable) From 
28f432a52243699d442a014540aa5e3a93ca90db Mon Sep 17 00:00:00 2001 From: Geetha Bijjam Date: Fri, 21 Oct 2016 10:23:05 -0500 Subject: [PATCH 14/39] Update multiple_output_pardo_test.py --- .../examples/cookbook/multiple_output_pardo_test.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py index 4cfe7e82a30d..21910eb4c2e4 100644 --- a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py +++ b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py @@ -30,11 +30,8 @@ class MultipleOutputParDoTest(unittest.TestCase): SAMPLE_TEXT = 'A whole new world\nA new fantastic point of view' text_len = len(''.join(SAMPLE_TEXT.split('\n'))) - SAMPLE_TEXT_Iterable = [SAMPLE_TEXT] - #EXPECTED_SHORT_WORDS = [('A', 2), ('new', 2), ('of', 1)] + SAMPLE_TEXT_Iterable = [SAMPLE_TEXT.replace('\n', ' ')] EXPECTED_SHORT_WORDS = ['A: 2', 'new: 2', 'of: 1'] - #EXPECTED_WORDS = [ - # ('whole', 1), ('world', 1), ('fantastic', 1), ('point', 1), ('view', 1)] EXPECTED_WORDS = ['fantastic: 1', 'point: 1', 'view: 1', 'whole: 1', 'world: 1'] def test_multiple_output_pardo(self): p = beam.Pipeline('DirectPipelineRunner') From 9b72570975ac959e9942f01062d1f82e09d6ff5d Mon Sep 17 00:00:00 2001 From: Geetha Bijjam Date: Fri, 21 Oct 2016 11:08:46 -0500 Subject: [PATCH 15/39] Update multiple_output_pardo_test.py --- .../examples/cookbook/multiple_output_pardo_test.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py index 21910eb4c2e4..180e19e11ff5 100644 --- a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py +++ b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py @@ -29,8 +29,8 @@ class 
MultipleOutputParDoTest(unittest.TestCase): SAMPLE_TEXT = 'A whole new world\nA new fantastic point of view' - text_len = len(''.join(SAMPLE_TEXT.split('\n'))) - SAMPLE_TEXT_Iterable = [SAMPLE_TEXT.replace('\n', ' ')] + text_len = len(' '.join(SAMPLE_TEXT.split('\n'))) + SAMPLE_TEXT_Iterable = [SAMPLE_TEXT] EXPECTED_SHORT_WORDS = ['A: 2', 'new: 2', 'of: 1'] EXPECTED_WORDS = ['fantastic: 1', 'point: 1', 'view: 1', 'whole: 1', 'world: 1'] def test_multiple_output_pardo(self): From 30de34147e6606df266c5d492e78794d38aec183 Mon Sep 17 00:00:00 2001 From: Geetha Bijjam Date: Fri, 21 Oct 2016 12:16:33 -0500 Subject: [PATCH 16/39] Update multiple_output_pardo_test.py --- .../apache_beam/examples/cookbook/multiple_output_pardo_test.py | 1 - 1 file changed, 1 deletion(-) diff --git a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py index 180e19e11ff5..5c6a0a9387c3 100644 --- a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py +++ b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py @@ -25,7 +25,6 @@ from apache_beam.transforms.util import assert_that from apache_beam.transforms.util import DataflowAssertException - class MultipleOutputParDoTest(unittest.TestCase): SAMPLE_TEXT = 'A whole new world\nA new fantastic point of view' From 70b9f3c54ba5fac47ce2597f56d42dd8558f6005 Mon Sep 17 00:00:00 2001 From: Geetha Bijjam Date: Fri, 21 Oct 2016 12:32:36 -0500 Subject: [PATCH 17/39] Update multiple_output_pardo_test.py --- .../cookbook/multiple_output_pardo_test.py | 36 +++++++++++++------ 1 file changed, 25 insertions(+), 11 deletions(-) diff --git a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py index 5c6a0a9387c3..61d1148d4972 100644 --- a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py +++ 
b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py @@ -22,29 +22,43 @@ import apache_beam as beam from apache_beam.examples.cookbook import multiple_output_pardo -from apache_beam.transforms.util import assert_that -from apache_beam.transforms.util import DataflowAssertException +#from apache_beam.transforms.util import assert_that +#from apache_beam.transforms.util import DataflowAssertException class MultipleOutputParDoTest(unittest.TestCase): SAMPLE_TEXT = 'A whole new world\nA new fantastic point of view' text_len = len(' '.join(SAMPLE_TEXT.split('\n'))) SAMPLE_TEXT_Iterable = [SAMPLE_TEXT] - EXPECTED_SHORT_WORDS = ['A: 2', 'new: 2', 'of: 1'] - EXPECTED_WORDS = ['fantastic: 1', 'point: 1', 'view: 1', 'whole: 1', 'world: 1'] + EXPECTED_SHORT_WORDS = ['A: 2', 'new: 2', 'of: 1'] + EXPECTED_WORDS = ['fantastic: 1', 'point: 1', 'view: + 1', 'whole: 1', 'world: 1'] def test_multiple_output_pardo(self): p = beam.Pipeline('DirectPipelineRunner') sample_text = p | beam.Create(self.SAMPLE_TEXT_Iterable) - results = sample_text | beam.ParDo(multiple_output_pardo.SplitLinesToWordsFn()).with_outputs('tag_short_words', 'tag_character_count', main='words') + results = sample_text + | beam.ParDo(multiple_output_pardo + .SplitLinesToWordsFn()) + .with_outputs('tag_short_words' + , 'tag_character_count', main='words') result_count = (results.tag_character_count - | 'pair_with_key' >> beam.Map(lambda x: ('chars_temp_key', x)) + | 'pair_with_key' >> beam.Map(lambda x + : ('chars_temp_key', x)) | beam.GroupByKey() - | 'count chars' >> beam.Map(lambda (_, counts): sum(counts))) - result_words = results.words | 'count words' >> multiple_output_pardo.CountWords() - result_short_words = results.tag_short_words | 'count short words' >> multiple_output_pardo.CountWords() + | 'count chars' >> beam.Map(lambda (_, counts) + : sum(counts))) + result_words = results.words + | 'count words' >> multiple_output_pardo + .CountWords() + result_short_words = 
results.tag_short_words + | 'count short words' >> multiple_output_pardo + .CountWords() beam.assert_that(result_words, beam.equal_to(self.EXPECTED_WORDS)) - beam.assert_that(result_short_words, beam.equal_to(self.EXPECTED_SHORT_WORDS), label='assert:tag_short_words') - beam.assert_that(result_count, beam.equal_to([self.text_len]), label='assert:tag_character_count') + beam.assert_that(result_short_words + , beam.equal_to(self.EXPECTED_SHORT_WORDS) + , label='assert:tag_short_words') + beam.assert_that(result_count, beam.equal_to([self.text_len]) + , label='assert:tag_character_count') p.run() if __name__ == '__main__': From 934740fa608668843faf78370ae72d91ecbe73da Mon Sep 17 00:00:00 2001 From: Geetha Bijjam Date: Fri, 21 Oct 2016 12:40:42 -0500 Subject: [PATCH 18/39] Update multiple_output_pardo_test.py --- .../cookbook/multiple_output_pardo_test.py | 27 ++++++------------- 1 file changed, 8 insertions(+), 19 deletions(-) diff --git a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py index 61d1148d4972..d32a5798abc1 100644 --- a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py +++ b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py @@ -31,34 +31,23 @@ class MultipleOutputParDoTest(unittest.TestCase): text_len = len(' '.join(SAMPLE_TEXT.split('\n'))) SAMPLE_TEXT_Iterable = [SAMPLE_TEXT] EXPECTED_SHORT_WORDS = ['A: 2', 'new: 2', 'of: 1'] - EXPECTED_WORDS = ['fantastic: 1', 'point: 1', 'view: - 1', 'whole: 1', 'world: 1'] + EXPECTED_WORDS = ['fantastic: 1', 'point: 1', 'view: 1', 'whole: 1', 'world: 1'] def test_multiple_output_pardo(self): p = beam.Pipeline('DirectPipelineRunner') sample_text = p | beam.Create(self.SAMPLE_TEXT_Iterable) results = sample_text - | beam.ParDo(multiple_output_pardo - .SplitLinesToWordsFn()) - .with_outputs('tag_short_words' - , 'tag_character_count', main='words') + | 
beam.ParDo(multiple_output_pardo.SplitLinesToWordsFn()).with_outputs('tag_short_words', 'tag_character_count', main='words') result_count = (results.tag_character_count - | 'pair_with_key' >> beam.Map(lambda x - : ('chars_temp_key', x)) + | 'pair_with_key' >> beam.Map(lambda x: ('chars_temp_key', x)) | beam.GroupByKey() - | 'count chars' >> beam.Map(lambda (_, counts) - : sum(counts))) + | 'count chars' >> beam.Map(lambda (_, counts): sum(counts))) result_words = results.words - | 'count words' >> multiple_output_pardo - .CountWords() + | 'count words' >> multiple_output_pardo.CountWords() result_short_words = results.tag_short_words - | 'count short words' >> multiple_output_pardo - .CountWords() + | 'count short words' >> multiple_output_pardo.CountWords() beam.assert_that(result_words, beam.equal_to(self.EXPECTED_WORDS)) - beam.assert_that(result_short_words - , beam.equal_to(self.EXPECTED_SHORT_WORDS) - , label='assert:tag_short_words') - beam.assert_that(result_count, beam.equal_to([self.text_len]) - , label='assert:tag_character_count') + beam.assert_that(result_short_words, beam.equal_to(self.EXPECTED_SHORT_WORDS), label='assert:tag_short_words') + beam.assert_that(result_count, beam.equal_to([self.text_len])), label='assert:tag_character_count') p.run() if __name__ == '__main__': From af1518fb8f830c712d117347c8b5c5cea28aabd5 Mon Sep 17 00:00:00 2001 From: Geetha Bijjam Date: Fri, 21 Oct 2016 12:50:54 -0500 Subject: [PATCH 19/39] Update multiple_output_pardo_test.py --- .../examples/cookbook/multiple_output_pardo_test.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py index d32a5798abc1..f9212d8b82f9 100644 --- a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py +++ b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py @@ -35,8 +35,7 @@ class 
MultipleOutputParDoTest(unittest.TestCase): def test_multiple_output_pardo(self): p = beam.Pipeline('DirectPipelineRunner') sample_text = p | beam.Create(self.SAMPLE_TEXT_Iterable) - results = sample_text - | beam.ParDo(multiple_output_pardo.SplitLinesToWordsFn()).with_outputs('tag_short_words', 'tag_character_count', main='words') + results = sample_text | beam.ParDo(multiple_output_pardo.SplitLinesToWordsFn()).with_outputs('tag_short_words', 'tag_character_count', main='words') result_count = (results.tag_character_count | 'pair_with_key' >> beam.Map(lambda x: ('chars_temp_key', x)) | beam.GroupByKey() From 653127d934e7399b4a68ea3f76bfaa778765d1a7 Mon Sep 17 00:00:00 2001 From: Geetha Bijjam Date: Fri, 21 Oct 2016 13:03:35 -0500 Subject: [PATCH 20/39] Update multiple_output_pardo_test.py --- .../examples/cookbook/multiple_output_pardo_test.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py index f9212d8b82f9..f4040878f24f 100644 --- a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py +++ b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py @@ -40,10 +40,8 @@ def test_multiple_output_pardo(self): | 'pair_with_key' >> beam.Map(lambda x: ('chars_temp_key', x)) | beam.GroupByKey() | 'count chars' >> beam.Map(lambda (_, counts): sum(counts))) - result_words = results.words - | 'count words' >> multiple_output_pardo.CountWords() - result_short_words = results.tag_short_words - | 'count short words' >> multiple_output_pardo.CountWords() + result_words = results.words | 'count words' >> multiple_output_pardo.CountWords() + result_short_words = results.tag_short_words | 'count short words' >> multiple_output_pardo.CountWords() beam.assert_that(result_words, beam.equal_to(self.EXPECTED_WORDS)) beam.assert_that(result_short_words, 
beam.equal_to(self.EXPECTED_SHORT_WORDS), label='assert:tag_short_words') beam.assert_that(result_count, beam.equal_to([self.text_len])), label='assert:tag_character_count') From 71d9249e0eedbb7020b1e60dc10ba15a41147cb1 Mon Sep 17 00:00:00 2001 From: Geetha Bijjam Date: Fri, 21 Oct 2016 14:09:21 -0500 Subject: [PATCH 21/39] Update multiple_output_pardo_test.py --- .../apache_beam/examples/cookbook/multiple_output_pardo_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py index f4040878f24f..185fd440e3e3 100644 --- a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py +++ b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py @@ -44,7 +44,7 @@ def test_multiple_output_pardo(self): result_short_words = results.tag_short_words | 'count short words' >> multiple_output_pardo.CountWords() beam.assert_that(result_words, beam.equal_to(self.EXPECTED_WORDS)) beam.assert_that(result_short_words, beam.equal_to(self.EXPECTED_SHORT_WORDS), label='assert:tag_short_words') - beam.assert_that(result_count, beam.equal_to([self.text_len])), label='assert:tag_character_count') + beam.assert_that(result_count, beam.equal_to([self.text_len]), label='assert:tag_character_count') p.run() if __name__ == '__main__': From 990b78d715eba7c86490b68e347f5196394ce099 Mon Sep 17 00:00:00 2001 From: Geetha Bijjam Date: Fri, 21 Oct 2016 15:21:49 -0500 Subject: [PATCH 22/39] Update multiple_output_pardo_test.py --- .../cookbook/multiple_output_pardo_test.py | 40 ++++++++++++------- 1 file changed, 26 insertions(+), 14 deletions(-) diff --git a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py index 185fd440e3e3..9b96ce3043d6 100644 --- 
a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py +++ b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py @@ -27,26 +27,38 @@ class MultipleOutputParDoTest(unittest.TestCase): - SAMPLE_TEXT = 'A whole new world\nA new fantastic point of view' + SAMPLE_TEXT = 'A whole new world\nA new point' text_len = len(' '.join(SAMPLE_TEXT.split('\n'))) SAMPLE_TEXT_Iterable = [SAMPLE_TEXT] - EXPECTED_SHORT_WORDS = ['A: 2', 'new: 2', 'of: 1'] - EXPECTED_WORDS = ['fantastic: 1', 'point: 1', 'view: 1', 'whole: 1', 'world: 1'] + EXPECTED_SHORT_WORDS = ['A: 2', 'new: 2'] + EXP_WORDS = ['point: 1', 'whole: 1', 'world: 1'] def test_multiple_output_pardo(self): p = beam.Pipeline('DirectPipelineRunner') sample_text = p | beam.Create(self.SAMPLE_TEXT_Iterable) - results = sample_text | beam.ParDo(multiple_output_pardo.SplitLinesToWordsFn()).with_outputs('tag_short_words', 'tag_character_count', main='words') - result_count = (results.tag_character_count - | 'pair_with_key' >> beam.Map(lambda x: ('chars_temp_key', x)) - | beam.GroupByKey() - | 'count chars' >> beam.Map(lambda (_, counts): sum(counts))) - result_words = results.words | 'count words' >> multiple_output_pardo.CountWords() - result_short_words = results.tag_short_words | 'count short words' >> multiple_output_pardo.CountWords() - beam.assert_that(result_words, beam.equal_to(self.EXPECTED_WORDS)) - beam.assert_that(result_short_words, beam.equal_to(self.EXPECTED_SHORT_WORDS), label='assert:tag_short_words') - beam.assert_that(result_count, beam.equal_to([self.text_len]), label='assert:tag_character_count') + res = (sample_text + | beam.ParDo(multiple_output_pardo.SplitLinesToWordsFn()) + .with_outputs( + 'tag_short_words', + 'tag_character_count', + main='words')) + res_cnt= (res.tag_character_count + | 'pair_with_key' >> beam.Map(lambda x: ('chars_temp_key', x)) + | beam.GroupByKey() + | 'count chars' >> beam.Map(lambda (_, counts): sum(counts))) + res_words = (res.words + | 'count 
words' >> multiple_output_pardo.CountWords()) + res_sh_wrd = (res.tag_short_words + | 'count short words' >> multiple_output_pardo.CountWords()) + beam.assert_that(res, + beam.equal_to(self.EXPECTED_WORDS)) + beam.assert_that(res_sh_wrd, + beam.equal_to(self.EXPECTED_SHORT_WORDS), + label='assert:tag_short_words') + beam.assert_that(res_cnt, + beam.equal_to([self.text_len]), + label='assert:tag_character_count') p.run() - + if __name__ == '__main__': logging.getLogger().setLevel(logging.INFO) unittest.main() From 10e252133683821d0aeb2f21ae742ca8e43e6857 Mon Sep 17 00:00:00 2001 From: Geetha Bijjam Date: Fri, 21 Oct 2016 15:29:59 -0500 Subject: [PATCH 23/39] Update multiple_output_pardo_test.py --- .../apache_beam/examples/cookbook/multiple_output_pardo_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py index 9b96ce3043d6..397578cda3b3 100644 --- a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py +++ b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py @@ -50,7 +50,7 @@ def test_multiple_output_pardo(self): res_sh_wrd = (res.tag_short_words | 'count short words' >> multiple_output_pardo.CountWords()) beam.assert_that(res, - beam.equal_to(self.EXPECTED_WORDS)) + beam.equal_to(self.EXP_WORDS)) beam.assert_that(res_sh_wrd, beam.equal_to(self.EXPECTED_SHORT_WORDS), label='assert:tag_short_words') From 3b44b675be8eca08b6dbc8d75f88af0208895055 Mon Sep 17 00:00:00 2001 From: Geetha Bijjam Date: Fri, 21 Oct 2016 15:41:03 -0500 Subject: [PATCH 24/39] Update multiple_output_pardo_test.py --- .../examples/cookbook/multiple_output_pardo_test.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py index 
397578cda3b3..9df5a411e39a 100644 --- a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py +++ b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py @@ -38,9 +38,7 @@ def test_multiple_output_pardo(self): res = (sample_text | beam.ParDo(multiple_output_pardo.SplitLinesToWordsFn()) .with_outputs( - 'tag_short_words', - 'tag_character_count', - main='words')) + 'tag_short_words', 'tag_character_count', main='words')) res_cnt= (res.tag_character_count | 'pair_with_key' >> beam.Map(lambda x: ('chars_temp_key', x)) | beam.GroupByKey() @@ -49,7 +47,7 @@ def test_multiple_output_pardo(self): | 'count words' >> multiple_output_pardo.CountWords()) res_sh_wrd = (res.tag_short_words | 'count short words' >> multiple_output_pardo.CountWords()) - beam.assert_that(res, + beam.assert_that(res_words, beam.equal_to(self.EXP_WORDS)) beam.assert_that(res_sh_wrd, beam.equal_to(self.EXPECTED_SHORT_WORDS), From 0bfba8e6cc6f8c1d77b0fac1eb9ae0d99249cff1 Mon Sep 17 00:00:00 2001 From: Geetha Bijjam Date: Fri, 21 Oct 2016 15:58:45 -0500 Subject: [PATCH 25/39] Update multiple_output_pardo_test.py --- .../apache_beam/examples/cookbook/multiple_output_pardo_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py index 9df5a411e39a..32b062a6a822 100644 --- a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py +++ b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py @@ -39,7 +39,7 @@ def test_multiple_output_pardo(self): | beam.ParDo(multiple_output_pardo.SplitLinesToWordsFn()) .with_outputs( 'tag_short_words', 'tag_character_count', main='words')) - res_cnt= (res.tag_character_count + res_cnt = (res.tag_character_count | 'pair_with_key' >> beam.Map(lambda x: ('chars_temp_key', x)) | beam.GroupByKey() | 'count chars' >> beam.Map(lambda (_, counts): 
sum(counts))) From ebca3f074d11ba5944188d05331a7742b3ea6416 Mon Sep 17 00:00:00 2001 From: Geetha Bijjam Date: Fri, 21 Oct 2016 16:06:31 -0500 Subject: [PATCH 26/39] Update multiple_output_pardo_test.py --- .../cookbook/multiple_output_pardo_test.py | 19 ++++++++----------- 1 file changed, 8 insertions(+), 11 deletions(-) diff --git a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py index 32b062a6a822..42e9eb4bd097 100644 --- a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py +++ b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py @@ -32,28 +32,25 @@ class MultipleOutputParDoTest(unittest.TestCase): SAMPLE_TEXT_Iterable = [SAMPLE_TEXT] EXPECTED_SHORT_WORDS = ['A: 2', 'new: 2'] EXP_WORDS = ['point: 1', 'whole: 1', 'world: 1'] + def test_multiple_output_pardo(self): p = beam.Pipeline('DirectPipelineRunner') sample_text = p | beam.Create(self.SAMPLE_TEXT_Iterable) res = (sample_text | beam.ParDo(multiple_output_pardo.SplitLinesToWordsFn()) - .with_outputs( - 'tag_short_words', 'tag_character_count', main='words')) + .with_outputs('tag_short_words', 'tag_character_count', main='words')) res_cnt = (res.tag_character_count - | 'pair_with_key' >> beam.Map(lambda x: ('chars_temp_key', x)) - | beam.GroupByKey() - | 'count chars' >> beam.Map(lambda (_, counts): sum(counts))) + | 'pair_with_key' >> beam.Map(lambda x: ('chars_temp_key', x)) + | beam.GroupByKey() + | 'count chars' >> beam.Map(lambda (_, counts): sum(counts))) res_words = (res.words | 'count words' >> multiple_output_pardo.CountWords()) res_sh_wrd = (res.tag_short_words | 'count short words' >> multiple_output_pardo.CountWords()) - beam.assert_that(res_words, - beam.equal_to(self.EXP_WORDS)) - beam.assert_that(res_sh_wrd, - beam.equal_to(self.EXPECTED_SHORT_WORDS), + beam.assert_that(res_words, beam.equal_to(self.EXP_WORDS)) + beam.assert_that(res_sh_wrd, 
beam.equal_to(self.EXPECTED_SHORT_WORDS), label='assert:tag_short_words') - beam.assert_that(res_cnt, - beam.equal_to([self.text_len]), + beam.assert_that(res_cnt, beam.equal_to([self.text_len]), label='assert:tag_character_count') p.run() From 34f45f66a1fc695fc4cd5eb5d20679be7c89ad9b Mon Sep 17 00:00:00 2001 From: Geetha Bijjam Date: Mon, 24 Oct 2016 09:24:47 -0500 Subject: [PATCH 27/39] Update multiple_output_pardo_test.py --- .../cookbook/multiple_output_pardo_test.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py index 42e9eb4bd097..a9cdbbd9e0a3 100644 --- a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py +++ b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py @@ -32,28 +32,28 @@ class MultipleOutputParDoTest(unittest.TestCase): SAMPLE_TEXT_Iterable = [SAMPLE_TEXT] EXPECTED_SHORT_WORDS = ['A: 2', 'new: 2'] EXP_WORDS = ['point: 1', 'whole: 1', 'world: 1'] - + def test_multiple_output_pardo(self): p = beam.Pipeline('DirectPipelineRunner') sample_text = p | beam.Create(self.SAMPLE_TEXT_Iterable) - res = (sample_text + res = (sample_text | beam.ParDo(multiple_output_pardo.SplitLinesToWordsFn()) .with_outputs('tag_short_words', 'tag_character_count', main='words')) - res_cnt = (res.tag_character_count + res_cnt = (res.tag_character_count | 'pair_with_key' >> beam.Map(lambda x: ('chars_temp_key', x)) | beam.GroupByKey() | 'count chars' >> beam.Map(lambda (_, counts): sum(counts))) - res_words = (res.words + res_words = (res.words | 'count words' >> multiple_output_pardo.CountWords()) - res_sh_wrd = (res.tag_short_words + res_sh_wrd = (res.tag_short_words | 'count short words' >> multiple_output_pardo.CountWords()) beam.assert_that(res_words, beam.equal_to(self.EXP_WORDS)) beam.assert_that(res_sh_wrd, beam.equal_to(self.EXPECTED_SHORT_WORDS), - 
label='assert:tag_short_words') + label='assert:tag_short_words') beam.assert_that(res_cnt, beam.equal_to([self.text_len]), label='assert:tag_character_count') p.run() - + if __name__ == '__main__': logging.getLogger().setLevel(logging.INFO) unittest.main() From 4b536510d9b39f5652cfc74fffdcf0b73671166d Mon Sep 17 00:00:00 2001 From: Geetha Bijjam Date: Mon, 24 Oct 2016 09:46:12 -0500 Subject: [PATCH 28/39] Update multiple_output_pardo_test.py --- .../apache_beam/examples/cookbook/multiple_output_pardo_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py index a9cdbbd9e0a3..4591fe9bdfe5 100644 --- a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py +++ b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py @@ -38,7 +38,7 @@ def test_multiple_output_pardo(self): sample_text = p | beam.Create(self.SAMPLE_TEXT_Iterable) res = (sample_text | beam.ParDo(multiple_output_pardo.SplitLinesToWordsFn()) - .with_outputs('tag_short_words', 'tag_character_count', main='words')) + .with_outputs('tag_short_words','tag_character_count',main='words')) res_cnt = (res.tag_character_count | 'pair_with_key' >> beam.Map(lambda x: ('chars_temp_key', x)) | beam.GroupByKey() From 92bf58bfb9e286a89147ebb77c5edf3ac685d63d Mon Sep 17 00:00:00 2001 From: Geetha Bijjam Date: Mon, 24 Oct 2016 09:55:56 -0500 Subject: [PATCH 29/39] Update multiple_output_pardo_test.py --- .../examples/cookbook/multiple_output_pardo_test.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py index 4591fe9bdfe5..c4f79e154f38 100644 --- a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py +++ 
b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py @@ -38,7 +38,8 @@ def test_multiple_output_pardo(self): sample_text = p | beam.Create(self.SAMPLE_TEXT_Iterable) res = (sample_text | beam.ParDo(multiple_output_pardo.SplitLinesToWordsFn()) - .with_outputs('tag_short_words','tag_character_count',main='words')) + .with_outputs('tag_short_words', 'tag_character_count', main='words') + ) res_cnt = (res.tag_character_count | 'pair_with_key' >> beam.Map(lambda x: ('chars_temp_key', x)) | beam.GroupByKey() From eed459f03b74a95a80518db02a44413ab55b07ac Mon Sep 17 00:00:00 2001 From: Geetha Bijjam Date: Mon, 24 Oct 2016 10:04:13 -0500 Subject: [PATCH 30/39] Update multiple_output_pardo_test.py --- .../apache_beam/examples/cookbook/multiple_output_pardo_test.py | 1 + 1 file changed, 1 insertion(+) diff --git a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py index c4f79e154f38..25cb2c6a311b 100644 --- a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py +++ b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py @@ -27,6 +27,7 @@ class MultipleOutputParDoTest(unittest.TestCase): + SAMPLE_TEXT = 'A whole new world\nA new point' text_len = len(' '.join(SAMPLE_TEXT.split('\n'))) SAMPLE_TEXT_Iterable = [SAMPLE_TEXT] From 72a48ce5be9647df1be7e81b26e69cb141b8a568 Mon Sep 17 00:00:00 2001 From: Geetha Bijjam Date: Mon, 24 Oct 2016 10:11:22 -0500 Subject: [PATCH 31/39] Update multiple_output_pardo_test.py --- .../apache_beam/examples/cookbook/multiple_output_pardo_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py index 25cb2c6a311b..260fed7f16a8 100644 --- a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py +++ 
b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py @@ -25,8 +25,8 @@ #from apache_beam.transforms.util import assert_that #from apache_beam.transforms.util import DataflowAssertException -class MultipleOutputParDoTest(unittest.TestCase): +class MultipleOutputParDoTest(unittest.TestCase): SAMPLE_TEXT = 'A whole new world\nA new point' text_len = len(' '.join(SAMPLE_TEXT.split('\n'))) From 98318dfe21319bc748b680a3d9f0b01f4e797a41 Mon Sep 17 00:00:00 2001 From: Geetha Bijjam Date: Mon, 24 Oct 2016 12:00:17 -0500 Subject: [PATCH 32/39] fixed code based on altay's comments. --- .../cookbook/multiple_output_pardo_test.py | 74 +++++++++++-------- 1 file changed, 43 insertions(+), 31 deletions(-) diff --git a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py index 260fed7f16a8..b08f429ce381 100644 --- a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py +++ b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py @@ -22,40 +22,52 @@ import apache_beam as beam from apache_beam.examples.cookbook import multiple_output_pardo -#from apache_beam.transforms.util import assert_that -#from apache_beam.transforms.util import DataflowAssertException class MultipleOutputParDoTest(unittest.TestCase): - SAMPLE_TEXT = 'A whole new world\nA new point' - text_len = len(' '.join(SAMPLE_TEXT.split('\n'))) - SAMPLE_TEXT_Iterable = [SAMPLE_TEXT] - EXPECTED_SHORT_WORDS = ['A: 2', 'new: 2'] - EXP_WORDS = ['point: 1', 'whole: 1', 'world: 1'] - - def test_multiple_output_pardo(self): - p = beam.Pipeline('DirectPipelineRunner') - sample_text = p | beam.Create(self.SAMPLE_TEXT_Iterable) - res = (sample_text - | beam.ParDo(multiple_output_pardo.SplitLinesToWordsFn()) - .with_outputs('tag_short_words', 'tag_character_count', main='words') - ) - res_cnt = (res.tag_character_count - | 'pair_with_key' >> beam.Map(lambda x: ('chars_temp_key', x)) - | 
beam.GroupByKey() - | 'count chars' >> beam.Map(lambda (_, counts): sum(counts))) - res_words = (res.words - | 'count words' >> multiple_output_pardo.CountWords()) - res_sh_wrd = (res.tag_short_words - | 'count short words' >> multiple_output_pardo.CountWords()) - beam.assert_that(res_words, beam.equal_to(self.EXP_WORDS)) - beam.assert_that(res_sh_wrd, beam.equal_to(self.EXPECTED_SHORT_WORDS), - label='assert:tag_short_words') - beam.assert_that(res_cnt, beam.equal_to([self.text_len]), - label='assert:tag_character_count') - p.run() + SAMPLE_TEXT = 'A whole new world\nA new point' + EXPECTED_SHORT_WORDS = ['A: 2', 'new: 2'] + EXPECTED_WORDS = ['point: 1', 'whole: 1', 'world: 1'] + + def test_multiple_output_pardo(self): + p = beam.Pipeline('DirectPipelineRunner') + + sample_text = p | beam.Create([self.SAMPLE_TEXT]) + + results = (sample_text + | beam.ParDo(multiple_output_pardo.SplitLinesToWordsFn()) + .with_outputs('tag_short_words', + 'tag_character_count', + main='words')) + + results_cnt = (results.tag_character_count + | 'pair_with_key' >> beam.Map( + lambda x: ('chars_temp_key', x)) + | beam.GroupByKey() + | 'count chars' >> beam.Map( + lambda (_, counts): sum(counts))) + + results_words = (results.words + | 'count words' >> multiple_output_pardo.CountWords()) + + res_short_words = (results.tag_short_words + | 'count short words' >> + multiple_output_pardo.CountWords()) + + beam.assert_that(results_words, + beam.equal_to(self.EXPECTED_WORDS), + label='assert:words') + beam.assert_that(res_short_words, + beam.equal_to(self.EXPECTED_SHORT_WORDS), + label='assert:tag_short_words') + beam.assert_that(results_cnt, + beam.equal_to( + [len(' '.join(self.SAMPLE_TEXT.split('\n')))]), + label='assert:tag_character_count') + p.run() + if __name__ == '__main__': - logging.getLogger().setLevel(logging.INFO) - unittest.main() + logging.getLogger().setLevel(logging.INFO) + unittest.main() From 6a6f30bb679fc6db5ed36cb94571628c884c6bee Mon Sep 17 00:00:00 2001 From: Geetha 
Bijjam Date: Mon, 24 Oct 2016 12:22:20 -0500 Subject: [PATCH 33/39] Update multiple_output_pardo_test.py --- .../cookbook/multiple_output_pardo_test.py | 69 +++++++++---------- 1 file changed, 34 insertions(+), 35 deletions(-) diff --git a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py index b08f429ce381..582b1f43bbd0 100644 --- a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py +++ b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py @@ -26,48 +26,47 @@ class MultipleOutputParDoTest(unittest.TestCase): - SAMPLE_TEXT = 'A whole new world\nA new point' - EXPECTED_SHORT_WORDS = ['A: 2', 'new: 2'] - EXPECTED_WORDS = ['point: 1', 'whole: 1', 'world: 1'] + SAMPLE_TEXT = 'A whole new world\nA new point' + EXPECTED_SHORT_WORDS = ['A: 2', 'new: 2'] + EXPECTED_WORDS = ['point: 1', 'whole: 1', 'world: 1'] - def test_multiple_output_pardo(self): - p = beam.Pipeline('DirectPipelineRunner') + def test_multiple_output_pardo(self): + p = beam.Pipeline('DirectPipelineRunner') - sample_text = p | beam.Create([self.SAMPLE_TEXT]) + sample_text = p | beam.Create([self.SAMPLE_TEXT]) - results = (sample_text - | beam.ParDo(multiple_output_pardo.SplitLinesToWordsFn()) - .with_outputs('tag_short_words', - 'tag_character_count', - main='words')) + results = (sample_text + | beam.ParDo(multiple_output_pardo.SplitLinesToWordsFn()) + .with_outputs('tag_short_words', + 'tag_character_count', + main='words')) - results_cnt = (results.tag_character_count - | 'pair_with_key' >> beam.Map( - lambda x: ('chars_temp_key', x)) - | beam.GroupByKey() - | 'count chars' >> beam.Map( - lambda (_, counts): sum(counts))) + results_cnt = (results.tag_character_count + | 'pair_with_key' >> beam.Map(lambda x: ('chars_temp_key', x) + ) + | beam.GroupByKey() + | 'count chars' >> beam.Map(lambda (_, counts): sum(counts))) - results_words = (results.words - | 'count words' 
>> multiple_output_pardo.CountWords()) + results_words = (results.words + | 'count words' >> multiple_output_pardo.CountWords()) - res_short_words = (results.tag_short_words - | 'count short words' >> - multiple_output_pardo.CountWords()) + res_short_words = (results.tag_short_words + | 'count short words' >> + multiple_output_pardo.CountWords()) - beam.assert_that(results_words, - beam.equal_to(self.EXPECTED_WORDS), - label='assert:words') - beam.assert_that(res_short_words, - beam.equal_to(self.EXPECTED_SHORT_WORDS), - label='assert:tag_short_words') - beam.assert_that(results_cnt, - beam.equal_to( - [len(' '.join(self.SAMPLE_TEXT.split('\n')))]), - label='assert:tag_character_count') - p.run() + beam.assert_that(results_words, + beam.equal_to(self.EXPECTED_WORDS), + label='assert:words') + beam.assert_that(res_short_words, + beam.equal_to(self.EXPECTED_SHORT_WORDS), + label='assert:tag_short_words') + beam.assert_that(results_cnt, + beam.equal_to( + [len(' '.join(self.SAMPLE_TEXT.split('\n')))]), + label='assert:tag_character_count') + p.run() if __name__ == '__main__': - logging.getLogger().setLevel(logging.INFO) - unittest.main() + logging.getLogger().setLevel(logging.INFO) + unittest.main() From 72c67410cc208e73bc0f65ea486bab9e26ab8612 Mon Sep 17 00:00:00 2001 From: Geetha Bijjam Date: Mon, 24 Oct 2016 13:47:20 -0500 Subject: [PATCH 34/39] Update multiple_output_pardo_test.py --- .../examples/cookbook/multiple_output_pardo_test.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py index 582b1f43bbd0..579905c89b36 100644 --- a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py +++ b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py @@ -42,8 +42,7 @@ def test_multiple_output_pardo(self): main='words')) results_cnt = (results.tag_character_count - | 
'pair_with_key' >> beam.Map(lambda x: ('chars_temp_key', x) - ) + | 'pair_with_key' >> beam.Map(lambda x: ('chars_tmp_key', x)) | beam.GroupByKey() | 'count chars' >> beam.Map(lambda (_, counts): sum(counts))) @@ -61,8 +60,8 @@ def test_multiple_output_pardo(self): beam.equal_to(self.EXPECTED_SHORT_WORDS), label='assert:tag_short_words') beam.assert_that(results_cnt, - beam.equal_to( - [len(' '.join(self.SAMPLE_TEXT.split('\n')))]), + beam.equal_to([len(' '.join(self.SAMPLE_TEXT.split('\n')))] + ), label='assert:tag_character_count') p.run() From d8bc2876778f5b943db9d93663b37f4de77b69d0 Mon Sep 17 00:00:00 2001 From: Geetha Bijjam Date: Mon, 24 Oct 2016 13:53:18 -0500 Subject: [PATCH 35/39] Update multiple_output_pardo_test.py --- .../apache_beam/examples/cookbook/multiple_output_pardo_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py index 579905c89b36..0af07c42aaa7 100644 --- a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py +++ b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py @@ -61,7 +61,7 @@ def test_multiple_output_pardo(self): label='assert:tag_short_words') beam.assert_that(results_cnt, beam.equal_to([len(' '.join(self.SAMPLE_TEXT.split('\n')))] - ), + ), label='assert:tag_character_count') p.run() From d9fbaf0de7ce18e44c17cb225795e9cfa291284d Mon Sep 17 00:00:00 2001 From: Geetha Bijjam Date: Tue, 25 Oct 2016 14:56:25 -0500 Subject: [PATCH 36/39] Update mergecontacts_test.py --- .../examples/cookbook/mergecontacts_test.py | 78 +++++++++++++------ 1 file changed, 56 insertions(+), 22 deletions(-) diff --git a/sdks/python/apache_beam/examples/cookbook/mergecontacts_test.py b/sdks/python/apache_beam/examples/cookbook/mergecontacts_test.py index b3be0ddb0853..7e3339544e2e 100644 --- 
a/sdks/python/apache_beam/examples/cookbook/mergecontacts_test.py +++ b/sdks/python/apache_beam/examples/cookbook/mergecontacts_test.py @@ -18,9 +18,10 @@ """Test for the mergecontacts example.""" import logging -import tempfile +import re import unittest +import apache_beam as beam from apache_beam.examples.cookbook import mergecontacts @@ -81,11 +82,9 @@ class MergeContactsTest(unittest.TestCase): '1 writers', '3 nomads', '']) - - def create_temp_file(self, contents): - with tempfile.NamedTemporaryFile(delete=False) as f: - f.write(contents) - return f.name + EXPECTED_LUDDITES = 2 + EXPECTED_WRITERS = 1 + EXPECTED_NOMADS = 3 def normalize_tsv_results(self, tsv_data): """Sort .tsv file data so we can compare it with expected output.""" @@ -101,22 +100,57 @@ def normalize_tsv_results(self, tsv_data): return '\n'.join(sorted(lines_out)) + '\n' def test_mergecontacts(self): - path_email = self.create_temp_file(self.CONTACTS_EMAIL) - path_phone = self.create_temp_file(self.CONTACTS_PHONE) - path_snailmail = self.create_temp_file(self.CONTACTS_SNAILMAIL) - - result_prefix = self.create_temp_file('') - - mergecontacts.run([ - '--input_email=%s' % path_email, - '--input_phone=%s' % path_phone, - '--input_snailmail=%s' % path_snailmail, - '--output_tsv=%s.tsv' % result_prefix, - '--output_stats=%s.stats' % result_prefix], assert_results=(2, 1, 3)) - - with open('%s.tsv-00000-of-00001' % result_prefix) as f: - contents = f.read() - self.assertEqual(self.EXPECTED_TSV, self.normalize_tsv_results(contents)) + p = beam.Pipeline('DirectPipelineRunner') + + contacts_email = p | beam.Create(self.CONTACTS_EMAIL) + contacts_phone = p | beam.Create(self.CONTACTS_PHONE) + contacts_snailmail = p | beam.Create(self.CONTACTS_SNAILMAIL) + + email = (contacts_email + | beam.Map('backslash_email', lambda x: re.sub(r'\\', r'\\\\', x)) + | beam.Map('escape_quotes_email', lambda x: re.sub(r'"', r'\"', x)) + | beam.Map('split_email', lambda x: re.split(r'\t+', x, 1))) + phone = (contacts_phone 
+ | beam.Map('backslash_phone', lambda x: re.sub(r'\\', r'\\\\', x)) + | beam.Map('escape_quotes_phone', lambda x: re.sub(r'"', r'\"', x)) + | beam.Map('split_phone', lambda x: re.split(r'\t+', x, 1))) + snailmail = (contacts_snailmail + | beam.Map('backslash_snailmail', + lambda x: re.sub(r'\\', r'\\\\', x)) + | beam.Map('escape_quotes_snailmail', + lambda x: re.sub(r'"', r'\"', x)) + | beam.Map('split_snailmail', + lambda x: re.split(r'\t+', x, 1))) + + grouped = (email, phone, snailmail) | 'group_by_name' >> beam.CoGroupByKey() + + result_tsv_lines = (grouped + | beam.Map(lambda (name, (email, phone, snailmail)): '\t' + .join(['"%s"' % name, + '"%s"' % ','.join(email), + '"%s"' % ','.join(phone), + '"%s"' % next(iter(snailmail), '')]))) + + luddites = (grouped | beam.Filter(lambda (name, (email, phone, snailmail)): + not next(iter(email), None))) + writers = (grouped | beam.Filter(lambda (name, (email, phone, snailmail)): + not next(iter(phone), None))) + nomads = (grouped | beam.Filter(lambda (name, (email, phone, snailmail)): + not next(iter(snailmail), None))) + + num_luddites = luddites | 'luddites' >> beam.combiners.Count.Globally() + num_writers = writers | 'writers' >> beam.combiners.Count.Globally() + num_nomads = nomads | 'nomads' >> beam.combiners.Count.Globally() + + beam.assert_that(self.normalize_tsv_results(result_tsv_lines), + beam.equal_to(self.EXPECTED_TSV), + label='assert:tag_tsv_results') + beam.assert_that(num_luddites, beam.equal_to(self.EXPECTED_LUDDITES), + label='assert:tag_num_luddites') + beam.assert_that(num_writers, beam.equal_to(self.EXPECTED_WRITERS), + label='assert:tag_num_writers') + beam.assert_that(num_nomads, beam.equal_to(self.EXPECTED_NOMADS), + label='assert:tag_num_nomads') if __name__ == '__main__': From 06f2fb203708bf14b8d8349ae954e1a3f1f7a891 Mon Sep 17 00:00:00 2001 From: Geetha Bijjam Date: Tue, 25 Oct 2016 15:08:29 -0500 Subject: [PATCH 37/39] Update mergecontacts_test.py --- 
.../examples/cookbook/mergecontacts_test.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/sdks/python/apache_beam/examples/cookbook/mergecontacts_test.py b/sdks/python/apache_beam/examples/cookbook/mergecontacts_test.py index 7e3339544e2e..8532e63286ff 100644 --- a/sdks/python/apache_beam/examples/cookbook/mergecontacts_test.py +++ b/sdks/python/apache_beam/examples/cookbook/mergecontacts_test.py @@ -102,9 +102,9 @@ def normalize_tsv_results(self, tsv_data): def test_mergecontacts(self): p = beam.Pipeline('DirectPipelineRunner') - contacts_email = p | beam.Create(self.CONTACTS_EMAIL) - contacts_phone = p | beam.Create(self.CONTACTS_PHONE) - contacts_snailmail = p | beam.Create(self.CONTACTS_SNAILMAIL) + contacts_email = p | beam.Create([self.CONTACTS_EMAIL]) + contacts_phone = p | beam.Create([self.CONTACTS_PHONE]) + contacts_snailmail = p | beam.Create([self.CONTACTS_SNAILMAIL]) email = (contacts_email | beam.Map('backslash_email', lambda x: re.sub(r'\\', r'\\\\', x)) @@ -143,13 +143,13 @@ def test_mergecontacts(self): num_nomads = nomads | 'nomads' >> beam.combiners.Count.Globally() beam.assert_that(self.normalize_tsv_results(result_tsv_lines), - beam.equal_to(self.EXPECTED_TSV), + beam.equal_to([self.EXPECTED_TSV]), label='assert:tag_tsv_results') - beam.assert_that(num_luddites, beam.equal_to(self.EXPECTED_LUDDITES), + beam.assert_that(num_luddites, beam.equal_to([self.EXPECTED_LUDDITES]), label='assert:tag_num_luddites') - beam.assert_that(num_writers, beam.equal_to(self.EXPECTED_WRITERS), + beam.assert_that(num_writers, beam.equal_to([self.EXPECTED_WRITERS]), label='assert:tag_num_writers') - beam.assert_that(num_nomads, beam.equal_to(self.EXPECTED_NOMADS), + beam.assert_that(num_nomads, beam.equal_to([self.EXPECTED_NOMADS]), label='assert:tag_num_nomads') From 79264ec0e11e106c71da773e013db3a92ebbab9f Mon Sep 17 00:00:00 2001 From: Geetha Bijjam Date: Wed, 26 Oct 2016 15:32:36 -0500 Subject: [PATCH 38/39] Update 
mergecontacts_test.py --- .../examples/cookbook/mergecontacts_test.py | 51 ++++++++++--------- 1 file changed, 27 insertions(+), 24 deletions(-) diff --git a/sdks/python/apache_beam/examples/cookbook/mergecontacts_test.py b/sdks/python/apache_beam/examples/cookbook/mergecontacts_test.py index 8532e63286ff..216e130c498f 100644 --- a/sdks/python/apache_beam/examples/cookbook/mergecontacts_test.py +++ b/sdks/python/apache_beam/examples/cookbook/mergecontacts_test.py @@ -22,7 +22,6 @@ import unittest import apache_beam as beam -from apache_beam.examples.cookbook import mergecontacts class MergeContactsTest(unittest.TestCase): @@ -101,10 +100,11 @@ def normalize_tsv_results(self, tsv_data): def test_mergecontacts(self): p = beam.Pipeline('DirectPipelineRunner') - - contacts_email = p | beam.Create([self.CONTACTS_EMAIL]) - contacts_phone = p | beam.Create([self.CONTACTS_PHONE]) - contacts_snailmail = p | beam.Create([self.CONTACTS_SNAILMAIL]) + + contacts_email = p | 'create_email' >> beam.Create([self.CONTACTS_EMAIL]) + contacts_phone = p | 'create_phone' >> beam.Create([self.CONTACTS_PHONE]) + contacts_snailmail = (p | 'create_snail_mail' >> + beam.Create([self.CONTACTS_SNAILMAIL])) email = (contacts_email | beam.Map('backslash_email', lambda x: re.sub(r'\\', r'\\\\', x)) @@ -119,30 +119,33 @@ def test_mergecontacts(self): lambda x: re.sub(r'\\', r'\\\\', x)) | beam.Map('escape_quotes_snailmail', lambda x: re.sub(r'"', r'\"', x)) - | beam.Map('split_snailmail', + | beam.Map('split_snailmail', lambda x: re.split(r'\t+', x, 1))) - + grouped = (email, phone, snailmail) | 'group_by_name' >> beam.CoGroupByKey() - - result_tsv_lines = (grouped - | beam.Map(lambda (name, (email, phone, snailmail)): '\t' - .join(['"%s"' % name, - '"%s"' % ','.join(email), - '"%s"' % ','.join(phone), - '"%s"' % next(iter(snailmail), '')]))) - - luddites = (grouped | beam.Filter(lambda (name, (email, phone, snailmail)): - not next(iter(email), None))) - writers = (grouped | beam.Filter(lambda 
(name, (email, phone, snailmail)): - not next(iter(phone), None))) - nomads = (grouped | beam.Filter(lambda (name, (email, phone, snailmail)): - not next(iter(snailmail), None))) - + + result_tsv_lines = (grouped | 'result_tsv' >> beam.Map( + lambda (name, (email, phone, snailmail)): '\t' + .join(['"%s"' % name, + '"%s"' % ','.join(sorted(email.strip('"').split(','))), + '"%s"' % ','.join(sorted(phone.strip('"').split(','))), + '"%s"' % next(iter(snailmail), '')]))) + + luddites = (grouped | 'filter_luddites' >> beam.Filter( + lambda (name, (email, phone, snailmail) + ): not next(iter(email), None))) + writers = (grouped | 'filter_writers' >> beam.Filter( + lambda (name, (email, phone, snailmail) + ): not next(iter(phone), None))) + nomads = (grouped | 'filter_nomads' >> beam.Filter( + lambda (name, (email, phone, snailmail) + ): not next(iter(snailmail), None))) + num_luddites = luddites | 'luddites' >> beam.combiners.Count.Globally() num_writers = writers | 'writers' >> beam.combiners.Count.Globally() num_nomads = nomads | 'nomads' >> beam.combiners.Count.Globally() - - beam.assert_that(self.normalize_tsv_results(result_tsv_lines), + #(self.normalize_tsv_results('\n'.join(result_tsv_lines)), + beam.assert_that(result_tsv_lines, beam.equal_to([self.EXPECTED_TSV]), label='assert:tag_tsv_results') beam.assert_that(num_luddites, beam.equal_to([self.EXPECTED_LUDDITES]), From 70ca711f129dca372f017fe620077fe37df714db Mon Sep 17 00:00:00 2001 From: Geetha Bijjam Date: Thu, 27 Oct 2016 09:23:54 -0500 Subject: [PATCH 39/39] Update group_with_coder_test.py --- .../cookbook/group_with_coder_test.py | 71 +++++++++---------- 1 file changed, 34 insertions(+), 37 deletions(-) diff --git a/sdks/python/apache_beam/examples/cookbook/group_with_coder_test.py b/sdks/python/apache_beam/examples/cookbook/group_with_coder_test.py index fb52809327b6..31e073ee287d 100644 --- a/sdks/python/apache_beam/examples/cookbook/group_with_coder_test.py +++ 
b/sdks/python/apache_beam/examples/cookbook/group_with_coder_test.py
@@ -18,9 +18,10 @@
 """Test for the custom coders example."""
 
 import logging
-import tempfile
 import unittest
 
+import apache_beam as beam
+from apache_beam import coders
 from apache_beam.examples.cookbook import group_with_coder
 
 
@@ -36,53 +37,49 @@ class GroupWithCoderTest(unittest.TestCase):
       'joe,20', 'fred,6', 'ann,5',
       'joe,30', 'ann,10', 'mary,1']
 
-  def create_temp_file(self, records):
-    with tempfile.NamedTemporaryFile(delete=False) as f:
-      for record in records:
-        f.write('%s\n' % record)
-      return f.name
-
   def test_basics_with_type_check(self):
     # Run the workflow with --pipeline_type_check option. This will make sure
     # the typehints associated with all transforms will have non-default values
     # and therefore any custom coders will be used. In our case we want to make
     # sure the coder for the Player class will be used.
-    temp_path = self.create_temp_file(self.SAMPLE_RECORDS)
-    group_with_coder.run([
-        '--pipeline_type_check',
-        '--input=%s*' % temp_path,
-        '--output=%s.result' % temp_path])
-    # Parse result file and compare.
-    results = []
-    with open(temp_path + '.result-00000-of-00001') as result_file:
-      for line in result_file:
-        name, points = line.split(',')
-        results.append((name, int(points)))
-    logging.info('result: %s', results)
-    self.assertEqual(
-        sorted(results),
-        sorted([('x:ann', 15), ('x:fred', 9), ('x:joe', 60), ('x:mary', 8)]))
+    coders.registry.register_coder(group_with_coder.Player,
+                                   group_with_coder.PlayerCoder)
+
+    p = beam.Pipeline('DirectPipelineRunner',
+                      argv=['--pipeline_type_check'])
+    results = (p
+               | 'create_sample' >> beam.Create(self.SAMPLE_RECORDS)
+               | beam.Map(group_with_coder.get_players)
+               | beam.CombinePerKey(sum)
+               | beam.Map(lambda kv: (kv[0].name, kv[1])))
+    # assert_that only adds a check to the pipeline; p.run() executes it.
+    beam.assert_that(results,
+                     beam.equal_to([('x:ann', 15),
+                                    ('x:fred', 9),
+                                    ('x:joe', 60),
+                                    ('x:mary', 8)]),
+                     label='assert:tag_basics_with_type_check')
+    p.run()
 
   def test_basics_without_type_check(self):
     # Run the workflow without --pipeline_type_check option. This will make sure
     # the typehints associated with all transforms will have default values and
     # therefore any custom coders will not be used. The default coder (pickler)
     # will be used instead.
-    temp_path = self.create_temp_file(self.SAMPLE_RECORDS)
-    group_with_coder.run([
-        '--no_pipeline_type_check',
-        '--input=%s*' % temp_path,
-        '--output=%s.result' % temp_path])
-    # Parse result file and compare.
-    results = []
-    with open(temp_path + '.result-00000-of-00001') as result_file:
-      for line in result_file:
-        name, points = line.split(',')
-        results.append((name, int(points)))
-    logging.info('result: %s', results)
-    self.assertEqual(
-        sorted(results),
-        sorted([('ann', 15), ('fred', 9), ('joe', 60), ('mary', 8)]))
+    p = beam.Pipeline('DirectPipelineRunner',
+                      argv=['--no_pipeline_type_check'])
+    results_1 = (p
+                 | 'create_sample_1' >> beam.Create(self.SAMPLE_RECORDS)
+                 | beam.Map(group_with_coder.get_players)
+                 | beam.CombinePerKey(sum)
+                 | beam.Map(lambda kv: (kv[0].name, kv[1])))
+    beam.assert_that(results_1,
+                     beam.equal_to([('ann', 15),
+                                    ('fred', 9),
+                                    ('joe', 60),
+                                    ('mary', 8)]),
+                     label='assert:tag_basics_without_type_check')
+    p.run()
 
 
 if __name__ == '__main__':