From 64fc974d941aae32b5dd7f1bf6943d2e33dd1f41 Mon Sep 17 00:00:00 2001 From: Charles Chen Date: Mon, 5 Jun 2017 16:31:13 -0700 Subject: [PATCH 1/3] Migrate Python tests to not depend on fix sharding for file output --- .../complete/juliaset/juliaset/juliaset_test.py | 5 +++-- .../apache_beam/examples/complete/tfidf_test.py | 6 ++++-- .../examples/cookbook/group_with_coder_test.py | 5 +++-- .../examples/cookbook/mergecontacts_test.py | 5 ++++- .../cookbook/multiple_output_pardo_test.py | 9 +++++---- .../examples/wordcount_debugging_test.py | 3 ++- .../examples/wordcount_minimal_test.py | 3 ++- .../python/apache_beam/examples/wordcount_test.py | 3 ++- sdks/python/apache_beam/testing/util.py | 15 +++++++++++++++ 9 files changed, 40 insertions(+), 14 deletions(-) diff --git a/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset_test.py b/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset_test.py index 17d9cf32798b..c19525150951 100644 --- a/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset_test.py +++ b/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset_test.py @@ -25,6 +25,7 @@ from apache_beam.examples.complete.juliaset.juliaset import juliaset +from apache_beam.testing.util import open_shards class JuliaSetTest(unittest.TestCase): @@ -60,8 +61,8 @@ def test_output_file_format(self): # Parse the results from the file, and ensure it was written in the proper # format. - with open(self.test_files['output_coord_file_name'] + - '-00000-of-00001') as result_file: + with open_shards(self.test_files['output_coord_file_name'] + + '-*-of-*') as result_file: output_lines = result_file.readlines() # Should have a line for each x-coordinate. diff --git a/sdks/python/apache_beam/examples/complete/tfidf_test.py b/sdks/python/apache_beam/examples/complete/tfidf_test.py index 322426fd2b3d..e43c4989789a 100644 --- a/sdks/python/apache_beam/examples/complete/tfidf_test.py +++ b/sdks/python/apache_beam/examples/complete/tfidf_test.py @@ -28,6 +28,7 @@ from apache_beam.testing.test_pipeline import TestPipeline from apache_beam.testing.util import assert_that from apache_beam.testing.util import equal_to +from apache_beam.testing.util import open_shards EXPECTED_RESULTS = set([ @@ -76,8 +77,9 @@ def test_basics(self): '--output', os.path.join(temp_folder, 'result')]) # Parse result file and compare. results = [] - with open(os.path.join(temp_folder, - 'result-00000-of-00001')) as result_file: + with open_shards(os.path.join( + temp_folder, + 'result-*-of-*')) as result_file: for line in result_file: match = re.search(EXPECTED_LINE_RE, line) logging.info('Result line: %s', line) diff --git a/sdks/python/apache_beam/examples/cookbook/group_with_coder_test.py b/sdks/python/apache_beam/examples/cookbook/group_with_coder_test.py index 268ba8d355f2..fb630ba465f7 100644 --- a/sdks/python/apache_beam/examples/cookbook/group_with_coder_test.py +++ b/sdks/python/apache_beam/examples/cookbook/group_with_coder_test.py @@ -22,6 +22,7 @@ import unittest from apache_beam.examples.cookbook import group_with_coder +from apache_beam.testing.util import open_shards # Patch group_with_coder.PlayerCoder.decode(). To test that the PlayerCoder was @@ -53,7 +54,7 @@ def test_basics_with_type_check(self): '--output=%s.result' % temp_path]) # Parse result file and compare. results = [] - with open(temp_path + '.result-00000-of-00001') as result_file: + with open_shards(temp_path + '.result-*-of-*') as result_file: for line in result_file: name, points = line.split(',') results.append((name, int(points))) @@ -74,7 +75,7 @@ def test_basics_without_type_check(self): '--output=%s.result' % temp_path]) # Parse result file and compare. results = [] - with open(temp_path + '.result-00000-of-00001') as result_file: + with open_shards(temp_path + '.result-*-of-*') as result_file: for line in result_file: name, points = line.split(',') results.append((name, int(points))) diff --git a/sdks/python/apache_beam/examples/cookbook/mergecontacts_test.py b/sdks/python/apache_beam/examples/cookbook/mergecontacts_test.py index b3be0ddb0853..7f9745d91d77 100644 --- a/sdks/python/apache_beam/examples/cookbook/mergecontacts_test.py +++ b/sdks/python/apache_beam/examples/cookbook/mergecontacts_test.py @@ -22,6 +22,7 @@ import unittest from apache_beam.examples.cookbook import mergecontacts +from apache_beam.testing.util import open_shards class MergeContactsTest(unittest.TestCase): @@ -92,6 +93,8 @@ def normalize_tsv_results(self, tsv_data): lines_in = tsv_data.strip().split('\n') lines_out = [] for line in lines_in: + if not line: + continue name, email, phone, snailmail = line.split('\t') lines_out.append('\t'.join( [name, @@ -114,7 +117,7 @@ def test_mergecontacts(self): '--output_tsv=%s.tsv' % result_prefix, '--output_stats=%s.stats' % result_prefix], assert_results=(2, 1, 3)) - with open('%s.tsv-00000-of-00001' % result_prefix) as f: + with open_shards('%s.tsv-*-of-*' % result_prefix) as f: contents = f.read() self.assertEqual(self.EXPECTED_TSV, self.normalize_tsv_results(contents)) diff --git a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py index 3ddd668599d0..508c4082cfe7 100644 --- a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py +++ b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py @@ -23,6 +23,7 @@ import unittest from apache_beam.examples.cookbook import multiple_output_pardo +from apache_beam.testing.util import open_shards class MultipleOutputParDo(unittest.TestCase): @@ -39,7 +40,7 @@ def create_temp_file(self, contents): def get_wordcount_results(self, temp_path): results = [] - with open(temp_path) as result_file: + with open_shards(temp_path) as result_file: for line in result_file: match = re.search(r'([A-Za-z]+): ([0-9]+)', line) if match is not None: @@ -55,15 +56,15 @@ def test_multiple_output_pardo(self): '--output=%s' % result_prefix]) expected_char_count = len(''.join(self.SAMPLE_TEXT.split('\n'))) - with open(result_prefix + '-chars-00000-of-00001') as f: + with open_shards(result_prefix + '-chars-*-of-*') as f: contents = f.read() self.assertEqual(expected_char_count, int(contents)) short_words = self.get_wordcount_results( - result_prefix + '-short-words-00000-of-00001') + result_prefix + '-short-words-*-of-*') self.assertEqual(sorted(short_words), sorted(self.EXPECTED_SHORT_WORDS)) - words = self.get_wordcount_results(result_prefix + '-words-00000-of-00001') + words = self.get_wordcount_results(result_prefix + '-words-*-of-*') self.assertEqual(sorted(words), sorted(self.EXPECTED_WORDS)) diff --git a/sdks/python/apache_beam/examples/wordcount_debugging_test.py b/sdks/python/apache_beam/examples/wordcount_debugging_test.py index 900a8e74bda5..92ee2402ad65 100644 --- a/sdks/python/apache_beam/examples/wordcount_debugging_test.py +++ b/sdks/python/apache_beam/examples/wordcount_debugging_test.py @@ -23,6 +23,7 @@ import unittest from apache_beam.examples import wordcount_debugging +from apache_beam.testing.util import open_shards class WordCountTest(unittest.TestCase): @@ -36,7 +37,7 @@ def create_temp_file(self, contents): def get_results(self, temp_path): results = [] - with open(temp_path + '.result-00000-of-00001') as result_file: + with open_shards(temp_path + '.result-*-of-*') as result_file: for line in result_file: match = re.search(r'([A-Za-z]+): ([0-9]+)', line) if match is not None: diff --git a/sdks/python/apache_beam/examples/wordcount_minimal_test.py b/sdks/python/apache_beam/examples/wordcount_minimal_test.py index 82bace48f975..5ee7b7803980 100644 --- a/sdks/python/apache_beam/examples/wordcount_minimal_test.py +++ b/sdks/python/apache_beam/examples/wordcount_minimal_test.py @@ -24,6 +24,7 @@ import unittest from apache_beam.examples import wordcount_minimal +from apache_beam.testing.util import open_shards class WordCountMinimalTest(unittest.TestCase): @@ -46,7 +47,7 @@ def test_basics(self): '--output=%s.result' % temp_path]) # Parse result file and compare. results = [] - with open(temp_path + '.result-00000-of-00001') as result_file: + with open_shards(temp_path + '.result-*-of-*') as result_file: for line in result_file: match = re.search(r'([a-z]+): ([0-9]+)', line) if match is not None: diff --git a/sdks/python/apache_beam/examples/wordcount_test.py b/sdks/python/apache_beam/examples/wordcount_test.py index 616540b9caae..9834ba53111a 100644 --- a/sdks/python/apache_beam/examples/wordcount_test.py +++ b/sdks/python/apache_beam/examples/wordcount_test.py @@ -24,6 +24,7 @@ import unittest from apache_beam.examples import wordcount +from apache_beam.testing.util import open_shards class WordCountTest(unittest.TestCase): @@ -45,7 +46,7 @@ def test_basics(self): '--output=%s.result' % temp_path]) # Parse result file and compare. results = [] - with open(temp_path + '.result-00000-of-00001') as result_file: + with open_shards(temp_path + '.result-*-of-*') as result_file: for line in result_file: match = re.search(r'([a-z]+): ([0-9]+)', line) if match is not None: diff --git a/sdks/python/apache_beam/testing/util.py b/sdks/python/apache_beam/testing/util.py index 60a6b21ca39b..8ec817bad2aa 100644 --- a/sdks/python/apache_beam/testing/util.py +++ b/sdks/python/apache_beam/testing/util.py @@ -19,6 +19,10 @@ from __future__ import absolute_import +import fileinput +import glob +import tempfile + from apache_beam import pvalue from apache_beam.transforms import window from apache_beam.transforms.core import Create @@ -32,6 +36,8 @@ 'assert_that', 'equal_to', 'is_empty', + # open_shards is internal and has no backwards compatibility guarantees. + 'open_shards', ] @@ -105,3 +111,12 @@ def default_label(self): return label actual | AssertThat() # pylint: disable=expression-not-assigned + + +def open_shards(glob_pattern): + """Returns a composite file of all shards matching the given glob pattern.""" + with tempfile.NamedTemporaryFile(delete=False) as f: + for shard in glob.glob(glob_pattern): + f.write(file(shard).read()) + concatenated_file_name = f.name + return file(concatenated_file_name, 'rb') From e04f7bf51d865d885b3db0ff3f1c65701c23d5e0 Mon Sep 17 00:00:00 2001 From: Charles Chen Date: Tue, 6 Jun 2017 00:01:11 -0700 Subject: [PATCH 2/3] Address reviewer comments --- .../apache_beam/examples/cookbook/mergecontacts_test.py | 2 -- .../examples/cookbook/multiple_output_pardo_test.py | 4 ++-- sdks/python/apache_beam/testing/util.py | 2 ++ 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/sdks/python/apache_beam/examples/cookbook/mergecontacts_test.py b/sdks/python/apache_beam/examples/cookbook/mergecontacts_test.py index 7f9745d91d77..32a3d518861f 100644 --- a/sdks/python/apache_beam/examples/cookbook/mergecontacts_test.py +++ b/sdks/python/apache_beam/examples/cookbook/mergecontacts_test.py @@ -93,8 +93,6 @@ def normalize_tsv_results(self, tsv_data): lines_in = tsv_data.strip().split('\n') lines_out = [] for line in lines_in: - if not line: - continue name, email, phone, snailmail = line.split('\t') lines_out.append('\t'.join( [name, diff --git a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py index 508c4082cfe7..1051106ea9b0 100644 --- a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py +++ b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py @@ -38,9 +38,9 @@ def create_temp_file(self, contents): f.write(contents) return f.name - def get_wordcount_results(self, temp_path): + def get_wordcount_results(self, result_path): results = [] - with open_shards(temp_path) as result_file: + with open_shards(result_path) as result_file: for line in result_file: match = re.search(r'([A-Za-z]+): ([0-9]+)', line) if match is not None: diff --git a/sdks/python/apache_beam/testing/util.py b/sdks/python/apache_beam/testing/util.py index 8ec817bad2aa..382a58fcaba4 100644 --- a/sdks/python/apache_beam/testing/util.py +++ b/sdks/python/apache_beam/testing/util.py @@ -30,6 +30,7 @@ from apache_beam.transforms.core import WindowInto from apache_beam.transforms.util import CoGroupByKey from apache_beam.transforms.ptransform import PTransform +from apache_beam.utils.annotations import experimental __all__ = [ @@ -113,6 +114,7 @@ def default_label(self): actual | AssertThat() # pylint: disable=expression-not-assigned +@experimental() def open_shards(glob_pattern): """Returns a composite file of all shards matching the given glob pattern.""" with tempfile.NamedTemporaryFile(delete=False) as f: From 6c9d5c479a87e361e64c81531e3d360c2a3830f3 Mon Sep 17 00:00:00 2001 From: Charles Chen Date: Tue, 6 Jun 2017 13:49:00 -0700 Subject: [PATCH 3/3] Fix lint --- .../examples/complete/juliaset/juliaset/juliaset_test.py | 2 +- sdks/python/apache_beam/examples/complete/tfidf_test.py | 3 +-- sdks/python/apache_beam/testing/util.py | 1 - 3 files changed, 2 insertions(+), 4 deletions(-) diff --git a/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset_test.py b/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset_test.py index c19525150951..91c75aa57927 100644 --- a/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset_test.py +++ b/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset_test.py @@ -62,7 +62,7 @@ def test_output_file_format(self): # Parse the results from the file, and ensure it was written in the proper # format. with open_shards(self.test_files['output_coord_file_name'] + - '-*-of-*') as result_file: + '-*-of-*') as result_file: output_lines = result_file.readlines() # Should have a line for each x-coordinate. diff --git a/sdks/python/apache_beam/examples/complete/tfidf_test.py b/sdks/python/apache_beam/examples/complete/tfidf_test.py index e43c4989789a..b6f88255887c 100644 --- a/sdks/python/apache_beam/examples/complete/tfidf_test.py +++ b/sdks/python/apache_beam/examples/complete/tfidf_test.py @@ -78,8 +78,7 @@ def test_basics(self): # Parse result file and compare. results = [] with open_shards(os.path.join( - temp_folder, - 'result-*-of-*')) as result_file: + temp_folder, 'result-*-of-*')) as result_file: for line in result_file: match = re.search(EXPECTED_LINE_RE, line) logging.info('Result line: %s', line) diff --git a/sdks/python/apache_beam/testing/util.py b/sdks/python/apache_beam/testing/util.py index 382a58fcaba4..959f25f31873 100644 --- a/sdks/python/apache_beam/testing/util.py +++ b/sdks/python/apache_beam/testing/util.py @@ -19,7 +19,6 @@ from __future__ import absolute_import -import fileinput import glob import tempfile