From 78ef0040a0167b89652bd500420775c4926c9655 Mon Sep 17 00:00:00 2001 From: Robert Bradshaw Date: Thu, 18 May 2017 17:22:25 -0700 Subject: [PATCH 1/4] Automatically convert examples to use with syntax. --- .../examples/complete/autocomplete.py | 19 +- .../examples/complete/autocomplete_test.py | 31 +- .../examples/complete/estimate_pi.py | 10 +- .../examples/complete/estimate_pi_test.py | 11 +- .../apache_beam/examples/complete/tfidf.py | 21 +- .../examples/complete/tfidf_test.py | 27 +- .../complete/top_wikipedia_sessions.py | 11 +- .../complete/top_wikipedia_sessions_test.py | 9 +- .../examples/cookbook/bigquery_schema.py | 159 ++++--- .../examples/cookbook/bigquery_side_input.py | 51 +- .../cookbook/bigquery_side_input_test.py | 37 +- .../examples/cookbook/bigquery_tornadoes.py | 33 +- .../cookbook/bigquery_tornadoes_test.py | 19 +- .../examples/cookbook/coders_test.py | 13 +- .../examples/cookbook/custom_ptransform.py | 27 +- .../cookbook/custom_ptransform_test.py | 11 +- .../examples/cookbook/datastore_wordcount.py | 19 +- .../apache_beam/examples/cookbook/filters.py | 20 +- .../apache_beam/examples/snippets/snippets.py | 444 +++++++++--------- .../examples/snippets/snippets_test.py | 323 +++++++------ .../apache_beam/examples/streaming_wordcap.py | 23 +- .../examples/streaming_wordcount.py | 45 +- sdks/python/apache_beam/examples/wordcount.py | 1 - .../examples/wordcount_debugging.py | 56 ++- .../apache_beam/examples/wordcount_minimal.py | 32 +- .../apache_beam/io/filebasedsink_test.py | 15 +- sdks/python/apache_beam/pipeline.py | 19 +- .../apache_beam/transforms/combiners_test.py | 57 +-- .../apache_beam/transforms/window_test.py | 147 +++--- .../transforms/write_ptransform_test.py | 7 +- .../typehints/typed_pipeline_test.py | 22 +- 31 files changed, 823 insertions(+), 896 deletions(-) diff --git a/sdks/python/apache_beam/examples/complete/autocomplete.py b/sdks/python/apache_beam/examples/complete/autocomplete.py index f0acc3fc562d..ab3397cfe335 100644 --- 
a/sdks/python/apache_beam/examples/complete/autocomplete.py +++ b/sdks/python/apache_beam/examples/complete/autocomplete.py @@ -44,16 +44,15 @@ def run(argv=None): # workflow rely on global context (e.g., a module imported at module level). pipeline_options = PipelineOptions(pipeline_args) pipeline_options.view_as(SetupOptions).save_main_session = True - p = beam.Pipeline(options=pipeline_options) - - (p # pylint: disable=expression-not-assigned - | 'read' >> ReadFromText(known_args.input) - | 'split' >> beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x)) - | 'TopPerPrefix' >> TopPerPrefix(5) - | 'format' >> beam.Map( - lambda (prefix, candidates): '%s: %s' % (prefix, candidates)) - | 'write' >> WriteToText(known_args.output)) - p.run() + with beam.Pipeline(options=pipeline_options) as p: + + (p # pylint: disable=expression-not-assigned + | 'read' >> ReadFromText(known_args.input) + | 'split' >> beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x)) + | 'TopPerPrefix' >> TopPerPrefix(5) + | 'format' >> beam.Map( + lambda (prefix, candidates): '%s: %s' % (prefix, candidates)) + | 'write' >> WriteToText(known_args.output)) class TopPerPrefix(beam.PTransform): diff --git a/sdks/python/apache_beam/examples/complete/autocomplete_test.py b/sdks/python/apache_beam/examples/complete/autocomplete_test.py index 378d222bfa8d..e2c84d68d3d4 100644 --- a/sdks/python/apache_beam/examples/complete/autocomplete_test.py +++ b/sdks/python/apache_beam/examples/complete/autocomplete_test.py @@ -31,22 +31,21 @@ class AutocompleteTest(unittest.TestCase): WORDS = ['this', 'this', 'that', 'to', 'to', 'to'] def test_top_prefixes(self): - p = TestPipeline() - words = p | beam.Create(self.WORDS) - result = words | autocomplete.TopPerPrefix(5) - # values must be hashable for now - result = result | beam.Map(lambda (k, vs): (k, tuple(vs))) - assert_that(result, equal_to( - [ - ('t', ((3, 'to'), (2, 'this'), (1, 'that'))), - ('to', ((3, 'to'), )), - ('th', ((2, 'this'), (1, 'that'))), - ('thi', 
((2, 'this'), )), - ('this', ((2, 'this'), )), - ('tha', ((1, 'that'), )), - ('that', ((1, 'that'), )), - ])) - p.run() + with TestPipeline() as p: + words = p | beam.Create(self.WORDS) + result = words | autocomplete.TopPerPrefix(5) + # values must be hashable for now + result = result | beam.Map(lambda (k, vs): (k, tuple(vs))) + assert_that(result, equal_to( + [ + ('t', ((3, 'to'), (2, 'this'), (1, 'that'))), + ('to', ((3, 'to'), )), + ('th', ((2, 'this'), (1, 'that'))), + ('thi', ((2, 'this'), )), + ('this', ((2, 'this'), )), + ('tha', ((1, 'that'), )), + ('that', ((1, 'that'), )), + ])) if __name__ == '__main__': diff --git a/sdks/python/apache_beam/examples/complete/estimate_pi.py b/sdks/python/apache_beam/examples/complete/estimate_pi.py index c709713bd97e..8be44fff9b56 100644 --- a/sdks/python/apache_beam/examples/complete/estimate_pi.py +++ b/sdks/python/apache_beam/examples/complete/estimate_pi.py @@ -113,14 +113,12 @@ def run(argv=None): # workflow rely on global context (e.g., a module imported at module level). pipeline_options = PipelineOptions(pipeline_args) pipeline_options.view_as(SetupOptions).save_main_session = True - p = beam.Pipeline(options=pipeline_options) + with beam.Pipeline(options=pipeline_options) as p: - (p # pylint: disable=expression-not-assigned - | EstimatePiTransform() - | WriteToText(known_args.output, coder=JsonCoder())) + (p # pylint: disable=expression-not-assigned + | EstimatePiTransform() + | WriteToText(known_args.output, coder=JsonCoder())) - # Actually run the pipeline (all operations above are deferred). 
- p.run() if __name__ == '__main__': diff --git a/sdks/python/apache_beam/examples/complete/estimate_pi_test.py b/sdks/python/apache_beam/examples/complete/estimate_pi_test.py index fd5130966887..0e1bc2595d55 100644 --- a/sdks/python/apache_beam/examples/complete/estimate_pi_test.py +++ b/sdks/python/apache_beam/examples/complete/estimate_pi_test.py @@ -38,13 +38,12 @@ def _in_between(actual): class EstimatePiTest(unittest.TestCase): def test_basics(self): - p = TestPipeline() - result = p | 'Estimate' >> estimate_pi.EstimatePiTransform(5000) + with TestPipeline() as p: + result = p | 'Estimate' >> estimate_pi.EstimatePiTransform(5000) - # Note: Probabilistically speaking this test can fail with a probability - # that is very small (VERY) given that we run at least 500 thousand trials. - assert_that(result, in_between(3.125, 3.155)) - p.run() + # Note: Probabilistically speaking this test can fail with a probability + # that is very small (VERY) given that we run at least 500 thousand trials. + assert_that(result, in_between(3.125, 3.155)) if __name__ == '__main__': diff --git a/sdks/python/apache_beam/examples/complete/tfidf.py b/sdks/python/apache_beam/examples/complete/tfidf.py index a98d90640d52..a88ff827766c 100644 --- a/sdks/python/apache_beam/examples/complete/tfidf.py +++ b/sdks/python/apache_beam/examples/complete/tfidf.py @@ -191,17 +191,16 @@ def run(argv=None): # workflow rely on global context (e.g., a module imported at module level). pipeline_options = PipelineOptions(pipeline_args) pipeline_options.view_as(SetupOptions).save_main_session = True - p = beam.Pipeline(options=pipeline_options) - - # Read documents specified by the uris command line option. - pcoll = read_documents(p, glob.glob(known_args.uris)) - # Compute TF-IDF information for each word. - output = pcoll | TfIdf() - # Write the output using a "Write" transform that has side effects. 
- # pylint: disable=expression-not-assigned - output | 'write' >> WriteToText(known_args.output) - # Execute the pipeline and wait until it is completed. - p.run().wait_until_finish() + with beam.Pipeline(options=pipeline_options) as p: + + # Read documents specified by the uris command line option. + pcoll = read_documents(p, glob.glob(known_args.uris)) + # Compute TF-IDF information for each word. + output = pcoll | TfIdf() + # Write the output using a "Write" transform that has side effects. + # pylint: disable=expression-not-assigned + output | 'write' >> WriteToText(known_args.output) + # Execute the pipeline and wait until it is completed. if __name__ == '__main__': diff --git a/sdks/python/apache_beam/examples/complete/tfidf_test.py b/sdks/python/apache_beam/examples/complete/tfidf_test.py index f177dfc9e166..b1630f83d989 100644 --- a/sdks/python/apache_beam/examples/complete/tfidf_test.py +++ b/sdks/python/apache_beam/examples/complete/tfidf_test.py @@ -50,20 +50,19 @@ def create_file(self, path, contents): f.write(contents) def test_tfidf_transform(self): - p = TestPipeline() - uri_to_line = p | 'create sample' >> beam.Create( - [('1.txt', 'abc def ghi'), - ('2.txt', 'abc def'), - ('3.txt', 'abc')]) - result = ( - uri_to_line - | tfidf.TfIdf() - | beam.Map(lambda (word, (uri, tfidf)): (word, uri, tfidf))) - assert_that(result, equal_to(EXPECTED_RESULTS)) - # Run the pipeline. Note that the assert_that above adds to the pipeline - # a check that the result PCollection contains expected values. To actually - # trigger the check the pipeline must be run. - p.run() + with TestPipeline() as p: + uri_to_line = p | 'create sample' >> beam.Create( + [('1.txt', 'abc def ghi'), + ('2.txt', 'abc def'), + ('3.txt', 'abc')]) + result = ( + uri_to_line + | tfidf.TfIdf() + | beam.Map(lambda (word, (uri, tfidf)): (word, uri, tfidf))) + assert_that(result, equal_to(EXPECTED_RESULTS)) + # Run the pipeline. 
Note that the assert_that above adds to the pipeline + # a check that the result PCollection contains expected values. To actually + # trigger the check the pipeline must be run. def test_basics(self): # Setup the files with expected content. diff --git a/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py b/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py index aa48e4eb920e..aee51ccdd39b 100644 --- a/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py +++ b/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py @@ -159,14 +159,13 @@ def run(argv=None): # workflow rely on global context (e.g., a module imported at module level). pipeline_options = PipelineOptions(pipeline_args) pipeline_options.view_as(SetupOptions).save_main_session = True - p = beam.Pipeline(options=pipeline_options) + with beam.Pipeline(options=pipeline_options) as p: - (p # pylint: disable=expression-not-assigned - | ReadFromText(known_args.input) - | ComputeTopSessions(known_args.sampling_threshold) - | WriteToText(known_args.output)) + (p # pylint: disable=expression-not-assigned + | ReadFromText(known_args.input) + | ComputeTopSessions(known_args.sampling_threshold) + | WriteToText(known_args.output)) - p.run() if __name__ == '__main__': diff --git a/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions_test.py b/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions_test.py index 5fb6276cb8d5..ced8a44af13c 100644 --- a/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions_test.py +++ b/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions_test.py @@ -52,12 +52,11 @@ class ComputeTopSessionsTest(unittest.TestCase): ] def test_compute_top_sessions(self): - p = TestPipeline() - edits = p | beam.Create(self.EDITS) - result = edits | top_wikipedia_sessions.ComputeTopSessions(1.0) + with TestPipeline() as p: + edits = p | beam.Create(self.EDITS) + result = edits | 
top_wikipedia_sessions.ComputeTopSessions(1.0) - assert_that(result, equal_to(self.EXPECTED)) - p.run() + assert_that(result, equal_to(self.EXPECTED)) if __name__ == '__main__': diff --git a/sdks/python/apache_beam/examples/cookbook/bigquery_schema.py b/sdks/python/apache_beam/examples/cookbook/bigquery_schema.py index 400189e64424..3a8af67d1391 100644 --- a/sdks/python/apache_beam/examples/cookbook/bigquery_schema.py +++ b/sdks/python/apache_beam/examples/cookbook/bigquery_schema.py @@ -42,86 +42,85 @@ def run(argv=None): 'or DATASET.TABLE.')) known_args, pipeline_args = parser.parse_known_args(argv) - p = beam.Pipeline(argv=pipeline_args) - - from apache_beam.io.gcp.internal.clients import bigquery # pylint: disable=wrong-import-order, wrong-import-position - - table_schema = bigquery.TableSchema() - - # Fields that use standard types. - kind_schema = bigquery.TableFieldSchema() - kind_schema.name = 'kind' - kind_schema.type = 'string' - kind_schema.mode = 'nullable' - table_schema.fields.append(kind_schema) - - full_name_schema = bigquery.TableFieldSchema() - full_name_schema.name = 'fullName' - full_name_schema.type = 'string' - full_name_schema.mode = 'required' - table_schema.fields.append(full_name_schema) - - age_schema = bigquery.TableFieldSchema() - age_schema.name = 'age' - age_schema.type = 'integer' - age_schema.mode = 'nullable' - table_schema.fields.append(age_schema) - - gender_schema = bigquery.TableFieldSchema() - gender_schema.name = 'gender' - gender_schema.type = 'string' - gender_schema.mode = 'nullable' - table_schema.fields.append(gender_schema) - - # A nested field - phone_number_schema = bigquery.TableFieldSchema() - phone_number_schema.name = 'phoneNumber' - phone_number_schema.type = 'record' - phone_number_schema.mode = 'nullable' - - area_code = bigquery.TableFieldSchema() - area_code.name = 'areaCode' - area_code.type = 'integer' - area_code.mode = 'nullable' - phone_number_schema.fields.append(area_code) - - number = 
bigquery.TableFieldSchema() - number.name = 'number' - number.type = 'integer' - number.mode = 'nullable' - phone_number_schema.fields.append(number) - table_schema.fields.append(phone_number_schema) - - # A repeated field. - children_schema = bigquery.TableFieldSchema() - children_schema.name = 'children' - children_schema.type = 'string' - children_schema.mode = 'repeated' - table_schema.fields.append(children_schema) - - def create_random_record(record_id): - return {'kind': 'kind' + record_id, 'fullName': 'fullName'+record_id, - 'age': int(record_id) * 10, 'gender': 'male', - 'phoneNumber': { - 'areaCode': int(record_id) * 100, - 'number': int(record_id) * 100000}, - 'children': ['child' + record_id + '1', - 'child' + record_id + '2', - 'child' + record_id + '3'] - } - - # pylint: disable=expression-not-assigned - record_ids = p | 'CreateIDs' >> beam.Create(['1', '2', '3', '4', '5']) - records = record_ids | 'CreateRecords' >> beam.Map(create_random_record) - records | 'write' >> beam.io.Write( - beam.io.BigQuerySink( - known_args.output, - schema=table_schema, - create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED, - write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE)) - - # Run the pipeline (all operations are deferred until run() is called). - p.run() + with beam.Pipeline(argv=pipeline_args) as p: + + from apache_beam.io.gcp.internal.clients import bigquery # pylint: disable=wrong-import-order, wrong-import-position + + table_schema = bigquery.TableSchema() + + # Fields that use standard types. 
+ kind_schema = bigquery.TableFieldSchema() + kind_schema.name = 'kind' + kind_schema.type = 'string' + kind_schema.mode = 'nullable' + table_schema.fields.append(kind_schema) + + full_name_schema = bigquery.TableFieldSchema() + full_name_schema.name = 'fullName' + full_name_schema.type = 'string' + full_name_schema.mode = 'required' + table_schema.fields.append(full_name_schema) + + age_schema = bigquery.TableFieldSchema() + age_schema.name = 'age' + age_schema.type = 'integer' + age_schema.mode = 'nullable' + table_schema.fields.append(age_schema) + + gender_schema = bigquery.TableFieldSchema() + gender_schema.name = 'gender' + gender_schema.type = 'string' + gender_schema.mode = 'nullable' + table_schema.fields.append(gender_schema) + + # A nested field + phone_number_schema = bigquery.TableFieldSchema() + phone_number_schema.name = 'phoneNumber' + phone_number_schema.type = 'record' + phone_number_schema.mode = 'nullable' + + area_code = bigquery.TableFieldSchema() + area_code.name = 'areaCode' + area_code.type = 'integer' + area_code.mode = 'nullable' + phone_number_schema.fields.append(area_code) + + number = bigquery.TableFieldSchema() + number.name = 'number' + number.type = 'integer' + number.mode = 'nullable' + phone_number_schema.fields.append(number) + table_schema.fields.append(phone_number_schema) + + # A repeated field. 
+ children_schema = bigquery.TableFieldSchema() + children_schema.name = 'children' + children_schema.type = 'string' + children_schema.mode = 'repeated' + table_schema.fields.append(children_schema) + + def create_random_record(record_id): + return {'kind': 'kind' + record_id, 'fullName': 'fullName'+record_id, + 'age': int(record_id) * 10, 'gender': 'male', + 'phoneNumber': { + 'areaCode': int(record_id) * 100, + 'number': int(record_id) * 100000}, + 'children': ['child' + record_id + '1', + 'child' + record_id + '2', + 'child' + record_id + '3'] + } + + # pylint: disable=expression-not-assigned + record_ids = p | 'CreateIDs' >> beam.Create(['1', '2', '3', '4', '5']) + records = record_ids | 'CreateRecords' >> beam.Map(create_random_record) + records | 'write' >> beam.io.Write( + beam.io.BigQuerySink( + known_args.output, + schema=table_schema, + create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED, + write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE)) + + # Run the pipeline (all operations are deferred until run() is called). if __name__ == '__main__': diff --git a/sdks/python/apache_beam/examples/cookbook/bigquery_side_input.py b/sdks/python/apache_beam/examples/cookbook/bigquery_side_input.py index 6b2881828a0d..9911a6716bd8 100644 --- a/sdks/python/apache_beam/examples/cookbook/bigquery_side_input.py +++ b/sdks/python/apache_beam/examples/cookbook/bigquery_side_input.py @@ -88,32 +88,31 @@ def run(argv=None): # workflow rely on global context (e.g., a module imported at module level). 
pipeline_options = PipelineOptions(pipeline_args) pipeline_options.view_as(SetupOptions).save_main_session = True - p = beam.Pipeline(options=pipeline_options) - - group_ids = [] - for i in xrange(0, int(known_args.num_groups)): - group_ids.append('id' + str(i)) - - query_corpus = 'select UNIQUE(corpus) from publicdata:samples.shakespeare' - query_word = 'select UNIQUE(word) from publicdata:samples.shakespeare' - ignore_corpus = known_args.ignore_corpus - ignore_word = known_args.ignore_word - - pcoll_corpus = p | 'read corpus' >> beam.io.Read( - beam.io.BigQuerySource(query=query_corpus)) - pcoll_word = p | 'read_words' >> beam.io.Read( - beam.io.BigQuerySource(query=query_word)) - pcoll_ignore_corpus = p | 'create_ignore_corpus' >> beam.Create( - [ignore_corpus]) - pcoll_ignore_word = p | 'create_ignore_word' >> beam.Create([ignore_word]) - pcoll_group_ids = p | 'create groups' >> beam.Create(group_ids) - - pcoll_groups = create_groups(pcoll_group_ids, pcoll_corpus, pcoll_word, - pcoll_ignore_corpus, pcoll_ignore_word) - - # pylint:disable=expression-not-assigned - pcoll_groups | WriteToText(known_args.output) - p.run() + with beam.Pipeline(options=pipeline_options) as p: + + group_ids = [] + for i in xrange(0, int(known_args.num_groups)): + group_ids.append('id' + str(i)) + + query_corpus = 'select UNIQUE(corpus) from publicdata:samples.shakespeare' + query_word = 'select UNIQUE(word) from publicdata:samples.shakespeare' + ignore_corpus = known_args.ignore_corpus + ignore_word = known_args.ignore_word + + pcoll_corpus = p | 'read corpus' >> beam.io.Read( + beam.io.BigQuerySource(query=query_corpus)) + pcoll_word = p | 'read_words' >> beam.io.Read( + beam.io.BigQuerySource(query=query_word)) + pcoll_ignore_corpus = p | 'create_ignore_corpus' >> beam.Create( + [ignore_corpus]) + pcoll_ignore_word = p | 'create_ignore_word' >> beam.Create([ignore_word]) + pcoll_group_ids = p | 'create groups' >> beam.Create(group_ids) + + pcoll_groups = 
create_groups(pcoll_group_ids, pcoll_corpus, pcoll_word, + pcoll_ignore_corpus, pcoll_ignore_word) + + # pylint:disable=expression-not-assigned + pcoll_groups | WriteToText(known_args.output) if __name__ == '__main__': diff --git a/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_test.py b/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_test.py index b11dc47c3363..5d6705deb0a3 100644 --- a/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_test.py +++ b/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_test.py @@ -30,25 +30,24 @@ class BigQuerySideInputTest(unittest.TestCase): def test_create_groups(self): - p = TestPipeline() - - group_ids_pcoll = p | 'CreateGroupIds' >> beam.Create(['A', 'B', 'C']) - corpus_pcoll = p | 'CreateCorpus' >> beam.Create( - [{'f': 'corpus1'}, {'f': 'corpus2'}, {'f': 'corpus3'}]) - words_pcoll = p | 'CreateWords' >> beam.Create( - [{'f': 'word1'}, {'f': 'word2'}, {'f': 'word3'}]) - ignore_corpus_pcoll = p | 'CreateIgnoreCorpus' >> beam.Create(['corpus1']) - ignore_word_pcoll = p | 'CreateIgnoreWord' >> beam.Create(['word1']) - - groups = bigquery_side_input.create_groups(group_ids_pcoll, corpus_pcoll, - words_pcoll, ignore_corpus_pcoll, - ignore_word_pcoll) - - assert_that(groups, equal_to( - [('A', 'corpus2', 'word2'), - ('B', 'corpus2', 'word2'), - ('C', 'corpus2', 'word2')])) - p.run() + with TestPipeline() as p: + + group_ids_pcoll = p | 'CreateGroupIds' >> beam.Create(['A', 'B', 'C']) + corpus_pcoll = p | 'CreateCorpus' >> beam.Create( + [{'f': 'corpus1'}, {'f': 'corpus2'}, {'f': 'corpus3'}]) + words_pcoll = p | 'CreateWords' >> beam.Create( + [{'f': 'word1'}, {'f': 'word2'}, {'f': 'word3'}]) + ignore_corpus_pcoll = p | 'CreateIgnoreCorpus' >> beam.Create(['corpus1']) + ignore_word_pcoll = p | 'CreateIgnoreWord' >> beam.Create(['word1']) + + groups = bigquery_side_input.create_groups(group_ids_pcoll, corpus_pcoll, + words_pcoll, ignore_corpus_pcoll, + ignore_word_pcoll) + + 
assert_that(groups, equal_to( + [('A', 'corpus2', 'word2'), + ('B', 'corpus2', 'word2'), + ('C', 'corpus2', 'word2')])) if __name__ == '__main__': diff --git a/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes.py b/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes.py index ed0c79a71a15..d3b216e6dd3a 100644 --- a/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes.py +++ b/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes.py @@ -75,23 +75,22 @@ def run(argv=None): 'or DATASET.TABLE.')) known_args, pipeline_args = parser.parse_known_args(argv) - p = beam.Pipeline(argv=pipeline_args) - - # Read the table rows into a PCollection. - rows = p | 'read' >> beam.io.Read(beam.io.BigQuerySource(known_args.input)) - counts = count_tornadoes(rows) - - # Write the output using a "Write" transform that has side effects. - # pylint: disable=expression-not-assigned - counts | 'write' >> beam.io.Write( - beam.io.BigQuerySink( - known_args.output, - schema='month:INTEGER, tornado_count:INTEGER', - create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED, - write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE)) - - # Run the pipeline (all operations are deferred until run() is called). - p.run().wait_until_finish() + with beam.Pipeline(argv=pipeline_args) as p: + + # Read the table rows into a PCollection. + rows = p | 'read' >> beam.io.Read(beam.io.BigQuerySource(known_args.input)) + counts = count_tornadoes(rows) + + # Write the output using a "Write" transform that has side effects. + # pylint: disable=expression-not-assigned + counts | 'write' >> beam.io.Write( + beam.io.BigQuerySink( + known_args.output, + schema='month:INTEGER, tornado_count:INTEGER', + create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED, + write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE)) + + # Run the pipeline (all operations are deferred until run() is called). 
if __name__ == '__main__': diff --git a/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes_test.py b/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes_test.py index c926df8ec15c..45dcabaf853c 100644 --- a/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes_test.py +++ b/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes_test.py @@ -30,16 +30,15 @@ class BigQueryTornadoesTest(unittest.TestCase): def test_basics(self): - p = TestPipeline() - rows = (p | 'create' >> beam.Create([ - {'month': 1, 'day': 1, 'tornado': False}, - {'month': 1, 'day': 2, 'tornado': True}, - {'month': 1, 'day': 3, 'tornado': True}, - {'month': 2, 'day': 1, 'tornado': True}])) - results = bigquery_tornadoes.count_tornadoes(rows) - assert_that(results, equal_to([{'month': 1, 'tornado_count': 2}, - {'month': 2, 'tornado_count': 1}])) - p.run().wait_until_finish() + with TestPipeline() as p: + rows = (p | 'create' >> beam.Create([ + {'month': 1, 'day': 1, 'tornado': False}, + {'month': 1, 'day': 2, 'tornado': True}, + {'month': 1, 'day': 3, 'tornado': True}, + {'month': 2, 'day': 1, 'tornado': True}])) + results = bigquery_tornadoes.count_tornadoes(rows) + assert_that(results, equal_to([{'month': 1, 'tornado_count': 2}, + {'month': 2, 'tornado_count': 1}])) if __name__ == '__main__': diff --git a/sdks/python/apache_beam/examples/cookbook/coders_test.py b/sdks/python/apache_beam/examples/cookbook/coders_test.py index f71dad8ed80f..898461e5e62f 100644 --- a/sdks/python/apache_beam/examples/cookbook/coders_test.py +++ b/sdks/python/apache_beam/examples/cookbook/coders_test.py @@ -35,13 +35,12 @@ class CodersTest(unittest.TestCase): {'host': ['Brasil', 1], 'guest': ['Italy', 0]}] def test_compute_points(self): - p = TestPipeline() - records = p | 'create' >> beam.Create(self.SAMPLE_RECORDS) - result = (records - | 'points' >> beam.FlatMap(coders.compute_points) - | beam.CombinePerKey(sum)) - assert_that(result, equal_to([('Italy', 0), ('Brasil', 6), ('Germany', 
3)])) - p.run() + with TestPipeline() as p: + records = p | 'create' >> beam.Create(self.SAMPLE_RECORDS) + result = (records + | 'points' >> beam.FlatMap(coders.compute_points) + | beam.CombinePerKey(sum)) + assert_that(result, equal_to([('Italy', 0), ('Brasil', 6), ('Germany', 3)])) if __name__ == '__main__': diff --git a/sdks/python/apache_beam/examples/cookbook/custom_ptransform.py b/sdks/python/apache_beam/examples/cookbook/custom_ptransform.py index 609f2cd87181..aee69d23ff65 100644 --- a/sdks/python/apache_beam/examples/cookbook/custom_ptransform.py +++ b/sdks/python/apache_beam/examples/cookbook/custom_ptransform.py @@ -47,11 +47,10 @@ def expand(self, pcoll): def run_count1(known_args, options): """Runs the first example pipeline.""" logging.info('Running first pipeline') - p = beam.Pipeline(options=options) - (p | beam.io.ReadFromText(known_args.input) - | Count1() - | beam.io.WriteToText(known_args.output)) - p.run().wait_until_finish() + with beam.Pipeline(options=options) as p: + (p | beam.io.ReadFromText(known_args.input) + | Count1() + | beam.io.WriteToText(known_args.output)) @beam.ptransform_fn @@ -66,11 +65,10 @@ def Count2(pcoll): # pylint: disable=invalid-name def run_count2(known_args, options): """Runs the second example pipeline.""" logging.info('Running second pipeline') - p = beam.Pipeline(options=options) - (p | ReadFromText(known_args.input) - | Count2() # pylint: disable=no-value-for-parameter - | WriteToText(known_args.output)) - p.run().wait_until_finish() + with beam.Pipeline(options=options) as p: + (p | ReadFromText(known_args.input) + | Count2() # pylint: disable=no-value-for-parameter + | WriteToText(known_args.output)) @beam.ptransform_fn @@ -93,11 +91,10 @@ def Count3(pcoll, factor=1): # pylint: disable=invalid-name def run_count3(known_args, options): """Runs the third example pipeline.""" logging.info('Running third pipeline') - p = beam.Pipeline(options=options) - (p | ReadFromText(known_args.input) - | Count3(2) # pylint: 
disable=no-value-for-parameter - | WriteToText(known_args.output)) - p.run() + with beam.Pipeline(options=options) as p: + (p | ReadFromText(known_args.input) + | Count3(2) # pylint: disable=no-value-for-parameter + | WriteToText(known_args.output)) def get_args(argv): diff --git a/sdks/python/apache_beam/examples/cookbook/custom_ptransform_test.py b/sdks/python/apache_beam/examples/cookbook/custom_ptransform_test.py index c7c6dbabd5e0..7aaccb4ac1c0 100644 --- a/sdks/python/apache_beam/examples/cookbook/custom_ptransform_test.py +++ b/sdks/python/apache_beam/examples/cookbook/custom_ptransform_test.py @@ -40,12 +40,11 @@ def test_count3(self): self.run_pipeline(custom_ptransform.Count3(factor), factor=factor) def run_pipeline(self, count_implementation, factor=1): - p = TestPipeline() - words = p | beam.Create(['CAT', 'DOG', 'CAT', 'CAT', 'DOG']) - result = words | count_implementation - assert_that( - result, equal_to([('CAT', (3 * factor)), ('DOG', (2 * factor))])) - p.run() + with TestPipeline() as p: + words = p | beam.Create(['CAT', 'DOG', 'CAT', 'CAT', 'DOG']) + result = words | count_implementation + assert_that( + result, equal_to([('CAT', (3 * factor)), ('DOG', (2 * factor))])) if __name__ == '__main__': diff --git a/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py b/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py index 411feb8058a7..7281662fd04b 100644 --- a/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py +++ b/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py @@ -135,18 +135,16 @@ def make_entity(self, content): def write_to_datastore(project, user_options, pipeline_options): """Creates a pipeline that writes entities to Cloud Datastore.""" - p = beam.Pipeline(options=pipeline_options) + with beam.Pipeline(options=pipeline_options) as p: - # pylint: disable=expression-not-assigned - (p - | 'read' >> ReadFromText(user_options.input) - | 'create entity' >> beam.Map( - 
EntityWrapper(user_options.namespace, user_options.kind, - user_options.ancestor).make_entity) - | 'write to datastore' >> WriteToDatastore(project)) + # pylint: disable=expression-not-assigned + (p + | 'read' >> ReadFromText(user_options.input) + | 'create entity' >> beam.Map( + EntityWrapper(user_options.namespace, user_options.kind, + user_options.ancestor).make_entity) + | 'write to datastore' >> WriteToDatastore(project)) - # Actually run the pipeline (all operations above are deferred). - p.run().wait_until_finish() def make_ancestor_query(kind, namespace, ancestor): @@ -196,7 +194,6 @@ def read_from_datastore(project, user_options, pipeline_options): output | 'write' >> beam.io.WriteToText(file_path_prefix=user_options.output, num_shards=user_options.num_shards) - # Actually run the pipeline (all operations above are deferred). result = p.run() # Wait until completion, main thread would access post-completion job results. result.wait_until_finish() diff --git a/sdks/python/apache_beam/examples/cookbook/filters.py b/sdks/python/apache_beam/examples/cookbook/filters.py index 374001cd83fe..742d995ae008 100644 --- a/sdks/python/apache_beam/examples/cookbook/filters.py +++ b/sdks/python/apache_beam/examples/cookbook/filters.py @@ -86,20 +86,18 @@ def run(argv=None): help='Numeric value of month to filter on.') known_args, pipeline_args = parser.parse_known_args(argv) - p = beam.Pipeline(argv=pipeline_args) + with beam.Pipeline(argv=pipeline_args) as p: - input_data = p | beam.io.Read(beam.io.BigQuerySource(known_args.input)) + input_data = p | beam.io.Read(beam.io.BigQuerySource(known_args.input)) - # pylint: disable=expression-not-assigned - (filter_cold_days(input_data, known_args.month_filter) - | 'SaveToBQ' >> beam.io.Write(beam.io.BigQuerySink( - known_args.output, - schema='year:INTEGER,month:INTEGER,day:INTEGER,mean_temp:FLOAT', - create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED, - 
write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE))) + # pylint: disable=expression-not-assigned + (filter_cold_days(input_data, known_args.month_filter) + | 'SaveToBQ' >> beam.io.Write(beam.io.BigQuerySink( + known_args.output, + schema='year:INTEGER,month:INTEGER,day:INTEGER,mean_temp:FLOAT', + create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED, + write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE))) - # Actually run the pipeline (all operations above are deferred). - p.run() if __name__ == '__main__': diff --git a/sdks/python/apache_beam/examples/snippets/snippets.py b/sdks/python/apache_beam/examples/snippets/snippets.py index 72595723de45..9f5960d9bc75 100644 --- a/sdks/python/apache_beam/examples/snippets/snippets.py +++ b/sdks/python/apache_beam/examples/snippets/snippets.py @@ -297,12 +297,11 @@ def pipeline_options_command_line(argv): known_args, pipeline_args = parser.parse_known_args(argv) # Create the Pipeline with remaining arguments. - p = beam.Pipeline(argv=pipeline_args) - lines = p | 'ReadFromText' >> beam.io.ReadFromText(known_args.input) - lines | 'WriteToText' >> beam.io.WriteToText(known_args.output) - # [END pipeline_options_command_line] + with beam.Pipeline(argv=pipeline_args) as p: + lines = p | 'ReadFromText' >> beam.io.ReadFromText(known_args.input) + lines | 'WriteToText' >> beam.io.WriteToText(known_args.output) + # [END pipeline_options_command_line] - p.run().wait_until_finish() def pipeline_logging(lines, output): @@ -329,13 +328,12 @@ def process(self, element): # Remaining WordCount example code ... # [END pipeline_logging] - p = TestPipeline() # Use TestPipeline for testing. - (p - | beam.Create(lines) - | beam.ParDo(ExtractWordsFn()) - | beam.io.WriteToText(output)) + with TestPipeline() as p: # Use TestPipeline for testing. 
+ (p + | beam.Create(lines) + | beam.ParDo(ExtractWordsFn()) + | beam.io.WriteToText(output)) - p.run() def pipeline_monitoring(renames): @@ -385,20 +383,19 @@ def expand(self, pcoll): pipeline_options = PipelineOptions() options = pipeline_options.view_as(WordCountOptions) - p = TestPipeline() # Use TestPipeline for testing. + with TestPipeline() as p: # Use TestPipeline for testing. - # [START pipeline_monitoring_execution] - (p - # Read the lines of the input text. - | 'ReadLines' >> beam.io.ReadFromText(options.input) - # Count the words. - | CountWords() - # Write the formatted word counts to output. - | 'WriteCounts' >> beam.io.WriteToText(options.output)) - # [END pipeline_monitoring_execution] + # [START pipeline_monitoring_execution] + (p + # Read the lines of the input text. + | 'ReadLines' >> beam.io.ReadFromText(options.input) + # Count the words. + | CountWords() + # Write the formatted word counts to output. + | 'WriteCounts' >> beam.io.WriteToText(options.output)) + # [END pipeline_monitoring_execution] - p.visit(SnippetUtils.RenameFiles(renames)) - p.run() + p.visit(SnippetUtils.RenameFiles(renames)) def examples_wordcount_minimal(renames): @@ -478,40 +475,39 @@ def _add_argparse_args(cls, parser): default='gs://my-bucket/input') options = PipelineOptions(argv) - p = beam.Pipeline(options=options) - # [END examples_wordcount_wordcount_options] + with beam.Pipeline(options=options) as p: + # [END examples_wordcount_wordcount_options] - lines = p | beam.io.ReadFromText( - 'gs://dataflow-samples/shakespeare/kinglear.txt') + lines = p | beam.io.ReadFromText( + 'gs://dataflow-samples/shakespeare/kinglear.txt') - # [START examples_wordcount_wordcount_composite] - class CountWords(beam.PTransform): + # [START examples_wordcount_wordcount_composite] + class CountWords(beam.PTransform): - def expand(self, pcoll): - return (pcoll - # Convert lines of text into individual words. 
- | 'ExtractWords' >> beam.FlatMap( - lambda x: re.findall(r'[A-Za-z\']+', x)) + def expand(self, pcoll): + return (pcoll + # Convert lines of text into individual words. + | 'ExtractWords' >> beam.FlatMap( + lambda x: re.findall(r'[A-Za-z\']+', x)) - # Count the number of times each word occurs. - | beam.combiners.Count.PerElement()) + # Count the number of times each word occurs. + | beam.combiners.Count.PerElement()) - counts = lines | CountWords() - # [END examples_wordcount_wordcount_composite] + counts = lines | CountWords() + # [END examples_wordcount_wordcount_composite] - # [START examples_wordcount_wordcount_dofn] - class FormatAsTextFn(beam.DoFn): + # [START examples_wordcount_wordcount_dofn] + class FormatAsTextFn(beam.DoFn): - def process(self, element): - word, count = element - yield '%s: %s' % (word, count) + def process(self, element): + word, count = element + yield '%s: %s' % (word, count) - formatted = counts | beam.ParDo(FormatAsTextFn()) - # [END examples_wordcount_wordcount_dofn] + formatted = counts | beam.ParDo(FormatAsTextFn()) + # [END examples_wordcount_wordcount_dofn] - formatted | beam.io.WriteToText('gs://my-bucket/counts.txt') - p.visit(SnippetUtils.RenameFiles(renames)) - p.run().wait_until_finish() + formatted | beam.io.WriteToText('gs://my-bucket/counts.txt') + p.visit(SnippetUtils.RenameFiles(renames)) def examples_wordcount_debugging(renames): @@ -558,27 +554,26 @@ def process(self, element): # [END example_wordcount_debugging_logging] # [END example_wordcount_debugging_aggregators] - p = TestPipeline() # Use TestPipeline for testing. - filtered_words = ( - p - | beam.io.ReadFromText( - 'gs://dataflow-samples/shakespeare/kinglear.txt') - | 'ExtractWords' >> beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x)) - | beam.combiners.Count.PerElement() - | 'FilterText' >> beam.ParDo(FilterTextFn('Flourish|stomach'))) + with TestPipeline() as p: # Use TestPipeline for testing. 
+ filtered_words = ( + p + | beam.io.ReadFromText( + 'gs://dataflow-samples/shakespeare/kinglear.txt') + | 'ExtractWords' >> beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x)) + | beam.combiners.Count.PerElement() + | 'FilterText' >> beam.ParDo(FilterTextFn('Flourish|stomach'))) - # [START example_wordcount_debugging_assert] - beam.testing.util.assert_that( - filtered_words, beam.testing.util.equal_to( - [('Flourish', 3), ('stomach', 1)])) - # [END example_wordcount_debugging_assert] + # [START example_wordcount_debugging_assert] + beam.testing.util.assert_that( + filtered_words, beam.testing.util.equal_to( + [('Flourish', 3), ('stomach', 1)])) + # [END example_wordcount_debugging_assert] - output = (filtered_words - | 'format' >> beam.Map(lambda (word, c): '%s: %s' % (word, c)) - | 'Write' >> beam.io.WriteToText('gs://my-bucket/counts.txt')) + output = (filtered_words + | 'format' >> beam.Map(lambda (word, c): '%s: %s' % (word, c)) + | 'Write' >> beam.io.WriteToText('gs://my-bucket/counts.txt')) - p.visit(SnippetUtils.RenameFiles(renames)) - p.run() + p.visit(SnippetUtils.RenameFiles(renames)) import apache_beam as beam @@ -659,16 +654,15 @@ def model_custom_source(count): # Using the source in an example pipeline. 
# [START model_custom_source_use_new_source] - p = beam.Pipeline(options=PipelineOptions()) - numbers = p | 'ProduceNumbers' >> beam.io.Read(CountingSource(count)) - # [END model_custom_source_use_new_source] + with beam.Pipeline(options=PipelineOptions()) as p: + numbers = p | 'ProduceNumbers' >> beam.io.Read(CountingSource(count)) + # [END model_custom_source_use_new_source] - lines = numbers | beam.core.Map(lambda number: 'line %d' % number) - assert_that( - lines, equal_to( - ['line ' + str(number) for number in range(0, count)])) + lines = numbers | beam.core.Map(lambda number: 'line %d' % number) + assert_that( + lines, equal_to( + ['line ' + str(number) for number in range(0, count)])) - p.run().wait_until_finish() # We recommend users to start Source classes with an underscore to discourage # using the Source class directly when a PTransform for the source is @@ -796,14 +790,13 @@ def close(self): # Using the new sink in an example pipeline. # [START model_custom_sink_use_new_sink] - p = beam.Pipeline(options=PipelineOptions()) - kvs = p | 'CreateKVs' >> beam.Create(KVs) + with beam.Pipeline(options=PipelineOptions()) as p: + kvs = p | 'CreateKVs' >> beam.Create(KVs) - kvs | 'WriteToSimpleKV' >> beam.io.Write( - SimpleKVSink('http://url_to_simple_kv/', final_table_name)) - # [END model_custom_sink_use_new_sink] + kvs | 'WriteToSimpleKV' >> beam.io.Write( + SimpleKVSink('http://url_to_simple_kv/', final_table_name)) + # [END model_custom_sink_use_new_sink] - p.run().wait_until_finish() # We recommend users to start Sink class names with an underscore to # discourage using the Sink class directly when a PTransform for the sink is @@ -828,13 +821,12 @@ def expand(self, pcoll): final_table_name = final_table_name_with_ptransform # [START model_custom_sink_use_ptransform] - p = beam.Pipeline(options=PipelineOptions()) - kvs = p | 'CreateKVs' >> beam.core.Create(KVs) - kvs | 'WriteToSimpleKV' >> WriteToKVSink( - 'http://url_to_simple_kv/', final_table_name) - # 
[END model_custom_sink_use_ptransform] + with beam.Pipeline(options=PipelineOptions()) as p: + kvs = p | 'CreateKVs' >> beam.core.Create(KVs) + kvs | 'WriteToSimpleKV' >> WriteToKVSink( + 'http://url_to_simple_kv/', final_table_name) + # [END model_custom_sink_use_ptransform] - p.run().wait_until_finish() def model_textio(renames): @@ -847,37 +839,35 @@ def filter_words(x): from apache_beam.options.pipeline_options import PipelineOptions # [START model_textio_read] - p = beam.Pipeline(options=PipelineOptions()) - # [START model_pipelineio_read] - lines = p | 'ReadFromText' >> beam.io.ReadFromText('path/to/input-*.csv') - # [END model_pipelineio_read] - # [END model_textio_read] - - # [START model_textio_write] - filtered_words = lines | 'FilterWords' >> beam.FlatMap(filter_words) - # [START model_pipelineio_write] - filtered_words | 'WriteToText' >> beam.io.WriteToText( - '/path/to/numbers', file_name_suffix='.csv') - # [END model_pipelineio_write] - # [END model_textio_write] + with beam.Pipeline(options=PipelineOptions()) as p: + # [START model_pipelineio_read] + lines = p | 'ReadFromText' >> beam.io.ReadFromText('path/to/input-*.csv') + # [END model_pipelineio_read] + # [END model_textio_read] - p.visit(SnippetUtils.RenameFiles(renames)) - p.run().wait_until_finish() + # [START model_textio_write] + filtered_words = lines | 'FilterWords' >> beam.FlatMap(filter_words) + # [START model_pipelineio_write] + filtered_words | 'WriteToText' >> beam.io.WriteToText( + '/path/to/numbers', file_name_suffix='.csv') + # [END model_pipelineio_write] + # [END model_textio_write] + + p.visit(SnippetUtils.RenameFiles(renames)) def model_textio_compressed(renames, expected): """Using a Read Transform to read compressed text files.""" - p = TestPipeline() + with TestPipeline() as p: - # [START model_textio_write_compressed] - lines = p | 'ReadFromText' >> beam.io.ReadFromText( - '/path/to/input-*.csv.gz', - compression_type=beam.io.filesystem.CompressionTypes.GZIP) - # [END 
model_textio_write_compressed] + # [START model_textio_write_compressed] + lines = p | 'ReadFromText' >> beam.io.ReadFromText( + '/path/to/input-*.csv.gz', + compression_type=beam.io.filesystem.CompressionTypes.GZIP) + # [END model_textio_write_compressed] - assert_that(lines, equal_to(expected)) - p.visit(SnippetUtils.RenameFiles(renames)) - p.run().wait_until_finish() + assert_that(lines, equal_to(expected)) + p.visit(SnippetUtils.RenameFiles(renames)) def model_datastoreio(): @@ -987,43 +977,41 @@ def expand(self, pcoll): # [END composite_ptransform_apply_method] # [END composite_transform_example] - p = TestPipeline() # Use TestPipeline for testing. - (p - | beam.Create(contents) - | CountWords() - | beam.io.WriteToText(output_path)) - p.run() + with TestPipeline() as p: # Use TestPipeline for testing. + (p + | beam.Create(contents) + | CountWords() + | beam.io.WriteToText(output_path)) def model_multiple_pcollections_flatten(contents, output_path): """Merging a PCollection with Flatten.""" some_hash_fn = lambda s: ord(s[0]) import apache_beam as beam - p = TestPipeline() # Use TestPipeline for testing. - partition_fn = lambda element, partitions: some_hash_fn(element) % partitions - - # Partition into deciles - partitioned = p | beam.Create(contents) | beam.Partition(partition_fn, 3) - pcoll1 = partitioned[0] - pcoll2 = partitioned[1] - pcoll3 = partitioned[2] - - # Flatten them back into 1 - - # A collection of PCollection objects can be represented simply - # as a tuple (or list) of PCollections. - # (The SDK for Python has no separate type to store multiple - # PCollection objects, whether containing the same or different - # types.) - # [START model_multiple_pcollections_flatten] - merged = ( - (pcoll1, pcoll2, pcoll3) - # A list of tuples can be "piped" directly into a Flatten transform. - | beam.Flatten()) - # [END model_multiple_pcollections_flatten] - merged | beam.io.WriteToText(output_path) + with TestPipeline() as p: # Use TestPipeline for testing. 
+ partition_fn = lambda element, partitions: some_hash_fn(element) % partitions + + # Partition into deciles + partitioned = p | beam.Create(contents) | beam.Partition(partition_fn, 3) + pcoll1 = partitioned[0] + pcoll2 = partitioned[1] + pcoll3 = partitioned[2] + + # Flatten them back into 1 + + # A collection of PCollection objects can be represented simply + # as a tuple (or list) of PCollections. + # (The SDK for Python has no separate type to store multiple + # PCollection objects, whether containing the same or different + # types.) + # [START model_multiple_pcollections_flatten] + merged = ( + (pcoll1, pcoll2, pcoll3) + # A list of tuples can be "piped" directly into a Flatten transform. + | beam.Flatten()) + # [END model_multiple_pcollections_flatten] + merged | beam.io.WriteToText(output_path) - p.run() def model_multiple_pcollections_partition(contents, output_path): @@ -1034,25 +1022,24 @@ def get_percentile(i): """Assume i in [0,100).""" return i import apache_beam as beam - p = TestPipeline() # Use TestPipeline for testing. + with TestPipeline() as p: # Use TestPipeline for testing. 
- students = p | beam.Create(contents) + students = p | beam.Create(contents) - # [START model_multiple_pcollections_partition] - def partition_fn(student, num_partitions): - return int(get_percentile(student) * num_partitions / 100) + # [START model_multiple_pcollections_partition] + def partition_fn(student, num_partitions): + return int(get_percentile(student) * num_partitions / 100) - by_decile = students | beam.Partition(partition_fn, 10) - # [END model_multiple_pcollections_partition] - # [START model_multiple_pcollections_partition_40th] - fortieth_percentile = by_decile[4] - # [END model_multiple_pcollections_partition_40th] + by_decile = students | beam.Partition(partition_fn, 10) + # [END model_multiple_pcollections_partition] + # [START model_multiple_pcollections_partition_40th] + fortieth_percentile = by_decile[4] + # [END model_multiple_pcollections_partition_40th] - ([by_decile[d] for d in xrange(10) if d != 4] + [fortieth_percentile] - | beam.Flatten() - | beam.io.WriteToText(output_path)) + ([by_decile[d] for d in xrange(10) if d != 4] + [fortieth_percentile] + | beam.Flatten() + | beam.io.WriteToText(output_path)) - p.run() def model_group_by_key(contents, output_path): @@ -1060,58 +1047,56 @@ def model_group_by_key(contents, output_path): import re import apache_beam as beam - p = TestPipeline() # Use TestPipeline for testing. - words_and_counts = ( - p - | beam.Create(contents) - | beam.FlatMap(lambda x: re.findall(r'\w+', x)) - | 'one word' >> beam.Map(lambda w: (w, 1))) - # GroupByKey accepts a PCollection of (w, 1) and - # outputs a PCollection of (w, (1, 1, ...)). - # (A key/value pair is just a tuple in Python.) - # This is a somewhat forced example, since one could - # simply use beam.combiners.Count.PerElement here. 
- # [START model_group_by_key_transform] - grouped_words = words_and_counts | beam.GroupByKey() - # [END model_group_by_key_transform] - (grouped_words - | 'count words' >> beam.Map(lambda (word, counts): (word, len(counts))) - | beam.io.WriteToText(output_path)) - p.run() + with TestPipeline() as p: # Use TestPipeline for testing. + words_and_counts = ( + p + | beam.Create(contents) + | beam.FlatMap(lambda x: re.findall(r'\w+', x)) + | 'one word' >> beam.Map(lambda w: (w, 1))) + # GroupByKey accepts a PCollection of (w, 1) and + # outputs a PCollection of (w, (1, 1, ...)). + # (A key/value pair is just a tuple in Python.) + # This is a somewhat forced example, since one could + # simply use beam.combiners.Count.PerElement here. + # [START model_group_by_key_transform] + grouped_words = words_and_counts | beam.GroupByKey() + # [END model_group_by_key_transform] + (grouped_words + | 'count words' >> beam.Map(lambda (word, counts): (word, len(counts))) + | beam.io.WriteToText(output_path)) def model_co_group_by_key_tuple(email_list, phone_list, output_path): """Applying a CoGroupByKey Transform to a tuple.""" import apache_beam as beam - p = TestPipeline() # Use TestPipeline for testing. - # [START model_group_by_key_cogroupbykey_tuple] - # Each data set is represented by key-value pairs in separate PCollections. - # Both data sets share a common key type (in this example str). - # The email_list contains values such as: ('joe', 'joe@example.com') with - # multiple possible values for each key. - # The phone_list contains values such as: ('mary': '111-222-3333') with - # multiple possible values for each key. - emails = p | 'email' >> beam.Create(email_list) - phones = p | 'phone' >> beam.Create(phone_list) - # The result PCollection contains one key-value element for each key in the - # input PCollections. 
The key of the pair will be the key from the input and - # the value will be a dictionary with two entries: 'emails' - an iterable of - # all values for the current key in the emails PCollection and 'phones': an - # iterable of all values for the current key in the phones PCollection. - # For instance, if 'emails' contained ('joe', 'joe@example.com') and - # ('joe', 'joe@gmail.com'), then 'result' will contain the element - # ('joe', {'emails': ['joe@example.com', 'joe@gmail.com'], 'phones': ...}) - result = {'emails': emails, 'phones': phones} | beam.CoGroupByKey() - - def join_info((name, info)): - return '; '.join(['%s' % name, - '%s' % ','.join(info['emails']), - '%s' % ','.join(info['phones'])]) - - contact_lines = result | beam.Map(join_info) - # [END model_group_by_key_cogroupbykey_tuple] - contact_lines | beam.io.WriteToText(output_path) - p.run() + with TestPipeline() as p: # Use TestPipeline for testing. + # [START model_group_by_key_cogroupbykey_tuple] + # Each data set is represented by key-value pairs in separate PCollections. + # Both data sets share a common key type (in this example str). + # The email_list contains values such as: ('joe', 'joe@example.com') with + # multiple possible values for each key. + # The phone_list contains values such as: ('mary': '111-222-3333') with + # multiple possible values for each key. + emails = p | 'email' >> beam.Create(email_list) + phones = p | 'phone' >> beam.Create(phone_list) + # The result PCollection contains one key-value element for each key in the + # input PCollections. The key of the pair will be the key from the input and + # the value will be a dictionary with two entries: 'emails' - an iterable of + # all values for the current key in the emails PCollection and 'phones': an + # iterable of all values for the current key in the phones PCollection. 
+ # For instance, if 'emails' contained ('joe', 'joe@example.com') and + # ('joe', 'joe@gmail.com'), then 'result' will contain the element + # ('joe', {'emails': ['joe@example.com', 'joe@gmail.com'], 'phones': ...}) + result = {'emails': emails, 'phones': phones} | beam.CoGroupByKey() + + def join_info((name, info)): + return '; '.join(['%s' % name, + '%s' % ','.join(info['emails']), + '%s' % ','.join(info['phones'])]) + + contact_lines = result | beam.Map(join_info) + # [END model_group_by_key_cogroupbykey_tuple] + contact_lines | beam.io.WriteToText(output_path) def model_join_using_side_inputs( @@ -1121,35 +1106,34 @@ def model_join_using_side_inputs( import apache_beam as beam from apache_beam.pvalue import AsIter - p = TestPipeline() # Use TestPipeline for testing. - # [START model_join_using_side_inputs] - # This code performs a join by receiving the set of names as an input and - # passing PCollections that contain emails and phone numbers as side inputs - # instead of using CoGroupByKey. - names = p | 'names' >> beam.Create(name_list) - emails = p | 'email' >> beam.Create(email_list) - phones = p | 'phone' >> beam.Create(phone_list) - - def join_info(name, emails, phone_numbers): - filtered_emails = [] - for name_in_list, email in emails: - if name_in_list == name: - filtered_emails.append(email) - - filtered_phone_numbers = [] - for name_in_list, phone_number in phone_numbers: - if name_in_list == name: - filtered_phone_numbers.append(phone_number) - - return '; '.join(['%s' % name, - '%s' % ','.join(filtered_emails), - '%s' % ','.join(filtered_phone_numbers)]) - - contact_lines = names | 'CreateContacts' >> beam.core.Map( - join_info, AsIter(emails), AsIter(phones)) - # [END model_join_using_side_inputs] - contact_lines | beam.io.WriteToText(output_path) - p.run() + with TestPipeline() as p: # Use TestPipeline for testing. 
+ # [START model_join_using_side_inputs] + # This code performs a join by receiving the set of names as an input and + # passing PCollections that contain emails and phone numbers as side inputs + # instead of using CoGroupByKey. + names = p | 'names' >> beam.Create(name_list) + emails = p | 'email' >> beam.Create(email_list) + phones = p | 'phone' >> beam.Create(phone_list) + + def join_info(name, emails, phone_numbers): + filtered_emails = [] + for name_in_list, email in emails: + if name_in_list == name: + filtered_emails.append(email) + + filtered_phone_numbers = [] + for name_in_list, phone_number in phone_numbers: + if name_in_list == name: + filtered_phone_numbers.append(phone_number) + + return '; '.join(['%s' % name, + '%s' % ','.join(filtered_emails), + '%s' % ','.join(filtered_phone_numbers)]) + + contact_lines = names | 'CreateContacts' >> beam.core.Map( + join_info, AsIter(emails), AsIter(phones)) + # [END model_join_using_side_inputs] + contact_lines | beam.io.WriteToText(output_path) # [START model_library_transforms_keys] diff --git a/sdks/python/apache_beam/examples/snippets/snippets_test.py b/sdks/python/apache_beam/examples/snippets/snippets_test.py index f7b51a75e7eb..c5102847abd1 100644 --- a/sdks/python/apache_beam/examples/snippets/snippets_test.py +++ b/sdks/python/apache_beam/examples/snippets/snippets_test.py @@ -129,41 +129,40 @@ def test_pardo_with_label(self): self.assertEqual({1, 2, 4}, set(result)) def test_pardo_side_input(self): - p = TestPipeline() - words = p | 'start' >> beam.Create(['a', 'bb', 'ccc', 'dddd']) - - # [START model_pardo_side_input] - # Callable takes additional arguments. - def filter_using_length(word, lower_bound, upper_bound=float('inf')): - if lower_bound <= len(word) <= upper_bound: - yield word - - # Construct a deferred side input. - avg_word_len = (words - | beam.Map(len) - | beam.CombineGlobally(beam.combiners.MeanCombineFn())) - - # Call with explicit side inputs. 
- small_words = words | 'small' >> beam.FlatMap(filter_using_length, 0, 3) - - # A single deferred side input. - larger_than_average = (words | 'large' >> beam.FlatMap( - filter_using_length, - lower_bound=pvalue.AsSingleton(avg_word_len))) - - # Mix and match. - small_but_nontrivial = words | beam.FlatMap(filter_using_length, - lower_bound=2, - upper_bound=pvalue.AsSingleton( - avg_word_len)) - # [END model_pardo_side_input] - - assert_that(small_words, equal_to(['a', 'bb', 'ccc'])) - assert_that(larger_than_average, equal_to(['ccc', 'dddd']), - label='larger_than_average') - assert_that(small_but_nontrivial, equal_to(['bb']), - label='small_but_not_trivial') - p.run() + with TestPipeline() as p: + words = p | 'start' >> beam.Create(['a', 'bb', 'ccc', 'dddd']) + + # [START model_pardo_side_input] + # Callable takes additional arguments. + def filter_using_length(word, lower_bound, upper_bound=float('inf')): + if lower_bound <= len(word) <= upper_bound: + yield word + + # Construct a deferred side input. + avg_word_len = (words + | beam.Map(len) + | beam.CombineGlobally(beam.combiners.MeanCombineFn())) + + # Call with explicit side inputs. + small_words = words | 'small' >> beam.FlatMap(filter_using_length, 0, 3) + + # A single deferred side input. + larger_than_average = (words | 'large' >> beam.FlatMap( + filter_using_length, + lower_bound=pvalue.AsSingleton(avg_word_len))) + + # Mix and match. 
+ small_but_nontrivial = words | beam.FlatMap(filter_using_length, + lower_bound=2, + upper_bound=pvalue.AsSingleton( + avg_word_len)) + # [END model_pardo_side_input] + + assert_that(small_words, equal_to(['a', 'bb', 'ccc'])) + assert_that(larger_than_average, equal_to(['ccc', 'dddd']), + label='larger_than_average') + assert_that(small_but_nontrivial, equal_to(['bb']), + label='small_but_not_trivial') def test_pardo_side_input_dofn(self): words = ['a', 'bb', 'ccc', 'dddd'] @@ -307,10 +306,9 @@ def expand(self, pcoll): def test_runtime_checks_off(self): # pylint: disable=expression-not-assigned - p = TestPipeline() - # [START type_hints_runtime_off] - p | beam.Create(['a']) | beam.Map(lambda x: 3).with_output_types(str) - p.run() + with TestPipeline() as p: + # [START type_hints_runtime_off] + p | beam.Create(['a']) | beam.Map(lambda x: 3).with_output_types(str) # [END type_hints_runtime_off] def test_runtime_checks_on(self): @@ -323,47 +321,46 @@ def test_runtime_checks_on(self): # [END type_hints_runtime_on] def test_deterministic_key(self): - p = TestPipeline() - lines = (p | beam.Create( - ['banana,fruit,3', 'kiwi,fruit,2', 'kiwi,fruit,2', 'zucchini,veg,3'])) + with TestPipeline() as p: + lines = (p | beam.Create( + ['banana,fruit,3', 'kiwi,fruit,2', 'kiwi,fruit,2', 'zucchini,veg,3'])) - # For pickling - global Player # pylint: disable=global-variable-not-assigned + # For pickling + global Player # pylint: disable=global-variable-not-assigned - # [START type_hints_deterministic_key] - class Player(object): - def __init__(self, team, name): - self.team = team - self.name = name + # [START type_hints_deterministic_key] + class Player(object): + def __init__(self, team, name): + self.team = team + self.name = name - class PlayerCoder(beam.coders.Coder): - def encode(self, player): - return '%s:%s' % (player.team, player.name) + class PlayerCoder(beam.coders.Coder): + def encode(self, player): + return '%s:%s' % (player.team, player.name) - def decode(self, s): - 
return Player(*s.split(':')) + def decode(self, s): + return Player(*s.split(':')) - def is_deterministic(self): - return True + def is_deterministic(self): + return True - beam.coders.registry.register_coder(Player, PlayerCoder) + beam.coders.registry.register_coder(Player, PlayerCoder) - def parse_player_and_score(csv): - name, team, score = csv.split(',') - return Player(team, name), int(score) + def parse_player_and_score(csv): + name, team, score = csv.split(',') + return Player(team, name), int(score) - totals = ( - lines - | beam.Map(parse_player_and_score) - | beam.CombinePerKey(sum).with_input_types( - beam.typehints.Tuple[Player, int])) - # [END type_hints_deterministic_key] + totals = ( + lines + | beam.Map(parse_player_and_score) + | beam.CombinePerKey(sum).with_input_types( + beam.typehints.Tuple[Player, int])) + # [END type_hints_deterministic_key] - assert_that( - totals | beam.Map(lambda (k, v): (k.name, v)), - equal_to([('banana', 3), ('kiwi', 4), ('zucchini', 3)])) + assert_that( + totals | beam.Map(lambda (k, v): (k.name, v)), + equal_to([('banana', 3), ('kiwi', 4), ('zucchini', 3)])) - p.run() class SnippetsTest(unittest.TestCase): @@ -802,109 +799,104 @@ def test_count(self): self.assertEqual({('cat', 3), ('dog', 2)}, set(perkey_counts)) def test_setting_fixed_windows(self): - p = TestPipeline() - unkeyed_items = p | beam.Create([22, 33, 55, 100, 115, 120]) - items = (unkeyed_items - | 'key' >> beam.Map( - lambda x: beam.window.TimestampedValue(('k', x), x))) - # [START setting_fixed_windows] - from apache_beam import window - fixed_windowed_items = ( - items | 'window' >> beam.WindowInto(window.FixedWindows(60))) - # [END setting_fixed_windows] - summed = (fixed_windowed_items - | 'group' >> beam.GroupByKey() - | 'combine' >> beam.CombineValues(sum)) - unkeyed = summed | 'unkey' >> beam.Map(lambda x: x[1]) - assert_that(unkeyed, equal_to([110, 215, 120])) - p.run() + with TestPipeline() as p: + unkeyed_items = p | beam.Create([22, 33, 55, 100, 
115, 120]) + items = (unkeyed_items + | 'key' >> beam.Map( + lambda x: beam.window.TimestampedValue(('k', x), x))) + # [START setting_fixed_windows] + from apache_beam import window + fixed_windowed_items = ( + items | 'window' >> beam.WindowInto(window.FixedWindows(60))) + # [END setting_fixed_windows] + summed = (fixed_windowed_items + | 'group' >> beam.GroupByKey() + | 'combine' >> beam.CombineValues(sum)) + unkeyed = summed | 'unkey' >> beam.Map(lambda x: x[1]) + assert_that(unkeyed, equal_to([110, 215, 120])) def test_setting_sliding_windows(self): - p = TestPipeline() - unkeyed_items = p | beam.Create([2, 16, 23]) - items = (unkeyed_items - | 'key' >> beam.Map( - lambda x: beam.window.TimestampedValue(('k', x), x))) - # [START setting_sliding_windows] - from apache_beam import window - sliding_windowed_items = ( - items | 'window' >> beam.WindowInto(window.SlidingWindows(30, 5))) - # [END setting_sliding_windows] - summed = (sliding_windowed_items - | 'group' >> beam.GroupByKey() - | 'combine' >> beam.CombineValues(sum)) - unkeyed = summed | 'unkey' >> beam.Map(lambda x: x[1]) - assert_that(unkeyed, - equal_to([2, 2, 2, 18, 23, 39, 39, 39, 41, 41])) - p.run() + with TestPipeline() as p: + unkeyed_items = p | beam.Create([2, 16, 23]) + items = (unkeyed_items + | 'key' >> beam.Map( + lambda x: beam.window.TimestampedValue(('k', x), x))) + # [START setting_sliding_windows] + from apache_beam import window + sliding_windowed_items = ( + items | 'window' >> beam.WindowInto(window.SlidingWindows(30, 5))) + # [END setting_sliding_windows] + summed = (sliding_windowed_items + | 'group' >> beam.GroupByKey() + | 'combine' >> beam.CombineValues(sum)) + unkeyed = summed | 'unkey' >> beam.Map(lambda x: x[1]) + assert_that(unkeyed, + equal_to([2, 2, 2, 18, 23, 39, 39, 39, 41, 41])) def test_setting_session_windows(self): - p = TestPipeline() - unkeyed_items = p | beam.Create([2, 11, 16, 27]) - items = (unkeyed_items - | 'key' >> beam.Map( - lambda x: 
beam.window.TimestampedValue(('k', x), x))) - # [START setting_session_windows] - from apache_beam import window - session_windowed_items = ( - items | 'window' >> beam.WindowInto(window.Sessions(10))) - # [END setting_session_windows] - summed = (session_windowed_items - | 'group' >> beam.GroupByKey() - | 'combine' >> beam.CombineValues(sum)) - unkeyed = summed | 'unkey' >> beam.Map(lambda x: x[1]) - assert_that(unkeyed, - equal_to([29, 27])) - p.run() + with TestPipeline() as p: + unkeyed_items = p | beam.Create([2, 11, 16, 27]) + items = (unkeyed_items + | 'key' >> beam.Map( + lambda x: beam.window.TimestampedValue(('k', x), x))) + # [START setting_session_windows] + from apache_beam import window + session_windowed_items = ( + items | 'window' >> beam.WindowInto(window.Sessions(10))) + # [END setting_session_windows] + summed = (session_windowed_items + | 'group' >> beam.GroupByKey() + | 'combine' >> beam.CombineValues(sum)) + unkeyed = summed | 'unkey' >> beam.Map(lambda x: x[1]) + assert_that(unkeyed, + equal_to([29, 27])) def test_setting_global_window(self): - p = TestPipeline() - unkeyed_items = p | beam.Create([2, 11, 16, 27]) - items = (unkeyed_items - | 'key' >> beam.Map( - lambda x: beam.window.TimestampedValue(('k', x), x))) - # [START setting_global_window] - from apache_beam import window - session_windowed_items = ( - items | 'window' >> beam.WindowInto(window.GlobalWindows())) - # [END setting_global_window] - summed = (session_windowed_items - | 'group' >> beam.GroupByKey() - | 'combine' >> beam.CombineValues(sum)) - unkeyed = summed | 'unkey' >> beam.Map(lambda x: x[1]) - assert_that(unkeyed, equal_to([56])) - p.run() + with TestPipeline() as p: + unkeyed_items = p | beam.Create([2, 11, 16, 27]) + items = (unkeyed_items + | 'key' >> beam.Map( + lambda x: beam.window.TimestampedValue(('k', x), x))) + # [START setting_global_window] + from apache_beam import window + session_windowed_items = ( + items | 'window' >> 
beam.WindowInto(window.GlobalWindows())) + # [END setting_global_window] + summed = (session_windowed_items + | 'group' >> beam.GroupByKey() + | 'combine' >> beam.CombineValues(sum)) + unkeyed = summed | 'unkey' >> beam.Map(lambda x: x[1]) + assert_that(unkeyed, equal_to([56])) def test_setting_timestamp(self): - p = TestPipeline() - unkeyed_items = p | beam.Create([12, 30, 60, 61, 66]) - items = (unkeyed_items | 'key' >> beam.Map(lambda x: ('k', x))) + with TestPipeline() as p: + unkeyed_items = p | beam.Create([12, 30, 60, 61, 66]) + items = (unkeyed_items | 'key' >> beam.Map(lambda x: ('k', x))) - def extract_timestamp_from_log_entry(entry): - return entry[1] + def extract_timestamp_from_log_entry(entry): + return entry[1] - # [START setting_timestamp] - class AddTimestampDoFn(beam.DoFn): + # [START setting_timestamp] + class AddTimestampDoFn(beam.DoFn): - def process(self, element): - # Extract the numeric Unix seconds-since-epoch timestamp to be - # associated with the current log entry. - unix_timestamp = extract_timestamp_from_log_entry(element) - # Wrap and emit the current entry and new timestamp in a - # TimestampedValue. - yield beam.window.TimestampedValue(element, unix_timestamp) - - timestamped_items = items | 'timestamp' >> beam.ParDo(AddTimestampDoFn()) - # [END setting_timestamp] - fixed_windowed_items = ( - timestamped_items | 'window' >> beam.WindowInto( - beam.window.FixedWindows(60))) - summed = (fixed_windowed_items - | 'group' >> beam.GroupByKey() - | 'combine' >> beam.CombineValues(sum)) - unkeyed = summed | 'unkey' >> beam.Map(lambda x: x[1]) - assert_that(unkeyed, equal_to([42, 187])) - p.run() + def process(self, element): + # Extract the numeric Unix seconds-since-epoch timestamp to be + # associated with the current log entry. + unix_timestamp = extract_timestamp_from_log_entry(element) + # Wrap and emit the current entry and new timestamp in a + # TimestampedValue. 
+ yield beam.window.TimestampedValue(element, unix_timestamp) + + timestamped_items = items | 'timestamp' >> beam.ParDo(AddTimestampDoFn()) + # [END setting_timestamp] + fixed_windowed_items = ( + timestamped_items | 'window' >> beam.WindowInto( + beam.window.FixedWindows(60))) + summed = (fixed_windowed_items + | 'group' >> beam.GroupByKey() + | 'combine' >> beam.CombineValues(sum)) + unkeyed = summed | 'unkey' >> beam.Map(lambda x: x[1]) + assert_that(unkeyed, equal_to([42, 187])) class PTransformTest(unittest.TestCase): @@ -919,10 +911,9 @@ def expand(self, pcoll): return pcoll | beam.Map(lambda x: len(x)) # [END model_composite_transform] - p = TestPipeline() - lengths = p | beam.Create(["a", "ab", "abc"]) | ComputeWordLengths() - assert_that(lengths, equal_to([1, 2, 3])) - p.run() + with TestPipeline() as p: + lengths = p | beam.Create(["a", "ab", "abc"]) | ComputeWordLengths() + assert_that(lengths, equal_to([1, 2, 3])) if __name__ == '__main__': diff --git a/sdks/python/apache_beam/examples/streaming_wordcap.py b/sdks/python/apache_beam/examples/streaming_wordcap.py index d0cc8a201b41..cf414d1dc24b 100644 --- a/sdks/python/apache_beam/examples/streaming_wordcap.py +++ b/sdks/python/apache_beam/examples/streaming_wordcap.py @@ -41,22 +41,21 @@ def run(argv=None): help='Output PubSub topic of the form "/topics//".') known_args, pipeline_args = parser.parse_known_args(argv) - p = beam.Pipeline(argv=pipeline_args) + with beam.Pipeline(argv=pipeline_args) as p: - # Read the text file[pattern] into a PCollection. - lines = p | beam.io.Read( - beam.io.PubSubSource(known_args.input_topic)) + # Read the text file[pattern] into a PCollection. + lines = p | beam.io.Read( + beam.io.PubSubSource(known_args.input_topic)) - # Capitalize the characters in each line. - transformed = (lines - | 'capitalize' >> (beam.Map(lambda x: x.upper()))) + # Capitalize the characters in each line. 
+ transformed = (lines + | 'capitalize' >> (beam.Map(lambda x: x.upper()))) - # Write to PubSub. - # pylint: disable=expression-not-assigned - transformed | beam.io.Write( - beam.io.PubSubSink(known_args.output_topic)) + # Write to PubSub. + # pylint: disable=expression-not-assigned + transformed | beam.io.Write( + beam.io.PubSubSink(known_args.output_topic)) - p.run().wait_until_finish() if __name__ == '__main__': diff --git a/sdks/python/apache_beam/examples/streaming_wordcount.py b/sdks/python/apache_beam/examples/streaming_wordcount.py index 4b6aecc98c4a..528f17b341e7 100644 --- a/sdks/python/apache_beam/examples/streaming_wordcount.py +++ b/sdks/python/apache_beam/examples/streaming_wordcount.py @@ -44,29 +44,28 @@ def run(argv=None): help='Output PubSub topic of the form "/topics//".') known_args, pipeline_args = parser.parse_known_args(argv) - p = beam.Pipeline(argv=pipeline_args) - - # Read the text file[pattern] into a PCollection. - lines = p | 'read' >> beam.io.Read( - beam.io.PubSubSource(known_args.input_topic)) - - # Capitalize the characters in each line. - transformed = (lines - | 'Split' >> ( - beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x)) - .with_output_types(unicode)) - | 'PairWithOne' >> beam.Map(lambda x: (x, 1)) - | beam.WindowInto(window.FixedWindows(15, 0)) - | 'Group' >> beam.GroupByKey() - | 'Count' >> beam.Map(lambda (word, ones): (word, sum(ones))) - | 'Format' >> beam.Map(lambda tup: '%s: %d' % tup)) - - # Write to PubSub. - # pylint: disable=expression-not-assigned - transformed | 'pubsub_write' >> beam.io.Write( - beam.io.PubSubSink(known_args.output_topic)) - - p.run().wait_until_finish() + with beam.Pipeline(argv=pipeline_args) as p: + + # Read messages from the input PubSub topic into a PCollection. + lines = p | 'read' >> beam.io.Read( + beam.io.PubSubSource(known_args.input_topic)) + + # Split each line into words and count the words per fixed window.
+ transformed = (lines + | 'Split' >> ( + beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x)) + .with_output_types(unicode)) + | 'PairWithOne' >> beam.Map(lambda x: (x, 1)) + | beam.WindowInto(window.FixedWindows(15, 0)) + | 'Group' >> beam.GroupByKey() + | 'Count' >> beam.Map(lambda (word, ones): (word, sum(ones))) + | 'Format' >> beam.Map(lambda tup: '%s: %d' % tup)) + + # Write to PubSub. + # pylint: disable=expression-not-assigned + transformed | 'pubsub_write' >> beam.io.Write( + beam.io.PubSubSink(known_args.output_topic)) + if __name__ == '__main__': diff --git a/sdks/python/apache_beam/examples/wordcount.py b/sdks/python/apache_beam/examples/wordcount.py index e7e542a74219..34dedb2b819a 100644 --- a/sdks/python/apache_beam/examples/wordcount.py +++ b/sdks/python/apache_beam/examples/wordcount.py @@ -102,7 +102,6 @@ def run(argv=None): # pylint: disable=expression-not-assigned output | 'write' >> WriteToText(known_args.output) - # Actually run the pipeline (all operations above are deferred). result = p.run() result.wait_until_finish() diff --git a/sdks/python/apache_beam/examples/wordcount_debugging.py b/sdks/python/apache_beam/examples/wordcount_debugging.py index ca9f7b6371de..c092fedb9b9f 100644 --- a/sdks/python/apache_beam/examples/wordcount_debugging.py +++ b/sdks/python/apache_beam/examples/wordcount_debugging.py @@ -118,35 +118,33 @@ def run(argv=None): # workflow rely on global context (e.g., a module imported at module level). pipeline_options = PipelineOptions(pipeline_args) pipeline_options.view_as(SetupOptions).save_main_session = True - p = beam.Pipeline(options=pipeline_options) - - # Read the text file[pattern] into a PCollection, count the occurrences of - # each word and filter by a list of words. 
- filtered_words = ( - p | 'read' >> ReadFromText(known_args.input) - | CountWords() - | 'FilterText' >> beam.ParDo(FilterTextFn('Flourish|stomach'))) - - # assert_that is a convenient PTransform that checks a PCollection has an - # expected value. Asserts are best used in unit tests with small data sets but - # is demonstrated here as a teaching tool. - # - # Note assert_that does not provide any output and that successful completion - # of the Pipeline implies that the expectations were met. Learn more at - # https://cloud.google.com/dataflow/pipelines/testing-your-pipeline on how to - # test your pipeline. - assert_that( - filtered_words, equal_to([('Flourish', 3), ('stomach', 1)])) - - # Format the counts into a PCollection of strings and write the output using a - # "Write" transform that has side effects. - # pylint: disable=unused-variable - output = (filtered_words - | 'format' >> beam.Map(lambda (word, c): '%s: %s' % (word, c)) - | 'write' >> WriteToText(known_args.output)) - - # Actually run the pipeline (all operations above are deferred). - p.run().wait_until_finish() + with beam.Pipeline(options=pipeline_options) as p: + + # Read the text file[pattern] into a PCollection, count the occurrences of + # each word and filter by a list of words. + filtered_words = ( + p | 'read' >> ReadFromText(known_args.input) + | CountWords() + | 'FilterText' >> beam.ParDo(FilterTextFn('Flourish|stomach'))) + + # assert_that is a convenient PTransform that checks a PCollection has an + # expected value. Asserts are best used in unit tests with small data sets but + # is demonstrated here as a teaching tool. + # + # Note assert_that does not provide any output and that successful completion + # of the Pipeline implies that the expectations were met. Learn more at + # https://cloud.google.com/dataflow/pipelines/testing-your-pipeline on how to + # test your pipeline. 
+ assert_that( + filtered_words, equal_to([('Flourish', 3), ('stomach', 1)])) + + # Format the counts into a PCollection of strings and write the output using a + # "Write" transform that has side effects. + # pylint: disable=unused-variable + output = (filtered_words + | 'format' >> beam.Map(lambda (word, c): '%s: %s' % (word, c)) + | 'write' >> WriteToText(known_args.output)) + if __name__ == '__main__': diff --git a/sdks/python/apache_beam/examples/wordcount_minimal.py b/sdks/python/apache_beam/examples/wordcount_minimal.py index 5109c089f993..a73bfcdb5339 100644 --- a/sdks/python/apache_beam/examples/wordcount_minimal.py +++ b/sdks/python/apache_beam/examples/wordcount_minimal.py @@ -92,28 +92,26 @@ def run(argv=None): # workflow rely on global context (e.g., a module imported at module level). pipeline_options = PipelineOptions(pipeline_args) pipeline_options.view_as(SetupOptions).save_main_session = True - p = beam.Pipeline(options=pipeline_options) + with beam.Pipeline(options=pipeline_options) as p: - # Read the text file[pattern] into a PCollection. - lines = p | 'read' >> ReadFromText(known_args.input) + # Read the text file[pattern] into a PCollection. + lines = p | 'read' >> ReadFromText(known_args.input) - # Count the occurrences of each word. - counts = (lines - | 'split' >> (beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x)) - .with_output_types(unicode)) - | 'pair_with_one' >> beam.Map(lambda x: (x, 1)) - | 'group' >> beam.GroupByKey() - | 'count' >> beam.Map(lambda (word, ones): (word, sum(ones)))) + # Count the occurrences of each word. + counts = (lines + | 'split' >> (beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x)) + .with_output_types(unicode)) + | 'pair_with_one' >> beam.Map(lambda x: (x, 1)) + | 'group' >> beam.GroupByKey() + | 'count' >> beam.Map(lambda (word, ones): (word, sum(ones)))) - # Format the counts into a PCollection of strings. 
- output = counts | 'format' >> beam.Map(lambda (word, c): '%s: %s' % (word, c)) + # Format the counts into a PCollection of strings. + output = counts | 'format' >> beam.Map(lambda (word, c): '%s: %s' % (word, c)) - # Write the output using a "Write" transform that has side effects. - # pylint: disable=expression-not-assigned - output | 'write' >> WriteToText(known_args.output) + # Write the output using a "Write" transform that has side effects. + # pylint: disable=expression-not-assigned + output | 'write' >> WriteToText(known_args.output) - # Actually run the pipeline (all operations above are deferred). - p.run().wait_until_finish() if __name__ == '__main__': diff --git a/sdks/python/apache_beam/io/filebasedsink_test.py b/sdks/python/apache_beam/io/filebasedsink_test.py index 1f6aeee56677..74005f4900b9 100644 --- a/sdks/python/apache_beam/io/filebasedsink_test.py +++ b/sdks/python/apache_beam/io/filebasedsink_test.py @@ -146,9 +146,8 @@ def test_empty_write(self): sink = MyFileBasedSink( temp_path, file_name_suffix='.output', coder=coders.ToStringCoder() ) - p = TestPipeline() - p | beam.Create([]) | beam.io.Write(sink) # pylint: disable=expression-not-assigned - p.run() + with TestPipeline() as p: + p | beam.Create([]) | beam.io.Write(sink) # pylint: disable=expression-not-assigned self.assertEqual( open(temp_path + '-00000-of-00001.output').read(), '[start][end]') @@ -160,9 +159,8 @@ def test_static_value_provider_empty_write(self): file_name_suffix=StaticValueProvider(value_type=str, value='.output'), coder=coders.ToStringCoder() ) - p = TestPipeline() - p | beam.Create([]) | beam.io.Write(sink) # pylint: disable=expression-not-assigned - p.run() + with TestPipeline() as p: + p | beam.Create([]) | beam.io.Write(sink) # pylint: disable=expression-not-assigned self.assertEqual( open(temp_path.get() + '-00000-of-00001.output').read(), '[start][end]') @@ -174,10 +172,9 @@ def test_fixed_shard_write(self): num_shards=3, shard_name_template='_NN_SSS_', 
coder=coders.ToStringCoder()) - p = TestPipeline() - p | beam.Create(['a', 'b']) | beam.io.Write(sink) # pylint: disable=expression-not-assigned + with TestPipeline() as p: + p | beam.Create(['a', 'b']) | beam.io.Write(sink) # pylint: disable=expression-not-assigned - p.run() concat = ''.join( open(temp_path + '_03_%03d_.output' % shard_num).read() diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py index 5048534b3852..9093abfccfc3 100644 --- a/sdks/python/apache_beam/pipeline.py +++ b/sdks/python/apache_beam/pipeline.py @@ -28,19 +28,18 @@ Typical usage: # Create a pipeline object using a local runner for execution. - p = beam.Pipeline('DirectRunner') + with beam.Pipeline('DirectRunner') as p: - # Add to the pipeline a "Create" transform. When executed this - # transform will produce a PCollection object with the specified values. - pcoll = p | 'Create' >> beam.Create([1, 2, 3]) + # Add to the pipeline a "Create" transform. When executed this + # transform will produce a PCollection object with the specified values. + pcoll = p | 'Create' >> beam.Create([1, 2, 3]) - # Another transform could be applied to pcoll, e.g., writing to a text file. - # For other transforms, refer to transforms/ directory. - pcoll | 'Write' >> beam.io.WriteToText('./output') + # Another transform could be applied to pcoll, e.g., writing to a text file. + # For other transforms, refer to transforms/ directory. + pcoll | 'Write' >> beam.io.WriteToText('./output') - # run() will execute the DAG stored in the pipeline. The execution of the - # nodes visited is done using the specified local runner. - p.run() + # Exiting the with block runs the pipeline, executing its stored DAG. The + # execution of the nodes visited is done using the specified local runner.
""" diff --git a/sdks/python/apache_beam/transforms/combiners_test.py b/sdks/python/apache_beam/transforms/combiners_test.py index 946a60a3675c..c6db26b8123a 100644 --- a/sdks/python/apache_beam/transforms/combiners_test.py +++ b/sdks/python/apache_beam/transforms/combiners_test.py @@ -247,26 +247,24 @@ def match(actual): pipeline.run() def test_tuple_combine_fn(self): - p = TestPipeline() - result = ( - p - | Create([('a', 100, 0.0), ('b', 10, -1), ('c', 1, 100)]) - | beam.CombineGlobally(combine.TupleCombineFn(max, - combine.MeanCombineFn(), - sum)).without_defaults()) - assert_that(result, equal_to([('c', 111.0 / 3, 99.0)])) - p.run() + with TestPipeline() as p: + result = ( + p + | Create([('a', 100, 0.0), ('b', 10, -1), ('c', 1, 100)]) + | beam.CombineGlobally(combine.TupleCombineFn(max, + combine.MeanCombineFn(), + sum)).without_defaults()) + assert_that(result, equal_to([('c', 111.0 / 3, 99.0)])) def test_tuple_combine_fn_without_defaults(self): - p = TestPipeline() - result = ( - p - | Create([1, 1, 2, 3]) - | beam.CombineGlobally( - combine.TupleCombineFn(min, combine.MeanCombineFn(), max) - .with_common_input()).without_defaults()) - assert_that(result, equal_to([(1, 7.0 / 4, 3)])) - p.run() + with TestPipeline() as p: + result = ( + p + | Create([1, 1, 2, 3]) + | beam.CombineGlobally( + combine.TupleCombineFn(min, combine.MeanCombineFn(), max) + .with_common_input()).without_defaults()) + assert_that(result, equal_to([(1, 7.0 / 4, 3)])) def test_to_list_and_to_dict(self): pipeline = TestPipeline() @@ -295,15 +293,13 @@ def match(actual): pipeline.run() def test_combine_globally_with_default(self): - p = TestPipeline() - assert_that(p | Create([]) | CombineGlobally(sum), equal_to([0])) - p.run() + with TestPipeline() as p: + assert_that(p | Create([]) | CombineGlobally(sum), equal_to([0])) def test_combine_globally_without_default(self): - p = TestPipeline() - result = p | Create([]) | CombineGlobally(sum).without_defaults() - assert_that(result, 
equal_to([])) - p.run() + with TestPipeline() as p: + result = p | Create([]) | CombineGlobally(sum).without_defaults() + assert_that(result, equal_to([])) def test_combine_globally_with_default_side_input(self): class CombineWithSideInput(PTransform): @@ -312,12 +308,11 @@ def expand(self, pcoll): main = pcoll.pipeline | Create([None]) return main | Map(lambda _, s: s, side) - p = TestPipeline() - result1 = p | 'i1' >> Create([]) | 'c1' >> CombineWithSideInput() - result2 = p | 'i2' >> Create([1, 2, 3, 4]) | 'c2' >> CombineWithSideInput() - assert_that(result1, equal_to([0]), label='r1') - assert_that(result2, equal_to([10]), label='r2') - p.run() + with TestPipeline() as p: + result1 = p | 'i1' >> Create([]) | 'c1' >> CombineWithSideInput() + result2 = p | 'i2' >> Create([1, 2, 3, 4]) | 'c2' >> CombineWithSideInput() + assert_that(result1, equal_to([0]), label='r1') + assert_that(result2, equal_to([10]), label='r2') if __name__ == '__main__': diff --git a/sdks/python/apache_beam/transforms/window_test.py b/sdks/python/apache_beam/transforms/window_test.py index fd1bb9d5250b..977a364ad883 100644 --- a/sdks/python/apache_beam/transforms/window_test.py +++ b/sdks/python/apache_beam/transforms/window_test.py @@ -167,90 +167,85 @@ def timestamped_key_values(self, pipeline, key, *timestamps): | Map(lambda x: WindowedValue((key, x), x, [GlobalWindow()]))) def test_sliding_windows(self): - p = TestPipeline() - pcoll = self.timestamped_key_values(p, 'key', 1, 2, 3) - result = (pcoll - | 'w' >> WindowInto(SlidingWindows(period=2, size=4)) - | GroupByKey() - | reify_windows) - expected = [('key @ [-2.0, 2.0)', [1]), - ('key @ [0.0, 4.0)', [1, 2, 3]), - ('key @ [2.0, 6.0)', [2, 3])] - assert_that(result, equal_to(expected)) - p.run() + with TestPipeline() as p: + pcoll = self.timestamped_key_values(p, 'key', 1, 2, 3) + result = (pcoll + | 'w' >> WindowInto(SlidingWindows(period=2, size=4)) + | GroupByKey() + | reify_windows) + expected = [('key @ [-2.0, 2.0)', [1]), + ('key 
@ [0.0, 4.0)', [1, 2, 3]), + ('key @ [2.0, 6.0)', [2, 3])] + assert_that(result, equal_to(expected)) def test_sessions(self): - p = TestPipeline() - pcoll = self.timestamped_key_values(p, 'key', 1, 2, 3, 20, 35, 27) - result = (pcoll - | 'w' >> WindowInto(Sessions(10)) - | GroupByKey() - | sort_values - | reify_windows) - expected = [('key @ [1.0, 13.0)', [1, 2, 3]), - ('key @ [20.0, 45.0)', [20, 27, 35])] - assert_that(result, equal_to(expected)) - p.run() + with TestPipeline() as p: + pcoll = self.timestamped_key_values(p, 'key', 1, 2, 3, 20, 35, 27) + result = (pcoll + | 'w' >> WindowInto(Sessions(10)) + | GroupByKey() + | sort_values + | reify_windows) + expected = [('key @ [1.0, 13.0)', [1, 2, 3]), + ('key @ [20.0, 45.0)', [20, 27, 35])] + assert_that(result, equal_to(expected)) def test_timestamped_value(self): - p = TestPipeline() - result = (p - | 'start' >> Create([(k, k) for k in range(10)]) - | Map(lambda (x, t): TimestampedValue(x, t)) - | 'w' >> WindowInto(FixedWindows(5)) - | Map(lambda v: ('key', v)) - | GroupByKey()) - assert_that(result, equal_to([('key', [0, 1, 2, 3, 4]), - ('key', [5, 6, 7, 8, 9])])) - p.run() + with TestPipeline() as p: + result = (p + | 'start' >> Create([(k, k) for k in range(10)]) + | Map(lambda (x, t): TimestampedValue(x, t)) + | 'w' >> WindowInto(FixedWindows(5)) + | Map(lambda v: ('key', v)) + | GroupByKey()) + assert_that(result, equal_to([('key', [0, 1, 2, 3, 4]), + ('key', [5, 6, 7, 8, 9])])) def test_rewindow(self): - p = TestPipeline() - result = (p - | Create([(k, k) for k in range(10)]) - | Map(lambda (x, t): TimestampedValue(x, t)) - | 'window' >> WindowInto(SlidingWindows(period=2, size=6)) - # Per the model, each element is now duplicated across - # three windows. Rewindowing must preserve this duplication. 
- | 'rewindow' >> WindowInto(FixedWindows(5)) - | 'rewindow2' >> WindowInto(FixedWindows(5)) - | Map(lambda v: ('key', v)) - | GroupByKey()) - assert_that(result, equal_to([('key', sorted([0, 1, 2, 3, 4] * 3)), - ('key', sorted([5, 6, 7, 8, 9] * 3))])) - p.run() + with TestPipeline() as p: + result = (p + | Create([(k, k) for k in range(10)]) + | Map(lambda (x, t): TimestampedValue(x, t)) + | 'window' >> WindowInto(SlidingWindows(period=2, size=6)) + # Per the model, each element is now duplicated across + # three windows. Rewindowing must preserve this duplication. + | 'rewindow' >> WindowInto(FixedWindows(5)) + | 'rewindow2' >> WindowInto(FixedWindows(5)) + | Map(lambda v: ('key', v)) + | GroupByKey()) + assert_that(result, equal_to([('key', sorted([0, 1, 2, 3, 4] * 3)), + ('key', sorted([5, 6, 7, 8, 9] * 3))])) def test_timestamped_with_combiners(self): - p = TestPipeline() - result = (p - # Create some initial test values. - | 'start' >> Create([(k, k) for k in range(10)]) - # The purpose of the WindowInto transform is to establish a - # FixedWindows windowing function for the PCollection. - # It does not bucket elements into windows since the timestamps - # from Create are not spaced 5 ms apart and very likely they all - # fall into the same window. - | 'w' >> WindowInto(FixedWindows(5)) - # Generate timestamped values using the values as timestamps. - # Now there are values 5 ms apart and since Map propagates the - # windowing function from input to output the output PCollection - # will have elements falling into different 5ms windows. - | Map(lambda (x, t): TimestampedValue(x, t)) - # We add a 'key' to each value representing the index of the - # window. This is important since there is no guarantee of - # order for the elements of a PCollection. - | Map(lambda v: (v / 5, v))) - # Sum all elements associated with a key and window. 
Although it - # is called CombinePerKey it is really CombinePerKeyAndWindow the - # same way GroupByKey is really GroupByKeyAndWindow. - sum_per_window = result | CombinePerKey(sum) - # Compute mean per key and window. - mean_per_window = result | combiners.Mean.PerKey() - assert_that(sum_per_window, equal_to([(0, 10), (1, 35)]), - label='assert:sum') - assert_that(mean_per_window, equal_to([(0, 2.0), (1, 7.0)]), - label='assert:mean') - p.run() + with TestPipeline() as p: + result = (p + # Create some initial test values. + | 'start' >> Create([(k, k) for k in range(10)]) + # The purpose of the WindowInto transform is to establish a + # FixedWindows windowing function for the PCollection. + # It does not bucket elements into windows since the timestamps + # from Create are not spaced 5 ms apart and very likely they all + # fall into the same window. + | 'w' >> WindowInto(FixedWindows(5)) + # Generate timestamped values using the values as timestamps. + # Now there are values 5 ms apart and since Map propagates the + # windowing function from input to output the output PCollection + # will have elements falling into different 5ms windows. + | Map(lambda (x, t): TimestampedValue(x, t)) + # We add a 'key' to each value representing the index of the + # window. This is important since there is no guarantee of + # order for the elements of a PCollection. + | Map(lambda v: (v / 5, v))) + # Sum all elements associated with a key and window. Although it + # is called CombinePerKey it is really CombinePerKeyAndWindow the + # same way GroupByKey is really GroupByKeyAndWindow. + sum_per_window = result | CombinePerKey(sum) + # Compute mean per key and window. 
+ mean_per_window = result | combiners.Mean.PerKey() + assert_that(sum_per_window, equal_to([(0, 10), (1, 35)]), + label='assert:sum') + assert_that(mean_per_window, equal_to([(0, 2.0), (1, 7.0)]), + label='assert:mean') class RunnerApiTest(unittest.TestCase): diff --git a/sdks/python/apache_beam/transforms/write_ptransform_test.py b/sdks/python/apache_beam/transforms/write_ptransform_test.py index e31b9cc9830d..50f0debb0a70 100644 --- a/sdks/python/apache_beam/transforms/write_ptransform_test.py +++ b/sdks/python/apache_beam/transforms/write_ptransform_test.py @@ -98,11 +98,10 @@ def _run_write_test(self, return_write_results=True): write_to_test_sink = WriteToTestSink(return_init_result, return_write_results) - p = TestPipeline() - result = p | beam.Create(data) | write_to_test_sink | beam.Map(list) + with TestPipeline() as p: + result = p | beam.Create(data) | write_to_test_sink | beam.Map(list) - assert_that(result, is_empty()) - p.run() + assert_that(result, is_empty()) sink = write_to_test_sink.last_sink self.assertIsNotNone(sink) diff --git a/sdks/python/apache_beam/typehints/typed_pipeline_test.py b/sdks/python/apache_beam/typehints/typed_pipeline_test.py index 589dc0e5ac82..c81ef320e027 100644 --- a/sdks/python/apache_beam/typehints/typed_pipeline_test.py +++ b/sdks/python/apache_beam/typehints/typed_pipeline_test.py @@ -168,12 +168,11 @@ def test_deferred_side_inputs(self): @typehints.with_input_types(str, int) def repeat(s, times): return s * times - p = TestPipeline() - main_input = p | beam.Create(['a', 'bb', 'c']) - side_input = p | 'side' >> beam.Create([3]) - result = main_input | beam.Map(repeat, pvalue.AsSingleton(side_input)) - assert_that(result, equal_to(['aaa', 'bbbbbb', 'ccc'])) - p.run() + with TestPipeline() as p: + main_input = p | beam.Create(['a', 'bb', 'c']) + side_input = p | 'side' >> beam.Create([3]) + result = main_input | beam.Map(repeat, pvalue.AsSingleton(side_input)) + assert_that(result, equal_to(['aaa', 'bbbbbb', 'ccc'])) 
bad_side_input = p | 'bad_side' >> beam.Create(['z']) with self.assertRaises(typehints.TypeCheckError): @@ -183,12 +182,11 @@ def test_deferred_side_input_iterable(self): @typehints.with_input_types(str, typehints.Iterable[str]) def concat(glue, items): return glue.join(sorted(items)) - p = TestPipeline() - main_input = p | beam.Create(['a', 'bb', 'c']) - side_input = p | 'side' >> beam.Create(['x', 'y', 'z']) - result = main_input | beam.Map(concat, pvalue.AsIter(side_input)) - assert_that(result, equal_to(['xayaz', 'xbbybbz', 'xcycz'])) - p.run() + with TestPipeline() as p: + main_input = p | beam.Create(['a', 'bb', 'c']) + side_input = p | 'side' >> beam.Create(['x', 'y', 'z']) + result = main_input | beam.Map(concat, pvalue.AsIter(side_input)) + assert_that(result, equal_to(['xayaz', 'xbbybbz', 'xcycz'])) bad_side_input = p | 'bad_side' >> beam.Create([1, 2, 3]) with self.assertRaises(typehints.TypeCheckError): From 341f06a71b281cfde84c8984ba7096ec7dac7a6d Mon Sep 17 00:00:00 2001 From: Robert Bradshaw Date: Thu, 18 May 2017 17:30:50 -0700 Subject: [PATCH 2/4] Some more fixes. 
--- .../complete/game/hourly_team_score.py | 19 ++- .../examples/complete/game/user_score.py | 15 +-- .../complete/juliaset/juliaset/juliaset.py | 44 ++++--- .../apache_beam/examples/cookbook/coders.py | 16 ++- .../examples/cookbook/group_with_coder.py | 43 ++++--- .../examples/cookbook/mergecontacts.py | 114 +++++++++--------- .../cookbook/multiple_output_pardo.py | 72 ++++++----- .../apache_beam/examples/snippets/snippets.py | 44 +++---- 8 files changed, 173 insertions(+), 194 deletions(-) diff --git a/sdks/python/apache_beam/examples/complete/game/hourly_team_score.py b/sdks/python/apache_beam/examples/complete/game/hourly_team_score.py index e9d71881f8c9..9f398d9995f2 100644 --- a/sdks/python/apache_beam/examples/complete/game/hourly_team_score.py +++ b/sdks/python/apache_beam/examples/complete/game/hourly_team_score.py @@ -276,18 +276,15 @@ def run(argv=None): known_args, pipeline_args = parser.parse_known_args(argv) pipeline_options = PipelineOptions(pipeline_args) - p = beam.Pipeline(options=pipeline_options) pipeline_options.view_as(SetupOptions).save_main_session = True - - (p # pylint: disable=expression-not-assigned - | ReadFromText(known_args.input) - | HourlyTeamScore( - known_args.start_min, known_args.stop_min, known_args.window_duration) - | WriteWindowedToBigQuery( - known_args.table_name, known_args.dataset, configure_bigquery_write())) - - result = p.run() - result.wait_until_finish() + with beam.Pipeline(options=pipeline_options) as p: + + (p # pylint: disable=expression-not-assigned + | ReadFromText(known_args.input) + | HourlyTeamScore( + known_args.start_min, known_args.stop_min, known_args.window_duration) + | WriteWindowedToBigQuery( + known_args.table_name, known_args.dataset, configure_bigquery_write())) if __name__ == '__main__': diff --git a/sdks/python/apache_beam/examples/complete/game/user_score.py b/sdks/python/apache_beam/examples/complete/game/user_score.py index 389d2c6a8433..c9f273814a85 100644 --- 
a/sdks/python/apache_beam/examples/complete/game/user_score.py +++ b/sdks/python/apache_beam/examples/complete/game/user_score.py @@ -201,16 +201,13 @@ def run(argv=None): known_args, pipeline_args = parser.parse_known_args(argv) pipeline_options = PipelineOptions(pipeline_args) - p = beam.Pipeline(options=pipeline_options) + with beam.Pipeline(options=pipeline_options) as p: - (p # pylint: disable=expression-not-assigned - | ReadFromText(known_args.input) # Read events from a file and parse them. - | UserScore() - | WriteToBigQuery( - known_args.table_name, known_args.dataset, configure_bigquery_write())) - - result = p.run() - result.wait_until_finish() + (p # pylint: disable=expression-not-assigned + | ReadFromText(known_args.input) # Read events from a file and parse them. + | UserScore() + | WriteToBigQuery( + known_args.table_name, known_args.dataset, configure_bigquery_write())) if __name__ == '__main__': diff --git a/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py b/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py index 5ff2b785108b..6e8951a23744 100644 --- a/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py +++ b/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py @@ -99,26 +99,24 @@ def run(argv=None): # pylint: disable=missing-docstring help='Output file to write the resulting image to.') known_args, pipeline_args = parser.parse_known_args(argv) - p = beam.Pipeline(argv=pipeline_args) - n = int(known_args.grid_size) - - coordinates = generate_julia_set_colors(p, complex(-.62772, .42193), n, 100) - - # Group each coordinate triplet by its x value, then write the coordinates to - # the output file with an x-coordinate grouping per line. 
- # pylint: disable=expression-not-assigned - (coordinates - | 'x coord key' >> beam.Map(lambda (x, y, i): (x, (x, y, i))) - | 'x coord' >> beam.GroupByKey() - | 'format' >> beam.Map( - lambda (k, coords): ' '.join('(%s, %s, %s)' % coord for coord in coords)) - | WriteToText(known_args.coordinate_output)) - # pylint: enable=expression-not-assigned - return p.run().wait_until_finish() - - # Optionally render the image and save it to a file. - # TODO(silviuc): Add this functionality. - # if p.options.image_output is not None: - # julia_set_image = generate_julia_set_visualization( - # file_with_coordinates, n, 100) - # save_julia_set_visualization(p.options.image_output, julia_set_image) + with beam.Pipeline(argv=pipeline_args) as p: + n = int(known_args.grid_size) + + coordinates = generate_julia_set_colors(p, complex(-.62772, .42193), n, 100) + + # Group each coordinate triplet by its x value, then write the coordinates to + # the output file with an x-coordinate grouping per line. + # pylint: disable=expression-not-assigned + (coordinates + | 'x coord key' >> beam.Map(lambda (x, y, i): (x, (x, y, i))) + | 'x coord' >> beam.GroupByKey() + | 'format' >> beam.Map( + lambda (k, coords): ' '.join('(%s, %s, %s)' % coord for coord in coords)) + | WriteToText(known_args.coordinate_output)) + + # Optionally render the image and save it to a file. + # TODO(silviuc): Add this functionality. 
+ # if p.options.image_output is not None: + # julia_set_image = generate_julia_set_visualization( + # file_with_coordinates, n, 100) + # save_julia_set_visualization(p.options.image_output, julia_set_image) diff --git a/sdks/python/apache_beam/examples/cookbook/coders.py b/sdks/python/apache_beam/examples/cookbook/coders.py index aeeb3c9128a5..f97b0f2d9c9a 100644 --- a/sdks/python/apache_beam/examples/cookbook/coders.py +++ b/sdks/python/apache_beam/examples/cookbook/coders.py @@ -85,15 +85,13 @@ def run(argv=None): # workflow rely on global context (e.g., a module imported at module level). pipeline_options = PipelineOptions(pipeline_args) pipeline_options.view_as(SetupOptions).save_main_session = True - p = beam.Pipeline(options=pipeline_options) - - p = beam.Pipeline(argv=pipeline_args) - (p # pylint: disable=expression-not-assigned - | 'read' >> ReadFromText(known_args.input, coder=JsonCoder()) - | 'points' >> beam.FlatMap(compute_points) - | beam.CombinePerKey(sum) - | 'write' >> WriteToText(known_args.output, coder=JsonCoder())) - p.run() + + with beam.Pipeline(options=pipeline_options) as p: + (p # pylint: disable=expression-not-assigned + | 'read' >> ReadFromText(known_args.input, coder=JsonCoder()) + | 'points' >> beam.FlatMap(compute_points) + | beam.CombinePerKey(sum) + | 'write' >> WriteToText(known_args.output, coder=JsonCoder())) if __name__ == '__main__': diff --git a/sdks/python/apache_beam/examples/cookbook/group_with_coder.py b/sdks/python/apache_beam/examples/cookbook/group_with_coder.py index 6bdadae9f3d1..a9e38707616a 100644 --- a/sdks/python/apache_beam/examples/cookbook/group_with_coder.py +++ b/sdks/python/apache_beam/examples/cookbook/group_with_coder.py @@ -95,28 +95,27 @@ def run(args=None): # workflow rely on global context (e.g., a module imported at module level). 
pipeline_options = PipelineOptions(pipeline_args) pipeline_options.view_as(SetupOptions).save_main_session = True - p = beam.Pipeline(options=pipeline_options) - - # Register the custom coder for the Player class, so that it will be used in - # the computation. - coders.registry.register_coder(Player, PlayerCoder) - - (p # pylint: disable=expression-not-assigned - | ReadFromText(known_args.input) - # The get_players function is annotated with a type hint above, so the type - # system knows the output type of the following operation is a key-value pair - # of a Player and an int. Please see the documentation for details on - # types that are inferred automatically as well as other ways to specify - # type hints. - | beam.Map(get_players) - # The output type hint of the previous step is used to infer that the key - # type of the following operation is the Player type. Since a custom coder - # is registered for the Player class above, a PlayerCoder will be used to - # encode Player objects as keys for this combine operation. - | beam.CombinePerKey(sum) - | beam.Map(lambda (k, v): '%s,%d' % (k.name, v)) - | WriteToText(known_args.output)) - return p.run() + with beam.Pipeline(options=pipeline_options) as p: + + # Register the custom coder for the Player class, so that it will be used in + # the computation. + coders.registry.register_coder(Player, PlayerCoder) + + (p # pylint: disable=expression-not-assigned + | ReadFromText(known_args.input) + # The get_players function is annotated with a type hint above, so the type + # system knows the output type of the following operation is a key-value pair + # of a Player and an int. Please see the documentation for details on + # types that are inferred automatically as well as other ways to specify + # type hints. + | beam.Map(get_players) + # The output type hint of the previous step is used to infer that the key + # type of the following operation is the Player type. 
Since a custom coder + # is registered for the Player class above, a PlayerCoder will be used to + # encode Player objects as keys for this combine operation. + | beam.CombinePerKey(sum) + | beam.Map(lambda (k, v): '%s,%d' % (k.name, v)) + | WriteToText(known_args.output)) if __name__ == '__main__': diff --git a/sdks/python/apache_beam/examples/cookbook/mergecontacts.py b/sdks/python/apache_beam/examples/cookbook/mergecontacts.py index 4f53c615ba97..ddd27cc14ede 100644 --- a/sdks/python/apache_beam/examples/cookbook/mergecontacts.py +++ b/sdks/python/apache_beam/examples/cookbook/mergecontacts.py @@ -70,64 +70,62 @@ def run(argv=None, assert_results=None): # workflow rely on global context (e.g., a module imported at module level). pipeline_options = PipelineOptions(pipeline_args) pipeline_options.view_as(SetupOptions).save_main_session = True - p = beam.Pipeline(options=pipeline_options) - - # Helper: read a tab-separated key-value mapping from a text file, escape all - # quotes/backslashes, and convert it a PCollection of (key, value) pairs. - def read_kv_textfile(label, textfile): - return (p - | 'Read: %s' % label >> ReadFromText(textfile) - | 'Backslash: %s' % label >> beam.Map( - lambda x: re.sub(r'\\', r'\\\\', x)) - | 'EscapeQuotes: %s' % label >> beam.Map( - lambda x: re.sub(r'"', r'\"', x)) - | 'Split: %s' % label >> beam.Map( - lambda x: re.split(r'\t+', x, 1))) - - # Read input databases. - email = read_kv_textfile('email', known_args.input_email) - phone = read_kv_textfile('phone', known_args.input_phone) - snailmail = read_kv_textfile('snailmail', known_args.input_snailmail) - - # Group together all entries under the same name. 
- grouped = (email, phone, snailmail) | 'group_by_name' >> beam.CoGroupByKey() - - # Prepare tab-delimited output; something like this: - # "name""email_1,email_2""phone""first_snailmail_only" - tsv_lines = grouped | beam.Map( - lambda (name, (email, phone, snailmail)): '\t'.join( - ['"%s"' % name, - '"%s"' % ','.join(email), - '"%s"' % ','.join(phone), - '"%s"' % next(iter(snailmail), '')])) - - # Compute some stats about our database of people. - luddites = grouped | beam.Filter( # People without email. - lambda (name, (email, phone, snailmail)): not next(iter(email), None)) - writers = grouped | beam.Filter( # People without phones. - lambda (name, (email, phone, snailmail)): not next(iter(phone), None)) - nomads = grouped | beam.Filter( # People without addresses. - lambda (name, (email, phone, snailmail)): not next(iter(snailmail), None)) - - num_luddites = luddites | 'Luddites' >> beam.combiners.Count.Globally() - num_writers = writers | 'Writers' >> beam.combiners.Count.Globally() - num_nomads = nomads | 'Nomads' >> beam.combiners.Count.Globally() - - # Write tab-delimited output. - # pylint: disable=expression-not-assigned - tsv_lines | 'WriteTsv' >> WriteToText(known_args.output_tsv) - - # TODO(silviuc): Move the assert_results logic to the unit test. - if assert_results is not None: - expected_luddites, expected_writers, expected_nomads = assert_results - assert_that(num_luddites, equal_to([expected_luddites]), - label='assert:luddites') - assert_that(num_writers, equal_to([expected_writers]), - label='assert:writers') - assert_that(num_nomads, equal_to([expected_nomads]), - label='assert:nomads') - # Execute pipeline. - return p.run() + with beam.Pipeline(options=pipeline_options) as p: + + # Helper: read a tab-separated key-value mapping from a text file, escape all + # quotes/backslashes, and convert it a PCollection of (key, value) pairs. 
+ def read_kv_textfile(label, textfile): + return (p + | 'Read: %s' % label >> ReadFromText(textfile) + | 'Backslash: %s' % label >> beam.Map( + lambda x: re.sub(r'\\', r'\\\\', x)) + | 'EscapeQuotes: %s' % label >> beam.Map( + lambda x: re.sub(r'"', r'\"', x)) + | 'Split: %s' % label >> beam.Map( + lambda x: re.split(r'\t+', x, 1))) + + # Read input databases. + email = read_kv_textfile('email', known_args.input_email) + phone = read_kv_textfile('phone', known_args.input_phone) + snailmail = read_kv_textfile('snailmail', known_args.input_snailmail) + + # Group together all entries under the same name. + grouped = (email, phone, snailmail) | 'group_by_name' >> beam.CoGroupByKey() + + # Prepare tab-delimited output; something like this: + # "name""email_1,email_2""phone""first_snailmail_only" + tsv_lines = grouped | beam.Map( + lambda (name, (email, phone, snailmail)): '\t'.join( + ['"%s"' % name, + '"%s"' % ','.join(email), + '"%s"' % ','.join(phone), + '"%s"' % next(iter(snailmail), '')])) + + # Compute some stats about our database of people. + luddites = grouped | beam.Filter( # People without email. + lambda (name, (email, phone, snailmail)): not next(iter(email), None)) + writers = grouped | beam.Filter( # People without phones. + lambda (name, (email, phone, snailmail)): not next(iter(phone), None)) + nomads = grouped | beam.Filter( # People without addresses. + lambda (name, (email, phone, snailmail)): not next(iter(snailmail), None)) + + num_luddites = luddites | 'Luddites' >> beam.combiners.Count.Globally() + num_writers = writers | 'Writers' >> beam.combiners.Count.Globally() + num_nomads = nomads | 'Nomads' >> beam.combiners.Count.Globally() + + # Write tab-delimited output. + # pylint: disable=expression-not-assigned + tsv_lines | 'WriteTsv' >> WriteToText(known_args.output_tsv) + + # TODO(silviuc): Move the assert_results logic to the unit test. 
+ if assert_results is not None: + expected_luddites, expected_writers, expected_nomads = assert_results + assert_that(num_luddites, equal_to([expected_luddites]), + label='assert:luddites') + assert_that(num_writers, equal_to([expected_writers]), + label='assert:writers') + assert_that(num_nomads, equal_to([expected_nomads]), + label='assert:nomads') if __name__ == '__main__': diff --git a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py index 9759f4821045..2316c6611c06 100644 --- a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py +++ b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py @@ -141,43 +141,41 @@ def run(argv=None): # workflow rely on global context (e.g., a module imported at module level). pipeline_options = PipelineOptions(pipeline_args) pipeline_options.view_as(SetupOptions).save_main_session = True - p = beam.Pipeline(options=pipeline_options) - - lines = p | ReadFromText(known_args.input) - - # with_outputs allows accessing the explicitly tagged outputs of a DoFn. - split_lines_result = (lines - | beam.ParDo(SplitLinesToWordsFn()).with_outputs( - SplitLinesToWordsFn.OUTPUT_TAG_SHORT_WORDS, - SplitLinesToWordsFn.OUTPUT_TAG_CHARACTER_COUNT, - main='words')) - - # split_lines_result is an object of type DoOutputsTuple. It supports - # accessing result in alternative ways. 
- words, _, _ = split_lines_result - short_words = split_lines_result[ - SplitLinesToWordsFn.OUTPUT_TAG_SHORT_WORDS] - character_count = split_lines_result.tag_character_count - - # pylint: disable=expression-not-assigned - (character_count - | 'pair_with_key' >> beam.Map(lambda x: ('chars_temp_key', x)) - | beam.GroupByKey() - | 'count chars' >> beam.Map(lambda (_, counts): sum(counts)) - | 'write chars' >> WriteToText(known_args.output + '-chars')) - - # pylint: disable=expression-not-assigned - (short_words - | 'count short words' >> CountWords() - | 'write short words' >> WriteToText( - known_args.output + '-short-words')) - - # pylint: disable=expression-not-assigned - (words - | 'count words' >> CountWords() - | 'write words' >> WriteToText(known_args.output + '-words')) - - return p.run() + with beam.Pipeline(options=pipeline_options) as p: + + lines = p | ReadFromText(known_args.input) + + # with_outputs allows accessing the explicitly tagged outputs of a DoFn. + split_lines_result = (lines + | beam.ParDo(SplitLinesToWordsFn()).with_outputs( + SplitLinesToWordsFn.OUTPUT_TAG_SHORT_WORDS, + SplitLinesToWordsFn.OUTPUT_TAG_CHARACTER_COUNT, + main='words')) + + # split_lines_result is an object of type DoOutputsTuple. It supports + # accessing result in alternative ways. 
+ words, _, _ = split_lines_result + short_words = split_lines_result[ + SplitLinesToWordsFn.OUTPUT_TAG_SHORT_WORDS] + character_count = split_lines_result.tag_character_count + + # pylint: disable=expression-not-assigned + (character_count + | 'pair_with_key' >> beam.Map(lambda x: ('chars_temp_key', x)) + | beam.GroupByKey() + | 'count chars' >> beam.Map(lambda (_, counts): sum(counts)) + | 'write chars' >> WriteToText(known_args.output + '-chars')) + + # pylint: disable=expression-not-assigned + (short_words + | 'count short words' >> CountWords() + | 'write short words' >> WriteToText( + known_args.output + '-short-words')) + + # pylint: disable=expression-not-assigned + (words + | 'count words' >> CountWords() + | 'write words' >> WriteToText(known_args.output + '-words')) if __name__ == '__main__': diff --git a/sdks/python/apache_beam/examples/snippets/snippets.py b/sdks/python/apache_beam/examples/snippets/snippets.py index 9f5960d9bc75..0ccdf0b12326 100644 --- a/sdks/python/apache_beam/examples/snippets/snippets.py +++ b/sdks/python/apache_beam/examples/snippets/snippets.py @@ -147,18 +147,15 @@ def _add_argparse_args(cls, parser): pipeline_options = PipelineOptions(argv) my_options = pipeline_options.view_as(MyOptions) - p = beam.Pipeline(options=pipeline_options) + with beam.Pipeline(options=pipeline_options) as p: - (p - | beam.io.ReadFromText(my_options.input) - | beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x)) - | beam.Map(lambda x: (x, 1)) - | beam.combiners.Count.PerKey() - | beam.io.WriteToText(my_options.output)) - - result = p.run() + (p + | beam.io.ReadFromText(my_options.input) + | beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x)) + | beam.Map(lambda x: (x, 1)) + | beam.combiners.Count.PerKey() + | beam.io.WriteToText(my_options.output)) # [END model_pipelines] - result.wait_until_finish() def model_pcollection(argv): @@ -178,21 +175,18 @@ def _add_argparse_args(cls, parser): my_options = pipeline_options.view_as(MyOptions) # [START 
model_pcollection] - p = beam.Pipeline(options=pipeline_options) - - lines = (p - | beam.Create([ - 'To be, or not to be: that is the question: ', - 'Whether \'tis nobler in the mind to suffer ', - 'The slings and arrows of outrageous fortune, ', - 'Or to take arms against a sea of troubles, '])) - # [END model_pcollection] - - (lines - | beam.io.WriteToText(my_options.output)) - - result = p.run() - result.wait_until_finish() + with beam.Pipeline(options=pipeline_options) as p: + + lines = (p + | beam.Create([ + 'To be, or not to be: that is the question: ', + 'Whether \'tis nobler in the mind to suffer ', + 'The slings and arrows of outrageous fortune, ', + 'Or to take arms against a sea of troubles, '])) + # [END model_pcollection] + + (lines + | beam.io.WriteToText(my_options.output)) def pipeline_options_remote(argv): From e752fd45a3551c12c98184fc8482454232211a8c Mon Sep 17 00:00:00 2001 From: Robert Bradshaw Date: Fri, 19 May 2017 16:09:16 -0700 Subject: [PATCH 3/4] lint --- .../examples/complete/estimate_pi_test.py | 3 ++- .../complete/juliaset/juliaset/juliaset.py | 6 +++--- .../examples/complete/tfidf_test.py | 5 +++-- .../cookbook/bigquery_side_input_test.py | 6 ++++-- .../examples/cookbook/coders_test.py | 3 ++- .../examples/cookbook/group_with_coder.py | 4 ++-- .../examples/cookbook/group_with_coder_test.py | 4 ++-- .../examples/cookbook/mergecontacts.py | 7 ++++--- .../examples/cookbook/mergecontacts_test.py | 3 +-- .../cookbook/multiple_output_pardo_test.py | 2 +- .../apache_beam/examples/snippets/snippets.py | 5 +++-- .../examples/snippets/snippets_test.py | 10 +++++----- .../examples/wordcount_debugging.py | 16 ++++++++-------- .../apache_beam/examples/wordcount_minimal.py | 18 +++++++++--------- .../apache_beam/transforms/combiners_test.py | 11 +++++------ 15 files changed, 54 insertions(+), 49 deletions(-) diff --git a/sdks/python/apache_beam/examples/complete/estimate_pi_test.py b/sdks/python/apache_beam/examples/complete/estimate_pi_test.py 
index 0e1bc2595d55..f1cbb0a24d56 100644 --- a/sdks/python/apache_beam/examples/complete/estimate_pi_test.py +++ b/sdks/python/apache_beam/examples/complete/estimate_pi_test.py @@ -42,7 +42,8 @@ def test_basics(self): result = p | 'Estimate' >> estimate_pi.EstimatePiTransform(5000) # Note: Probabilistically speaking this test can fail with a probability - # that is very small (VERY) given that we run at least 500 thousand trials. + # that is very small (VERY) given that we run at least 500 thousand + # trials. assert_that(result, in_between(3.125, 3.155)) diff --git a/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py b/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py index 6e8951a23744..61e3fd1a8d0c 100644 --- a/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py +++ b/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py @@ -104,14 +104,14 @@ def run(argv=None): # pylint: disable=missing-docstring coordinates = generate_julia_set_colors(p, complex(-.62772, .42193), n, 100) - # Group each coordinate triplet by its x value, then write the coordinates to - # the output file with an x-coordinate grouping per line. + # Group each coordinate triplet by its x value, then write the coordinates + # to the output file with an x-coordinate grouping per line. # pylint: disable=expression-not-assigned (coordinates | 'x coord key' >> beam.Map(lambda (x, y, i): (x, (x, y, i))) | 'x coord' >> beam.GroupByKey() | 'format' >> beam.Map( - lambda (k, coords): ' '.join('(%s, %s, %s)' % coord for coord in coords)) + lambda (k, coords): ' '.join('(%s, %s, %s)' % c for c in coords)) | WriteToText(known_args.coordinate_output)) # Optionally render the image and save it to a file. 
diff --git a/sdks/python/apache_beam/examples/complete/tfidf_test.py b/sdks/python/apache_beam/examples/complete/tfidf_test.py index b1630f83d989..322426fd2b3d 100644 --- a/sdks/python/apache_beam/examples/complete/tfidf_test.py +++ b/sdks/python/apache_beam/examples/complete/tfidf_test.py @@ -61,8 +61,9 @@ def test_tfidf_transform(self): | beam.Map(lambda (word, (uri, tfidf)): (word, uri, tfidf))) assert_that(result, equal_to(EXPECTED_RESULTS)) # Run the pipeline. Note that the assert_that above adds to the pipeline - # a check that the result PCollection contains expected values. To actually - # trigger the check the pipeline must be run. + # a check that the result PCollection contains expected values. + # To actually trigger the check the pipeline must be run (e.g. by + # exiting the with context). def test_basics(self): # Setup the files with expected content. diff --git a/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_test.py b/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_test.py index 5d6705deb0a3..964b35b3f08f 100644 --- a/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_test.py +++ b/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_test.py @@ -40,8 +40,10 @@ def test_create_groups(self): ignore_corpus_pcoll = p | 'CreateIgnoreCorpus' >> beam.Create(['corpus1']) ignore_word_pcoll = p | 'CreateIgnoreWord' >> beam.Create(['word1']) - groups = bigquery_side_input.create_groups(group_ids_pcoll, corpus_pcoll, - words_pcoll, ignore_corpus_pcoll, + groups = bigquery_side_input.create_groups(group_ids_pcoll, + corpus_pcoll, + words_pcoll, + ignore_corpus_pcoll, ignore_word_pcoll) assert_that(groups, equal_to( diff --git a/sdks/python/apache_beam/examples/cookbook/coders_test.py b/sdks/python/apache_beam/examples/cookbook/coders_test.py index 898461e5e62f..988d3c9d25e1 100644 --- a/sdks/python/apache_beam/examples/cookbook/coders_test.py +++ b/sdks/python/apache_beam/examples/cookbook/coders_test.py @@ -40,7 
+40,8 @@ def test_compute_points(self): result = (records | 'points' >> beam.FlatMap(coders.compute_points) | beam.CombinePerKey(sum)) - assert_that(result, equal_to([('Italy', 0), ('Brasil', 6), ('Germany', 3)])) + assert_that(result, + equal_to([('Italy', 0), ('Brasil', 6), ('Germany', 3)])) if __name__ == '__main__': diff --git a/sdks/python/apache_beam/examples/cookbook/group_with_coder.py b/sdks/python/apache_beam/examples/cookbook/group_with_coder.py index a9e38707616a..9c0d04b816ac 100644 --- a/sdks/python/apache_beam/examples/cookbook/group_with_coder.py +++ b/sdks/python/apache_beam/examples/cookbook/group_with_coder.py @@ -104,8 +104,8 @@ def run(args=None): (p # pylint: disable=expression-not-assigned | ReadFromText(known_args.input) # The get_players function is annotated with a type hint above, so the type - # system knows the output type of the following operation is a key-value pair - # of a Player and an int. Please see the documentation for details on + # system knows the output type of the following operation is a key-value + # pair of a Player and an int. Please see the documentation for details on # types that are inferred automatically as well as other ways to specify # type hints. | beam.Map(get_players) diff --git a/sdks/python/apache_beam/examples/cookbook/group_with_coder_test.py b/sdks/python/apache_beam/examples/cookbook/group_with_coder_test.py index 4e8796647da0..268ba8d355f2 100644 --- a/sdks/python/apache_beam/examples/cookbook/group_with_coder_test.py +++ b/sdks/python/apache_beam/examples/cookbook/group_with_coder_test.py @@ -50,7 +50,7 @@ def test_basics_with_type_check(self): temp_path = self.create_temp_file(self.SAMPLE_RECORDS) group_with_coder.run([ '--input=%s*' % temp_path, - '--output=%s.result' % temp_path]).wait_until_finish() + '--output=%s.result' % temp_path]) # Parse result file and compare. 
results = [] with open(temp_path + '.result-00000-of-00001') as result_file: @@ -71,7 +71,7 @@ def test_basics_without_type_check(self): group_with_coder.run([ '--no_pipeline_type_check', '--input=%s*' % temp_path, - '--output=%s.result' % temp_path]).wait_until_finish() + '--output=%s.result' % temp_path]) # Parse result file and compare. results = [] with open(temp_path + '.result-00000-of-00001') as result_file: diff --git a/sdks/python/apache_beam/examples/cookbook/mergecontacts.py b/sdks/python/apache_beam/examples/cookbook/mergecontacts.py index ddd27cc14ede..21b59a2d60c2 100644 --- a/sdks/python/apache_beam/examples/cookbook/mergecontacts.py +++ b/sdks/python/apache_beam/examples/cookbook/mergecontacts.py @@ -72,8 +72,9 @@ def run(argv=None, assert_results=None): pipeline_options.view_as(SetupOptions).save_main_session = True with beam.Pipeline(options=pipeline_options) as p: - # Helper: read a tab-separated key-value mapping from a text file, escape all - # quotes/backslashes, and convert it a PCollection of (key, value) pairs. + # Helper: read a tab-separated key-value mapping from a text file, + # escape all quotes/backslashes, and convert it a PCollection of + # (key, value) pairs. def read_kv_textfile(label, textfile): return (p | 'Read: %s' % label >> ReadFromText(textfile) @@ -107,7 +108,7 @@ def read_kv_textfile(label, textfile): writers = grouped | beam.Filter( # People without phones. lambda (name, (email, phone, snailmail)): not next(iter(phone), None)) nomads = grouped | beam.Filter( # People without addresses. 
- lambda (name, (email, phone, snailmail)): not next(iter(snailmail), None)) + lambda (name, (_, _, snailmail)): not next(iter(snailmail), None)) num_luddites = luddites | 'Luddites' >> beam.combiners.Count.Globally() num_writers = writers | 'Writers' >> beam.combiners.Count.Globally() diff --git a/sdks/python/apache_beam/examples/cookbook/mergecontacts_test.py b/sdks/python/apache_beam/examples/cookbook/mergecontacts_test.py index 09f71d389696..b3be0ddb0853 100644 --- a/sdks/python/apache_beam/examples/cookbook/mergecontacts_test.py +++ b/sdks/python/apache_beam/examples/cookbook/mergecontacts_test.py @@ -107,13 +107,12 @@ def test_mergecontacts(self): result_prefix = self.create_temp_file('') - result = mergecontacts.run([ + mergecontacts.run([ '--input_email=%s' % path_email, '--input_phone=%s' % path_phone, '--input_snailmail=%s' % path_snailmail, '--output_tsv=%s.tsv' % result_prefix, '--output_stats=%s.stats' % result_prefix], assert_results=(2, 1, 3)) - result.wait_until_finish() with open('%s.tsv-00000-of-00001' % result_prefix) as f: contents = f.read() diff --git a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py index 2c9111c678bd..3ddd668599d0 100644 --- a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py +++ b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py @@ -52,7 +52,7 @@ def test_multiple_output_pardo(self): multiple_output_pardo.run([ '--input=%s*' % temp_path, - '--output=%s' % result_prefix]).wait_until_finish() + '--output=%s' % result_prefix]) expected_char_count = len(''.join(self.SAMPLE_TEXT.split('\n'))) with open(result_prefix + '-chars-00000-of-00001') as f: diff --git a/sdks/python/apache_beam/examples/snippets/snippets.py b/sdks/python/apache_beam/examples/snippets/snippets.py index 0ccdf0b12326..ec8e9f9c705d 100644 --- a/sdks/python/apache_beam/examples/snippets/snippets.py +++ 
b/sdks/python/apache_beam/examples/snippets/snippets.py @@ -553,7 +553,8 @@ def process(self, element): p | beam.io.ReadFromText( 'gs://dataflow-samples/shakespeare/kinglear.txt') - | 'ExtractWords' >> beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x)) + | 'ExtractWords' >> beam.FlatMap( + lambda x: re.findall(r'[A-Za-z\']+', x)) | beam.combiners.Count.PerElement() | 'FilterText' >> beam.ParDo(FilterTextFn('Flourish|stomach'))) @@ -981,9 +982,9 @@ def expand(self, pcoll): def model_multiple_pcollections_flatten(contents, output_path): """Merging a PCollection with Flatten.""" some_hash_fn = lambda s: ord(s[0]) + partition_fn = lambda element, partitions: some_hash_fn(element) % partitions import apache_beam as beam with TestPipeline() as p: # Use TestPipeline for testing. - partition_fn = lambda element, partitions: some_hash_fn(element) % partitions # Partition into deciles partitioned = p | beam.Create(contents) | beam.Partition(partition_fn, 3) diff --git a/sdks/python/apache_beam/examples/snippets/snippets_test.py b/sdks/python/apache_beam/examples/snippets/snippets_test.py index c5102847abd1..b2dcf1c69feb 100644 --- a/sdks/python/apache_beam/examples/snippets/snippets_test.py +++ b/sdks/python/apache_beam/examples/snippets/snippets_test.py @@ -119,7 +119,6 @@ def capitals(word): self.assertEqual({'A', 'C'}, set(all_capitals)) def test_pardo_with_label(self): - # pylint: disable=line-too-long words = ['aa', 'bbc', 'defg'] # [START model_pardo_with_label] result = words | 'CountUniqueLetters' >> beam.Map( @@ -129,6 +128,7 @@ def test_pardo_with_label(self): self.assertEqual({1, 2, 4}, set(result)) def test_pardo_side_input(self): + # pylint: disable=line-too-long with TestPipeline() as p: words = p | 'start' >> beam.Create(['a', 'bb', 'ccc', 'dddd']) @@ -152,10 +152,10 @@ def filter_using_length(word, lower_bound, upper_bound=float('inf')): lower_bound=pvalue.AsSingleton(avg_word_len))) # Mix and match. 
- small_but_nontrivial = words | beam.FlatMap(filter_using_length, - lower_bound=2, - upper_bound=pvalue.AsSingleton( - avg_word_len)) + small_but_nontrivial = words | beam.FlatMap( + filter_using_length, + lower_bound=2, + upper_bound=pvalue.AsSingleton(avg_word_len)) # [END model_pardo_side_input] assert_that(small_words, equal_to(['a', 'bb', 'ccc'])) diff --git a/sdks/python/apache_beam/examples/wordcount_debugging.py b/sdks/python/apache_beam/examples/wordcount_debugging.py index c092fedb9b9f..39d5f5c47b76 100644 --- a/sdks/python/apache_beam/examples/wordcount_debugging.py +++ b/sdks/python/apache_beam/examples/wordcount_debugging.py @@ -128,18 +128,18 @@ def run(argv=None): | 'FilterText' >> beam.ParDo(FilterTextFn('Flourish|stomach'))) # assert_that is a convenient PTransform that checks a PCollection has an - # expected value. Asserts are best used in unit tests with small data sets but - # is demonstrated here as a teaching tool. + # expected value. Asserts are best used in unit tests with small data sets + # but is demonstrated here as a teaching tool. # - # Note assert_that does not provide any output and that successful completion - # of the Pipeline implies that the expectations were met. Learn more at - # https://cloud.google.com/dataflow/pipelines/testing-your-pipeline on how to - # test your pipeline. + # Note assert_that does not provide any output and that successful + # completion of the Pipeline implies that the expectations were met. Learn + # more at https://cloud.google.com/dataflow/pipelines/testing-your-pipeline + # on how to best test your pipeline. assert_that( filtered_words, equal_to([('Flourish', 3), ('stomach', 1)])) - # Format the counts into a PCollection of strings and write the output using a - # "Write" transform that has side effects. + # Format the counts into a PCollection of strings and write the output using + # a "Write" transform that has side effects. 
# pylint: disable=unused-variable output = (filtered_words | 'format' >> beam.Map(lambda (word, c): '%s: %s' % (word, c)) diff --git a/sdks/python/apache_beam/examples/wordcount_minimal.py b/sdks/python/apache_beam/examples/wordcount_minimal.py index a73bfcdb5339..6d942622fa05 100644 --- a/sdks/python/apache_beam/examples/wordcount_minimal.py +++ b/sdks/python/apache_beam/examples/wordcount_minimal.py @@ -95,22 +95,22 @@ def run(argv=None): with beam.Pipeline(options=pipeline_options) as p: # Read the text file[pattern] into a PCollection. - lines = p | 'read' >> ReadFromText(known_args.input) + lines = p | ReadFromText(known_args.input) # Count the occurrences of each word. - counts = (lines - | 'split' >> (beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x)) - .with_output_types(unicode)) - | 'pair_with_one' >> beam.Map(lambda x: (x, 1)) - | 'group' >> beam.GroupByKey() - | 'count' >> beam.Map(lambda (word, ones): (word, sum(ones)))) + counts = ( + lines + | 'Split' >> (beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x)) + .with_output_types(unicode)) + | 'PairWithOne' >> beam.Map(lambda x: (x, 1)) + | 'GroupAndSum' >> beam.CombinePerKey(sum)) # Format the counts into a PCollection of strings. - output = counts | 'format' >> beam.Map(lambda (word, c): '%s: %s' % (word, c)) + output = counts | 'Format' >> beam.Map(lambda (w, c): '%s: %s' % (w, c)) # Write the output using a "Write" transform that has side effects. 
# pylint: disable=expression-not-assigned - output | 'write' >> WriteToText(known_args.output) + output | WriteToText(known_args.output) diff --git a/sdks/python/apache_beam/transforms/combiners_test.py b/sdks/python/apache_beam/transforms/combiners_test.py index c6db26b8123a..c79fec864acb 100644 --- a/sdks/python/apache_beam/transforms/combiners_test.py +++ b/sdks/python/apache_beam/transforms/combiners_test.py @@ -251,9 +251,8 @@ def test_tuple_combine_fn(self): result = ( p | Create([('a', 100, 0.0), ('b', 10, -1), ('c', 1, 100)]) - | beam.CombineGlobally(combine.TupleCombineFn(max, - combine.MeanCombineFn(), - sum)).without_defaults()) + | beam.CombineGlobally(combine.TupleCombineFn( + max, combine.MeanCombineFn(), sum)).without_defaults()) assert_that(result, equal_to([('c', 111.0 / 3, 99.0)])) def test_tuple_combine_fn_without_defaults(self): @@ -302,15 +301,15 @@ def test_combine_globally_without_default(self): assert_that(result, equal_to([])) def test_combine_globally_with_default_side_input(self): - class CombineWithSideInput(PTransform): + class SideInputCombine(PTransform): def expand(self, pcoll): side = pcoll | CombineGlobally(sum).as_singleton_view() main = pcoll.pipeline | Create([None]) return main | Map(lambda _, s: s, side) with TestPipeline() as p: - result1 = p | 'i1' >> Create([]) | 'c1' >> CombineWithSideInput() - result2 = p | 'i2' >> Create([1, 2, 3, 4]) | 'c2' >> CombineWithSideInput() + result1 = p | 'i1' >> Create([]) | 'c1' >> SideInputCombine() + result2 = p | 'i2' >> Create([1, 2, 3, 4]) | 'c2' >> SideInputCombine() assert_that(result1, equal_to([0]), label='r1') assert_that(result2, equal_to([10]), label='r2') From bb1a758fcc9cd97034baa7f2221ea5793ffd3ea8 Mon Sep 17 00:00:00 2001 From: Robert Bradshaw Date: Mon, 22 May 2017 11:29:35 -0700 Subject: [PATCH 4/4] lint, test fix --- sdks/python/apache_beam/examples/complete/estimate_pi.py | 1 - .../examples/complete/top_wikipedia_sessions.py | 1 - 
.../apache_beam/examples/cookbook/datastore_wordcount.py | 1 - sdks/python/apache_beam/examples/cookbook/filters.py | 1 - sdks/python/apache_beam/examples/cookbook/mergecontacts.py | 2 +- sdks/python/apache_beam/examples/snippets/snippets.py | 7 ------- sdks/python/apache_beam/examples/snippets/snippets_test.py | 1 - sdks/python/apache_beam/examples/streaming_wordcap.py | 1 - sdks/python/apache_beam/examples/streaming_wordcount.py | 1 - sdks/python/apache_beam/examples/wordcount_debugging.py | 1 - sdks/python/apache_beam/examples/wordcount_minimal.py | 1 - sdks/python/apache_beam/io/filebasedsink_test.py | 1 - 12 files changed, 1 insertion(+), 18 deletions(-) diff --git a/sdks/python/apache_beam/examples/complete/estimate_pi.py b/sdks/python/apache_beam/examples/complete/estimate_pi.py index 8be44fff9b56..7e3c4cd35a27 100644 --- a/sdks/python/apache_beam/examples/complete/estimate_pi.py +++ b/sdks/python/apache_beam/examples/complete/estimate_pi.py @@ -120,7 +120,6 @@ def run(argv=None): | WriteToText(known_args.output, coder=JsonCoder())) - if __name__ == '__main__': logging.getLogger().setLevel(logging.INFO) run() diff --git a/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py b/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py index aee51ccdd39b..9a9ad7865c2c 100644 --- a/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py +++ b/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py @@ -167,7 +167,6 @@ def run(argv=None): | WriteToText(known_args.output)) - if __name__ == '__main__': logging.getLogger().setLevel(logging.INFO) run() diff --git a/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py b/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py index 7281662fd04b..7161cff1c255 100644 --- a/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py +++ b/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py @@ -146,7 +146,6 @@ def 
write_to_datastore(project, user_options, pipeline_options): | 'write to datastore' >> WriteToDatastore(project)) - def make_ancestor_query(kind, namespace, ancestor): """Creates a Cloud Datastore ancestor query. diff --git a/sdks/python/apache_beam/examples/cookbook/filters.py b/sdks/python/apache_beam/examples/cookbook/filters.py index 742d995ae008..1fbf763e5005 100644 --- a/sdks/python/apache_beam/examples/cookbook/filters.py +++ b/sdks/python/apache_beam/examples/cookbook/filters.py @@ -99,7 +99,6 @@ def run(argv=None): write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE))) - if __name__ == '__main__': logging.getLogger().setLevel(logging.INFO) run() diff --git a/sdks/python/apache_beam/examples/cookbook/mergecontacts.py b/sdks/python/apache_beam/examples/cookbook/mergecontacts.py index 21b59a2d60c2..9acdd9073478 100644 --- a/sdks/python/apache_beam/examples/cookbook/mergecontacts.py +++ b/sdks/python/apache_beam/examples/cookbook/mergecontacts.py @@ -108,7 +108,7 @@ def read_kv_textfile(label, textfile): writers = grouped | beam.Filter( # People without phones. lambda (name, (email, phone, snailmail)): not next(iter(phone), None)) nomads = grouped | beam.Filter( # People without addresses. 
- lambda (name, (_, _, snailmail)): not next(iter(snailmail), None)) + lambda (name, (e, p, snailmail)): not next(iter(snailmail), None)) num_luddites = luddites | 'Luddites' >> beam.combiners.Count.Globally() num_writers = writers | 'Writers' >> beam.combiners.Count.Globally() diff --git a/sdks/python/apache_beam/examples/snippets/snippets.py b/sdks/python/apache_beam/examples/snippets/snippets.py index ec8e9f9c705d..70929e9e8fae 100644 --- a/sdks/python/apache_beam/examples/snippets/snippets.py +++ b/sdks/python/apache_beam/examples/snippets/snippets.py @@ -297,7 +297,6 @@ def pipeline_options_command_line(argv): # [END pipeline_options_command_line] - def pipeline_logging(lines, output): """Logging Pipeline Messages.""" @@ -329,7 +328,6 @@ def process(self, element): | beam.io.WriteToText(output)) - def pipeline_monitoring(renames): """Using monitoring interface snippets.""" @@ -658,7 +656,6 @@ def model_custom_source(count): lines, equal_to( ['line ' + str(number) for number in range(0, count)])) - # We recommend users to start Source classes with an underscore to discourage # using the Source class directly when a PTransform for the source is # available. We simulate that here by simply extending the previous Source @@ -792,7 +789,6 @@ def close(self): SimpleKVSink('http://url_to_simple_kv/', final_table_name)) # [END model_custom_sink_use_new_sink] - # We recommend users to start Sink class names with an underscore to # discourage using the Sink class directly when a PTransform for the sink is # available. 
We simulate that here by simply extending the previous Sink @@ -823,7 +819,6 @@ def expand(self, pcoll): # [END model_custom_sink_use_ptransform] - def model_textio(renames): """Using a Read and Write transform to read/write text files.""" def filter_words(x): @@ -1008,7 +1003,6 @@ def model_multiple_pcollections_flatten(contents, output_path): merged | beam.io.WriteToText(output_path) - def model_multiple_pcollections_partition(contents, output_path): """Splitting a PCollection with Partition.""" some_hash_fn = lambda s: ord(s[0]) @@ -1036,7 +1030,6 @@ def partition_fn(student, num_partitions): | beam.io.WriteToText(output_path)) - def model_group_by_key(contents, output_path): """Applying a GroupByKey Transform.""" import re diff --git a/sdks/python/apache_beam/examples/snippets/snippets_test.py b/sdks/python/apache_beam/examples/snippets/snippets_test.py index b2dcf1c69feb..6654fef654cb 100644 --- a/sdks/python/apache_beam/examples/snippets/snippets_test.py +++ b/sdks/python/apache_beam/examples/snippets/snippets_test.py @@ -362,7 +362,6 @@ def parse_player_and_score(csv): equal_to([('banana', 3), ('kiwi', 4), ('zucchini', 3)])) - class SnippetsTest(unittest.TestCase): # Replacing text read/write transforms with dummy transforms for testing. 
diff --git a/sdks/python/apache_beam/examples/streaming_wordcap.py b/sdks/python/apache_beam/examples/streaming_wordcap.py index cf414d1dc24b..ce43e1f09849 100644 --- a/sdks/python/apache_beam/examples/streaming_wordcap.py +++ b/sdks/python/apache_beam/examples/streaming_wordcap.py @@ -57,7 +57,6 @@ def run(argv=None): beam.io.PubSubSink(known_args.output_topic)) - if __name__ == '__main__': logging.getLogger().setLevel(logging.INFO) run() diff --git a/sdks/python/apache_beam/examples/streaming_wordcount.py b/sdks/python/apache_beam/examples/streaming_wordcount.py index 528f17b341e7..e9d5dbefa835 100644 --- a/sdks/python/apache_beam/examples/streaming_wordcount.py +++ b/sdks/python/apache_beam/examples/streaming_wordcount.py @@ -67,7 +67,6 @@ def run(argv=None): beam.io.PubSubSink(known_args.output_topic)) - if __name__ == '__main__': logging.getLogger().setLevel(logging.INFO) run() diff --git a/sdks/python/apache_beam/examples/wordcount_debugging.py b/sdks/python/apache_beam/examples/wordcount_debugging.py index 39d5f5c47b76..c0ffd356364c 100644 --- a/sdks/python/apache_beam/examples/wordcount_debugging.py +++ b/sdks/python/apache_beam/examples/wordcount_debugging.py @@ -146,7 +146,6 @@ def run(argv=None): | 'write' >> WriteToText(known_args.output)) - if __name__ == '__main__': # Cloud Logging would contain only logging.INFO and higher level logs logged # by the root logger. 
All log statements emitted by the root logger will be diff --git a/sdks/python/apache_beam/examples/wordcount_minimal.py b/sdks/python/apache_beam/examples/wordcount_minimal.py index 6d942622fa05..76b0a221df7a 100644 --- a/sdks/python/apache_beam/examples/wordcount_minimal.py +++ b/sdks/python/apache_beam/examples/wordcount_minimal.py @@ -113,7 +113,6 @@ def run(argv=None): output | WriteToText(known_args.output) - if __name__ == '__main__': logging.getLogger().setLevel(logging.INFO) run() diff --git a/sdks/python/apache_beam/io/filebasedsink_test.py b/sdks/python/apache_beam/io/filebasedsink_test.py index 74005f4900b9..7c8ddb4072ef 100644 --- a/sdks/python/apache_beam/io/filebasedsink_test.py +++ b/sdks/python/apache_beam/io/filebasedsink_test.py @@ -175,7 +175,6 @@ def test_fixed_shard_write(self): with TestPipeline() as p: p | beam.Create(['a', 'b']) | beam.io.Write(sink) # pylint: disable=expression-not-assigned - concat = ''.join( open(temp_path + '_03_%03d_.output' % shard_num).read() for shard_num in range(3))