From 03e18f0d5a51963c42a9b2fc629984c163181044 Mon Sep 17 00:00:00 2001 From: Ahmet Altay Date: Wed, 18 Jan 2017 15:52:43 -0800 Subject: [PATCH 1/3] Update tests and examples to use new labels. --- .../apache_beam/examples/complete/tfidf.py | 12 ++++++------ .../examples/complete/top_wikipedia_sessions.py | 16 +++++++--------- .../cookbook/bigquery_side_input_test.py | 6 ++---- .../apache_beam/examples/cookbook/filters.py | 4 ++-- .../examples/cookbook/mergecontacts.py | 11 ++++++----- .../apache_beam/examples/snippets/snippets.py | 5 ++--- .../apache_beam/examples/streaming_wordcount.py | 6 +++--- sdks/python/apache_beam/io/bigquery.py | 5 ++--- sdks/python/apache_beam/io/fileio_test.py | 6 +++--- sdks/python/apache_beam/io/textio_test.py | 6 +++--- sdks/python/apache_beam/runners/runner_test.py | 4 ++-- .../apache_beam/transforms/ptransform_test.py | 16 ++++++++-------- 12 files changed, 46 insertions(+), 51 deletions(-) diff --git a/sdks/python/apache_beam/examples/complete/tfidf.py b/sdks/python/apache_beam/examples/complete/tfidf.py index 4d6e0d3ef985..c048cdd6f179 100644 --- a/sdks/python/apache_beam/examples/complete/tfidf.py +++ b/sdks/python/apache_beam/examples/complete/tfidf.py @@ -43,7 +43,7 @@ def read_documents(pipeline, uris): pcolls.append( pipeline | 'read: %s' % uri >> ReadFromText(uri) - | beam.Map('withkey: %s' % uri, lambda v, uri: (uri, v), uri)) + | 'withkey: %s' % uri >> beam.Map(lambda v, uri: (uri, v), uri)) return pcolls | 'flatten read pcolls' >> beam.Flatten() @@ -101,8 +101,8 @@ def split_into_words((uri, line)): # for a join by the URI key. uri_to_word_and_count = ( uri_and_word_to_count - | beam.Map('shift keys', - lambda ((uri, word), count): (uri, (word, count)))) + | 'shift keys' >> beam.Map( + lambda ((uri, word), count): (uri, (word, count)))) # Perform a CoGroupByKey (a sort of pre-join) on the prepared # uri_to_word_total and uri_to_word_and_count tagged by 'word totals' and @@ -149,9 +149,9 @@ def compute_term_frequency((uri, count_and_total)): # DoFns in this way. word_to_df = ( word_to_doc_count - | beam.Map('compute doc frequencies', - lambda (word, count), total: (word, float(count) / total), - AsSingleton(total_documents))) + | 'compute doc frequencies' >> beam.Map( + lambda (word, count), total: (word, float(count) / total), + AsSingleton(total_documents))) # Join the term frequency and document frequency collections, # each keyed on the word. diff --git a/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py b/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py index 4920813c3b5f..d19f66d2573d 100644 --- a/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py +++ b/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py @@ -80,8 +80,8 @@ def __init__(self): def expand(self, pcoll): return (pcoll - | beam.WindowInto('ComputeSessionsWindow', - window.Sessions(gap_size=ONE_HOUR_IN_SECONDS)) + | 'ComputeSessionsWindow' >> beam.WindowInto( + window.Sessions(gap_size=ONE_HOUR_IN_SECONDS)) | combiners.Count.PerElement()) @@ -93,11 +93,9 @@ def __init__(self): def expand(self, pcoll): return (pcoll - | beam.WindowInto('TopPerMonthWindow', - window.FixedWindows( - size=THIRTY_DAYS_IN_SECONDS)) - | combiners.core.CombineGlobally( - 'Top', + | 'TopPerMonthWindow' >> beam.WindowInto( + window.FixedWindows(size=THIRTY_DAYS_IN_SECONDS)) + | 'Top' >> combiners.core.CombineGlobally( combiners.TopCombineFn( 10, lambda first, second: first[1] < second[1])) .without_defaults()) @@ -131,8 +129,8 @@ def __init__(self, sampling_threshold): def expand(self, pcoll): return (pcoll - | beam.ParDo('ExtractUserAndTimestamp', - ExtractUserAndTimestampDoFn()) + | 'ExtractUserAndTimestamp' >> beam.ParDo( + ExtractUserAndTimestampDoFn()) | beam.Filter(lambda x: (abs(hash(x)) <= MAX_TIMESTAMP * self.sampling_threshold)) | ComputeSessions() diff --git a/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_test.py b/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_test.py index 926f1413d213..66cab7795218 100644 --- a/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_test.py +++ b/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_test.py @@ -31,10 +31,8 @@ def test_create_groups(self): p = TestPipeline() group_ids_pcoll = p | 'create_group_ids' >> beam.Create(['A', 'B', 'C']) - corpus_pcoll = p | beam.Create('create_corpus', - [{'f': 'corpus1'}, - {'f': 'corpus2'}, - {'f': 'corpus3'}]) + corpus_pcoll = p | 'create_corpus' >> beam.Create( + [{'f': 'corpus1'}, {'f': 'corpus2'}, {'f': 'corpus3'}]) words_pcoll = p | 'create_words' >> beam.Create([{'f': 'word1'}, {'f': 'word2'}, {'f': 'word3'}]) diff --git a/sdks/python/apache_beam/examples/cookbook/filters.py b/sdks/python/apache_beam/examples/cookbook/filters.py index b3a969acfb91..7c77b9d3f94a 100644 --- a/sdks/python/apache_beam/examples/cookbook/filters.py +++ b/sdks/python/apache_beam/examples/cookbook/filters.py @@ -53,8 +53,8 @@ def filter_cold_days(input_data, month_filter): projection_fields = ['year', 'month', 'day', 'mean_temp'] fields_of_interest = ( input_data - | beam.Map('projected', - lambda row: {f: row[f] for f in projection_fields})) + | 'projected' >> beam.Map( + lambda row: {f: row[f] for f in projection_fields})) # Compute the global mean temperature. global_mean = AsSingleton( diff --git a/sdks/python/apache_beam/examples/cookbook/mergecontacts.py b/sdks/python/apache_beam/examples/cookbook/mergecontacts.py index 6906ae467e54..55bdc5011f94 100644 --- a/sdks/python/apache_beam/examples/cookbook/mergecontacts.py +++ b/sdks/python/apache_beam/examples/cookbook/mergecontacts.py @@ -75,11 +75,12 @@ def run(argv=None, assert_results=None): def read_kv_textfile(label, textfile): return (p | 'read_%s' % label >> ReadFromText(textfile) - | beam.Map('backslash_%s' % label, - lambda x: re.sub(r'\\', r'\\\\', x)) - | beam.Map('escape_quotes_%s' % label, - lambda x: re.sub(r'"', r'\"', x)) - | beam.Map('split_%s' % label, lambda x: re.split(r'\t+', x, 1))) + | 'backslash_%s' % label >> beam.Map( + lambda x: re.sub(r'\\', r'\\\\', x)) + | 'escape_quotes_%s' % label >> beam.Map( + lambda x: re.sub(r'"', r'\"', x)) + | 'split_%s' % label >> beam.Map( + lambda x: re.split(r'\t+', x, 1))) # Read input databases. email = read_kv_textfile('email', known_args.input_email) diff --git a/sdks/python/apache_beam/examples/snippets/snippets.py b/sdks/python/apache_beam/examples/snippets/snippets.py index ff240ebf7ed0..631ab2dfd6fc 100644 --- a/sdks/python/apache_beam/examples/snippets/snippets.py +++ b/sdks/python/apache_beam/examples/snippets/snippets.py @@ -778,9 +778,8 @@ def close(self): kvs = p | beam.core.Create( 'CreateKVs', KVs) - kvs | beam.io.Write('WriteToSimpleKV', - SimpleKVSink('http://url_to_simple_kv/', - final_table_name)) + kvs | 'WriteToSimpleKV' >> beam.io.Write( + SimpleKVSink('http://url_to_simple_kv/', final_table_name)) # [END model_custom_sink_use_new_sink] p.run().wait_until_finish() diff --git a/sdks/python/apache_beam/examples/streaming_wordcount.py b/sdks/python/apache_beam/examples/streaming_wordcount.py index adfc33d74474..e34a64e01221 100644 --- a/sdks/python/apache_beam/examples/streaming_wordcount.py +++ b/sdks/python/apache_beam/examples/streaming_wordcount.py @@ -52,9 +52,9 @@ def run(argv=None): # Capitalize the characters in each line. transformed = (lines - | (beam.FlatMap('split', - lambda x: re.findall(r'[A-Za-z\']+', x)) - .with_output_types(unicode)) + | 'split' >> ( + beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x)) + .with_output_types(unicode)) | 'pair_with_one' >> beam.Map(lambda x: (x, 1)) | beam.WindowInto(window.FixedWindows(15, 0)) | 'group' >> beam.GroupByKey() diff --git a/sdks/python/apache_beam/io/bigquery.py b/sdks/python/apache_beam/io/bigquery.py index 9877ebf22214..8f55f42d147c 100644 --- a/sdks/python/apache_beam/io/bigquery.py +++ b/sdks/python/apache_beam/io/bigquery.py @@ -49,9 +49,8 @@ side_table = pipeline | 'not_big' >> beam.io.Read(beam.io.BigQuerySource() results = ( main_table - | beam.Map('process data', - lambda element, side_input: ..., - AsList(side_table))) + | 'process data' >> beam.Map( + lambda element, side_input: ..., AsList(side_table))) There is no difference in how main and side inputs are read. What makes the side_table a 'side input' is the AsList wrapper used when passing the table diff --git a/sdks/python/apache_beam/io/fileio_test.py b/sdks/python/apache_beam/io/fileio_test.py index 8842369525c8..772e5e23500d 100644 --- a/sdks/python/apache_beam/io/fileio_test.py +++ b/sdks/python/apache_beam/io/fileio_test.py @@ -762,7 +762,7 @@ def test_write_text_bzip2_file_empty(self): def test_write_native(self): pipeline = TestPipeline() - pcoll = pipeline | beam.core.Create('Create', self.lines) + pcoll = pipeline | 'Create' >> beam.core.Create(self.lines) pcoll | 'Write' >> beam.Write(fileio.NativeTextFileSink(self.path)) # pylint: disable=expression-not-assigned pipeline.run() @@ -775,7 +775,7 @@ def test_write_native(self): def test_write_native_auto_compression(self): pipeline = TestPipeline() - pcoll = pipeline | beam.core.Create('Create', self.lines) + pcoll = pipeline | 'Create' >> beam.core.Create(self.lines) pcoll | 'Write' >> beam.Write( # pylint: disable=expression-not-assigned fileio.NativeTextFileSink( self.path, file_name_suffix='.gz')) @@ -790,7 +790,7 @@ def test_write_native_auto_compression(self): def test_write_native_auto_compression_unsharded(self): pipeline = TestPipeline() - pcoll = pipeline | beam.core.Create('Create', self.lines) + pcoll = pipeline | 'Create' >> beam.core.Create(self.lines) pcoll | 'Write' >> beam.Write( # pylint: disable=expression-not-assigned fileio.NativeTextFileSink( self.path + '.gz', shard_name_template='')) diff --git a/sdks/python/apache_beam/io/textio_test.py b/sdks/python/apache_beam/io/textio_test.py index 07ab4ccfdf33..ccb64ff2af7c 100644 --- a/sdks/python/apache_beam/io/textio_test.py +++ b/sdks/python/apache_beam/io/textio_test.py @@ -524,7 +524,7 @@ def test_write_display_data(self): def test_write_dataflow(self): pipeline = TestPipeline() - pcoll = pipeline | beam.core.Create('Create', self.lines) + pcoll = pipeline | 'Create' >> beam.core.Create(self.lines) pcoll | 'Write' >> WriteToText(self.path) # pylint: disable=expression-not-assigned pipeline.run() @@ -537,7 +537,7 @@ def test_write_dataflow(self): def test_write_dataflow_auto_compression(self): pipeline = TestPipeline() - pcoll = pipeline | beam.core.Create('Create', self.lines) + pcoll = pipeline | 'Create' >> beam.core.Create(self.lines) pcoll | 'Write' >> WriteToText(self.path, file_name_suffix='.gz') # pylint: disable=expression-not-assigned pipeline.run() @@ -550,7 +550,7 @@ def test_write_dataflow_auto_compression(self): def test_write_dataflow_auto_compression_unsharded(self): pipeline = TestPipeline() - pcoll = pipeline | beam.core.Create('Create', self.lines) + pcoll = pipeline | 'Create' >> beam.core.Create(self.lines) pcoll | 'Write' >> WriteToText(self.path + '.gz', shard_name_template='') # pylint: disable=expression-not-assigned pipeline.run() diff --git a/sdks/python/apache_beam/runners/runner_test.py b/sdks/python/apache_beam/runners/runner_test.py index 89a6fdc96400..f95b2954a3b2 100644 --- a/sdks/python/apache_beam/runners/runner_test.py +++ b/sdks/python/apache_beam/runners/runner_test.py @@ -205,8 +205,8 @@ def test_no_group_by_key_directly_after_bigquery(self): '--temp_location=/dev/null', '--no_auth=True' ])) - rows = p | beam.io.Read('read', - beam.io.BigQuerySource('dataset.faketable')) + rows = p | 'read' >> beam.io.Read( + beam.io.BigQuerySource('dataset.faketable')) with self.assertRaises(ValueError, msg=('Coder for the GroupByKey operation' '"GroupByKey" is not a key-value coder: ' diff --git a/sdks/python/apache_beam/transforms/ptransform_test.py b/sdks/python/apache_beam/transforms/ptransform_test.py index 58382e4a97fa..13b963cf89ce 100644 --- a/sdks/python/apache_beam/transforms/ptransform_test.py +++ b/sdks/python/apache_beam/transforms/ptransform_test.py @@ -562,9 +562,9 @@ def expand(self, pcoll_dicts): @beam.ptransform_fn def SamplePTransform(pcoll): """Sample transform using the @ptransform_fn decorator.""" - map_transform = beam.Map('ToPairs', lambda v: (v, None)) - combine_transform = beam.CombinePerKey('Group', lambda vs: None) - keys_transform = beam.Keys('RemoveDuplicates') + map_transform = 'ToPairs' >> beam.Map(lambda v: (v, None)) + combine_transform = 'Group' >> beam.CombinePerKey(lambda vs: None) + keys_transform = 'RemoveDuplicates' >> beam.Keys() return pcoll | map_transform | combine_transform | keys_transform @@ -575,15 +575,15 @@ class CustomTransform(beam.PTransform): pardo = None def expand(self, pcoll): - self.pardo = beam.FlatMap('*do*', lambda x: [x + 1]) + self.pardo = '*do*' >> beam.FlatMap(lambda x: [x + 1]) return pcoll | self.pardo def test_chained_ptransforms(self): """Tests that chaining gets proper nesting.""" pipeline = TestPipeline() - map1 = beam.Map('map1', lambda x: (x, 1)) - gbk = beam.GroupByKey('gbk') - map2 = beam.Map('map2', lambda (x, ones): (x, sum(ones))) + map1 = 'map1' >> beam.Map(lambda x: (x, 1)) + gbk = 'gbk' >> beam.GroupByKey() + map2 = 'map2' >> beam.Map(lambda (x, ones): (x, sum(ones))) t = (map1 | gbk | map2) result = pipeline | 'start' >> beam.Create(['a', 'a', 'b']) | t self.assertTrue('map1|gbk|map2/map1' in pipeline.applied_labels) @@ -636,7 +636,7 @@ def test_combine_with_label(self): vals = [1, 2, 3, 4, 5, 6, 7] pipeline = TestPipeline() pcoll = pipeline | 'start' >> beam.Create(vals) - combine = beam.CombineGlobally('*sum*', sum) + combine = '*sum*' >> beam.CombineGlobally(sum) result = pcoll | combine self.assertTrue('*sum*' in pipeline.applied_labels) assert_that(result, equal_to([sum(vals)])) From 0db60e40dc5a1e1983504e4535eab0140eb97c0e Mon Sep 17 00:00:00 2001 From: Ahmet Altay Date: Wed, 18 Jan 2017 18:06:10 -0800 Subject: [PATCH 2/3] update labels in iobase --- sdks/python/apache_beam/io/iobase.py | 22 ++++++++++++---------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/sdks/python/apache_beam/io/iobase.py b/sdks/python/apache_beam/io/iobase.py index 8fb52383c017..93421a67b1c3 100644 --- a/sdks/python/apache_beam/io/iobase.py +++ b/sdks/python/apache_beam/io/iobase.py @@ -748,8 +748,8 @@ def __init__(self, sink): def expand(self, pcoll): do_once = pcoll.pipeline | 'DoOnce' >> core.Create([None]) - init_result_coll = do_once | core.Map( - 'initialize_write', lambda _, sink: sink.initialize_write(), self.sink) + init_result_coll = do_once | 'initialize_write' >> core.Map( + lambda _, sink: sink.initialize_write(), self.sink) if getattr(self.sink, 'num_shards', 0): min_shards = self.sink.num_shards if min_shards == 1: @@ -759,18 +759,20 @@ def expand(self, pcoll): write_result_coll = (keyed_pcoll | core.WindowInto(window.GlobalWindows()) | core.GroupByKey() - | core.Map('write_bundles', - _write_keyed_bundle, self.sink, - AsSingleton(init_result_coll))) + | 'write_bundles' >> core.Map( + _write_keyed_bundle, self.sink, + AsSingleton(init_result_coll))) else: min_shards = 1 - write_result_coll = (pcoll | core.ParDo('write_bundles', - _WriteBundleDoFn(), self.sink, - AsSingleton(init_result_coll)) - | core.Map('pair', lambda x: (None, x)) + write_result_coll = (pcoll + | 'write_bundles' >> + core.ParDo( + _WriteBundleDoFn(), self.sink, + AsSingleton(init_result_coll)) + | 'pair' >> core.Map(lambda x: (None, x)) | core.WindowInto(window.GlobalWindows()) | core.GroupByKey() - | core.FlatMap('extract', lambda x: x[1])) + | 'extract' >> core.FlatMap(lambda x: x[1])) return do_once | core.FlatMap( 'finalize_write', _finalize_write, From d863e6863b1487aa884151c9a317076dab8facd6 Mon Sep 17 00:00:00 2001 From: Ahmet Altay Date: Thu, 19 Jan 2017 12:28:48 -0800 Subject: [PATCH 3/3] Remove unneeded labels, and convert existing labels to UpperCamelCase. --- .../apache_beam/examples/complete/tfidf.py | 38 +- .../cookbook/bigquery_side_input_test.py | 13 +- .../apache_beam/examples/cookbook/filters.py | 12 +- .../examples/cookbook/mergecontacts.py | 16 +- .../examples/streaming_wordcount.py | 10 +- sdks/python/apache_beam/io/fileio_test.py | 12 +- sdks/python/apache_beam/io/iobase.py | 12 +- sdks/python/apache_beam/io/textio_test.py | 6 +- .../python/apache_beam/runners/runner_test.py | 25 +- .../apache_beam/transforms/ptransform_test.py | 513 +++++++++--------- 10 files changed, 325 insertions(+), 332 deletions(-) diff --git a/sdks/python/apache_beam/examples/complete/tfidf.py b/sdks/python/apache_beam/examples/complete/tfidf.py index c048cdd6f179..367e27543983 100644 --- a/sdks/python/apache_beam/examples/complete/tfidf.py +++ b/sdks/python/apache_beam/examples/complete/tfidf.py @@ -42,9 +42,9 @@ def read_documents(pipeline, uris): for uri in uris: pcolls.append( pipeline - | 'read: %s' % uri >> ReadFromText(uri) - | 'withkey: %s' % uri >> beam.Map(lambda v, uri: (uri, v), uri)) - return pcolls | 'flatten read pcolls' >> beam.Flatten() + | 'Read: %s' % uri >> ReadFromText(uri) + | 'WithKey: %s' % uri >> beam.Map(lambda v, uri: (uri, v), uri)) + return pcolls | 'FlattenReadPColls' >> beam.Flatten() class TfIdf(beam.PTransform): @@ -61,9 +61,9 @@ def expand(self, uri_to_content): # PCollection to use as side input. total_documents = ( uri_to_content - | 'get uris' >> beam.Keys() - | 'get unique uris' >> beam.RemoveDuplicates() - | ' count uris' >> beam.combiners.Count.Globally()) + | 'GetUris 1' >> beam.Keys() + | 'GetUniqueUris' >> beam.RemoveDuplicates() + | 'CountUris' >> beam.combiners.Count.Globally()) # Create a collection of pairs mapping a URI to each of the words # in the document associated with that that URI. @@ -73,35 +73,35 @@ def split_into_words((uri, line)): uri_to_words = ( uri_to_content - | 'split words' >> beam.FlatMap(split_into_words)) + | 'SplitWords' >> beam.FlatMap(split_into_words)) # Compute a mapping from each word to the total number of documents # in which it appears. word_to_doc_count = ( uri_to_words - | 'get unique words per doc' >> beam.RemoveDuplicates() - | 'get words' >> beam.Values() - | 'count docs per word' >> beam.combiners.Count.PerElement()) + | 'GetUniqueWordsPerDoc' >> beam.RemoveDuplicates() + | 'GetWords' >> beam.Values() + | 'CountDocsPerWord' >> beam.combiners.Count.PerElement()) # Compute a mapping from each URI to the total number of words in the # document associated with that URI. uri_to_word_total = ( uri_to_words - | ' get uris' >> beam.Keys() - | 'count words in doc' >> beam.combiners.Count.PerElement()) + | 'GetUris 2' >> beam.Keys() + | 'CountWordsInDoc' >> beam.combiners.Count.PerElement()) # Count, for each (URI, word) pair, the number of occurrences of that word # in the document associated with the URI. uri_and_word_to_count = ( uri_to_words - | 'count word-doc pairs' >> beam.combiners.Count.PerElement()) + | 'CountWord-DocPairs' >> beam.combiners.Count.PerElement()) # Adjust the above collection to a mapping from (URI, word) pairs to counts # into an isomorphic mapping from URI to (word, count) pairs, to prepare # for a join by the URI key. uri_to_word_and_count = ( uri_and_word_to_count - | 'shift keys' >> beam.Map( + | 'ShiftKeys' >> beam.Map( lambda ((uri, word), count): (uri, (word, count)))) # Perform a CoGroupByKey (a sort of pre-join) on the prepared @@ -118,7 +118,7 @@ def split_into_words((uri, line)): # ... ]} uri_to_word_and_count_and_total = ( {'word totals': uri_to_word_total, 'word counts': uri_to_word_and_count} - | 'cogroup by uri' >> beam.CoGroupByKey()) + | 'CoGroupByUri' >> beam.CoGroupByKey()) # Compute a mapping from each word to a (URI, term frequency) pair for each # URI. A word's term frequency for a document is simply the number of times @@ -134,7 +134,7 @@ def compute_term_frequency((uri, count_and_total)): word_to_uri_and_tf = ( uri_to_word_and_count_and_total - | 'compute term frequencies' >> beam.FlatMap(compute_term_frequency)) + | 'ComputeTermFrequencies' >> beam.FlatMap(compute_term_frequency)) # Compute a mapping from each word to its document frequency. # A word's document frequency in a corpus is the number of @@ -149,7 +149,7 @@ def compute_term_frequency((uri, count_and_total)): # DoFns in this way. word_to_df = ( word_to_doc_count - | 'compute doc frequencies' >> beam.Map( + | 'ComputeDocFrequencies' >> beam.Map( lambda (word, count), total: (word, float(count) / total), AsSingleton(total_documents))) @@ -157,7 +157,7 @@ def compute_term_frequency((uri, count_and_total)): # each keyed on the word. word_to_uri_and_tf_and_df = ( {'tf': word_to_uri_and_tf, 'df': word_to_df} - | 'cogroup words by tf-df' >> beam.CoGroupByKey()) + | 'CoGroupWordsByTf-df' >> beam.CoGroupByKey()) # Compute a mapping from each word to a (URI, TF-IDF) score for each URI. # There are a variety of definitions of TF-IDF @@ -172,7 +172,7 @@ def compute_tf_idf((word, tf_and_df)): word_to_uri_and_tfidf = ( word_to_uri_and_tf_and_df - | 'compute tf-idf' >> beam.FlatMap(compute_tf_idf)) + | 'ComputeTf-idf' >> beam.FlatMap(compute_tf_idf)) return word_to_uri_and_tfidf diff --git a/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_test.py b/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_test.py index 66cab7795218..586997648145 100644 --- a/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_test.py +++ b/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_test.py @@ -30,14 +30,13 @@ class BigQuerySideInputTest(unittest.TestCase): def test_create_groups(self): p = TestPipeline() - group_ids_pcoll = p | 'create_group_ids' >> beam.Create(['A', 'B', 'C']) - corpus_pcoll = p | 'create_corpus' >> beam.Create( + group_ids_pcoll = p | 'CreateGroupIds' >> beam.Create(['A', 'B', 'C']) + corpus_pcoll = p | 'CreateCorpus' >> beam.Create( [{'f': 'corpus1'}, {'f': 'corpus2'}, {'f': 'corpus3'}]) - words_pcoll = p | 'create_words' >> beam.Create([{'f': 'word1'}, - {'f': 'word2'}, - {'f': 'word3'}]) - ignore_corpus_pcoll = p | 'create_ignore_corpus' >> beam.Create(['corpus1']) - ignore_word_pcoll = p | 'create_ignore_word' >> beam.Create(['word1']) + words_pcoll = p | 'CreateWords' >> beam.Create( + [{'f': 'word1'}, {'f': 'word2'}, {'f': 'word3'}]) + ignore_corpus_pcoll = p | 'CreateIgnoreCorpus' >> beam.Create(['corpus1']) + ignore_word_pcoll = p | 'CreateIgnoreWord' >> beam.Create(['word1']) groups = bigquery_side_input.create_groups(group_ids_pcoll, corpus_pcoll, words_pcoll, ignore_corpus_pcoll, diff --git a/sdks/python/apache_beam/examples/cookbook/filters.py b/sdks/python/apache_beam/examples/cookbook/filters.py index 7c77b9d3f94a..d13d823972bf 100644 --- a/sdks/python/apache_beam/examples/cookbook/filters.py +++ b/sdks/python/apache_beam/examples/cookbook/filters.py @@ -53,21 +53,21 @@ def filter_cold_days(input_data, month_filter): projection_fields = ['year', 'month', 'day', 'mean_temp'] fields_of_interest = ( input_data - | 'projected' >> beam.Map( + | 'Projected' >> beam.Map( lambda row: {f: row[f] for f in projection_fields})) # Compute the global mean temperature. global_mean = AsSingleton( fields_of_interest - | 'extract mean' >> beam.Map(lambda row: row['mean_temp']) - | 'global mean' >> beam.combiners.Mean.Globally()) + | 'ExtractMean' >> beam.Map(lambda row: row['mean_temp']) + | 'GlobalMean' >> beam.combiners.Mean.Globally()) # Filter to the rows representing days in the month of interest # in which the mean daily temperature is below the global mean. return ( fields_of_interest - | 'desired month' >> beam.Filter(lambda row: row['month'] == month_filter) - | 'below mean' >> beam.Filter( + | 'DesiredMonth' >> beam.Filter(lambda row: row['month'] == month_filter) + | 'BelowMean' >> beam.Filter( lambda row, mean: row['mean_temp'] < mean, global_mean)) @@ -92,7 +92,7 @@ def run(argv=None): # pylint: disable=expression-not-assigned (filter_cold_days(input_data, known_args.month_filter) - | 'save to BQ' >> beam.io.Write(beam.io.BigQuerySink( + | 'SaveToBQ' >> beam.io.Write(beam.io.BigQuerySink( known_args.output, schema='year:INTEGER,month:INTEGER,day:INTEGER,mean_temp:FLOAT', create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED, diff --git a/sdks/python/apache_beam/examples/cookbook/mergecontacts.py b/sdks/python/apache_beam/examples/cookbook/mergecontacts.py index 55bdc5011f94..c880a9a0c2f0 100644 --- a/sdks/python/apache_beam/examples/cookbook/mergecontacts.py +++ b/sdks/python/apache_beam/examples/cookbook/mergecontacts.py @@ -74,12 +74,12 @@ def run(argv=None, assert_results=None): # quotes/backslashes, and convert it a PCollection of (key, value) pairs. def read_kv_textfile(label, textfile): return (p - | 'read_%s' % label >> ReadFromText(textfile) - | 'backslash_%s' % label >> beam.Map( + | 'Read: %s' % label >> ReadFromText(textfile) + | 'Backslash: %s' % label >> beam.Map( lambda x: re.sub(r'\\', r'\\\\', x)) - | 'escape_quotes_%s' % label >> beam.Map( + | 'EscapeQuotes: %s' % label >> beam.Map( lambda x: re.sub(r'"', r'\"', x)) - | 'split_%s' % label >> beam.Map( + | 'Split: %s' % label >> beam.Map( lambda x: re.split(r'\t+', x, 1))) # Read input databases. @@ -107,13 +107,13 @@ def read_kv_textfile(label, textfile): nomads = grouped | beam.Filter( # People without addresses. lambda (name, (email, phone, snailmail)): not next(iter(snailmail), None)) - num_luddites = luddites | 'luddites' >> beam.combiners.Count.Globally() - num_writers = writers | 'writers' >> beam.combiners.Count.Globally() - num_nomads = nomads | 'nomads' >> beam.combiners.Count.Globally() + num_luddites = luddites | 'Luddites' >> beam.combiners.Count.Globally() + num_writers = writers | 'Writers' >> beam.combiners.Count.Globally() + num_nomads = nomads | 'Nomads' >> beam.combiners.Count.Globally() # Write tab-delimited output. # pylint: disable=expression-not-assigned - tsv_lines | 'write_tsv' >> WriteToText(known_args.output_tsv) + tsv_lines | 'WriteTsv' >> WriteToText(known_args.output_tsv) # TODO(silviuc): Move the assert_results logic to the unit test. if assert_results is not None: diff --git a/sdks/python/apache_beam/examples/streaming_wordcount.py b/sdks/python/apache_beam/examples/streaming_wordcount.py index e34a64e01221..7fb2c8129121 100644 --- a/sdks/python/apache_beam/examples/streaming_wordcount.py +++ b/sdks/python/apache_beam/examples/streaming_wordcount.py @@ -52,14 +52,14 @@ def run(argv=None): # Capitalize the characters in each line. transformed = (lines - | 'split' >> ( + | 'Split' >> ( beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x)) .with_output_types(unicode)) - | 'pair_with_one' >> beam.Map(lambda x: (x, 1)) + | 'PairWithOne' >> beam.Map(lambda x: (x, 1)) | beam.WindowInto(window.FixedWindows(15, 0)) - | 'group' >> beam.GroupByKey() - | 'count' >> beam.Map(lambda (word, ones): (word, sum(ones))) - | 'format' >> beam.Map(lambda tup: '%s: %d' % tup)) + | 'Group' >> beam.GroupByKey() + | 'Count' >> beam.Map(lambda (word, ones): (word, sum(ones))) + | 'Format' >> beam.Map(lambda tup: '%s: %d' % tup)) # Write to PubSub. # pylint: disable=expression-not-assigned diff --git a/sdks/python/apache_beam/io/fileio_test.py b/sdks/python/apache_beam/io/fileio_test.py index 772e5e23500d..f75bc5dbb061 100644 --- a/sdks/python/apache_beam/io/fileio_test.py +++ b/sdks/python/apache_beam/io/fileio_test.py @@ -762,8 +762,8 @@ def test_write_text_bzip2_file_empty(self): def test_write_native(self): pipeline = TestPipeline() - pcoll = pipeline | 'Create' >> beam.core.Create(self.lines) - pcoll | 'Write' >> beam.Write(fileio.NativeTextFileSink(self.path)) # pylint: disable=expression-not-assigned + pcoll = pipeline | beam.core.Create(self.lines) + pcoll | beam.Write(fileio.NativeTextFileSink(self.path)) # pylint: disable=expression-not-assigned pipeline.run() read_result = [] @@ -775,8 +775,8 @@ def test_write_native(self): def test_write_native_auto_compression(self): pipeline = TestPipeline() - pcoll = pipeline | 'Create' >> beam.core.Create(self.lines) - pcoll | 'Write' >> beam.Write( # pylint: disable=expression-not-assigned + pcoll = pipeline | beam.core.Create(self.lines) + pcoll | beam.Write( # pylint: disable=expression-not-assigned fileio.NativeTextFileSink( self.path, file_name_suffix='.gz')) pipeline.run() @@ -790,8 +790,8 @@ def test_write_native_auto_compression(self): def test_write_native_auto_compression_unsharded(self): pipeline = TestPipeline() - pcoll = pipeline | 'Create' >> beam.core.Create(self.lines) - pcoll | 'Write' >> beam.Write( # pylint: disable=expression-not-assigned + pcoll = pipeline | beam.core.Create(self.lines) + pcoll | beam.Write( # pylint: disable=expression-not-assigned fileio.NativeTextFileSink( self.path + '.gz', shard_name_template='')) pipeline.run() diff --git a/sdks/python/apache_beam/io/iobase.py b/sdks/python/apache_beam/io/iobase.py index 93421a67b1c3..12af3b6655eb 100644 --- a/sdks/python/apache_beam/io/iobase.py +++ b/sdks/python/apache_beam/io/iobase.py @@ -727,7 +727,7 @@ def expand(self, pcoll): from apache_beam.runners.dataflow.native_io import iobase as dataflow_io if isinstance(self.sink, dataflow_io.NativeSink): # A native sink - return pcoll | 'native_write' >> dataflow_io._NativeWrite(self.sink) + return pcoll | 'NativeWrite' >> dataflow_io._NativeWrite(self.sink) elif isinstance(self.sink, Sink): # A custom sink return pcoll | WriteImpl(self.sink) @@ -748,7 +748,7 @@ def __init__(self, sink): def expand(self, pcoll): do_once = pcoll.pipeline | 'DoOnce' >> core.Create([None]) - init_result_coll = do_once | 'initialize_write' >> core.Map( + init_result_coll = do_once | 'InitializeWrite' >> core.Map( lambda _, sink: sink.initialize_write(), self.sink) if getattr(self.sink, 'num_shards', 0): min_shards = self.sink.num_shards @@ -759,20 +759,20 @@ def expand(self, pcoll): write_result_coll = (keyed_pcoll | core.WindowInto(window.GlobalWindows()) | core.GroupByKey() - | 'write_bundles' >> core.Map( + | 'WriteBundles' >> core.Map( _write_keyed_bundle, self.sink, AsSingleton(init_result_coll))) else: min_shards = 1 write_result_coll = (pcoll - | 'write_bundles' >> + | 'WriteBundles' >> core.ParDo( _WriteBundleDoFn(), self.sink, AsSingleton(init_result_coll)) - | 'pair' >> core.Map(lambda x: (None, x)) + | 'Pair' >> core.Map(lambda x: (None, x)) | core.WindowInto(window.GlobalWindows()) | core.GroupByKey() - | 'extract' >> core.FlatMap(lambda x: x[1])) + | 'Extract' >> core.FlatMap(lambda x: x[1])) return do_once | core.FlatMap( 'finalize_write', _finalize_write, diff --git a/sdks/python/apache_beam/io/textio_test.py b/sdks/python/apache_beam/io/textio_test.py index ccb64ff2af7c..4b85584e3ab6 100644 --- a/sdks/python/apache_beam/io/textio_test.py +++ b/sdks/python/apache_beam/io/textio_test.py @@ -524,7 +524,7 @@ def test_write_display_data(self): def test_write_dataflow(self): pipeline = TestPipeline() - pcoll = pipeline | 'Create' >> beam.core.Create(self.lines) + pcoll = pipeline | beam.core.Create(self.lines) pcoll | 'Write' >> WriteToText(self.path) # pylint: disable=expression-not-assigned pipeline.run() @@ -537,7 +537,7 @@ def test_write_dataflow(self): def test_write_dataflow_auto_compression(self): pipeline = TestPipeline() - pcoll = pipeline | 'Create' >> beam.core.Create(self.lines) + pcoll = pipeline | beam.core.Create(self.lines) pcoll | 'Write' >> WriteToText(self.path, file_name_suffix='.gz') # pylint: disable=expression-not-assigned pipeline.run() @@ -550,7 +550,7 @@ def test_write_dataflow_auto_compression(self): def test_write_dataflow_auto_compression_unsharded(self): pipeline = TestPipeline() - pcoll = pipeline | 'Create' >> beam.core.Create(self.lines) + pcoll = pipeline | beam.core.Create(self.lines) pcoll | 'Write' >> WriteToText(self.path + '.gz', shard_name_template='') # pylint: disable=expression-not-assigned pipeline.run() diff --git a/sdks/python/apache_beam/runners/runner_test.py b/sdks/python/apache_beam/runners/runner_test.py index f95b2954a3b2..f5225903a067 100644 --- a/sdks/python/apache_beam/runners/runner_test.py +++ b/sdks/python/apache_beam/runners/runner_test.py @@ -84,9 +84,9 @@ def test_remote_runner_translation(self): p = Pipeline(remote_runner, options=PipelineOptions(self.default_properties)) - (p | 'create' >> ptransform.Create([1, 2, 3]) # pylint: disable=expression-not-assigned - | 'do' >> ptransform.FlatMap(lambda x: [(x, x)]) - | 'gbk' >> ptransform.GroupByKey()) + (p | ptransform.Create([1, 2, 3]) # pylint: disable=expression-not-assigned + | 'Do' >> ptransform.FlatMap(lambda x: [(x, x)]) + | ptransform.GroupByKey()) remote_runner.job = apiclient.Job(p.options) super(DataflowRunner, remote_runner).run(p) @@ -118,8 +118,8 @@ def process(self, context): now = datetime.now() # pylint: disable=expression-not-assigned - (p | 'create' >> ptransform.Create([1, 2, 3, 4, 5]) - | 'do' >> SpecialParDo(SpecialDoFn(), now)) + (p | ptransform.Create([1, 2, 3, 4, 5]) + | 'Do' >> SpecialParDo(SpecialDoFn(), now)) remote_runner.job = apiclient.Job(p.options) super(DataflowRunner, remote_runner).run(p) @@ -166,8 +166,8 @@ def process(self, context): p = Pipeline(runner, options=PipelineOptions(self.default_properties)) # pylint: disable=expression-not-assigned - (p | 'create' >> ptransform.Create([1, 2, 3, 4, 5]) - | 'do' >> beam.ParDo(MyDoFn())) + (p | ptransform.Create([1, 2, 3, 4, 5]) + | 'Do' >> beam.ParDo(MyDoFn())) result = p.run() result.wait_until_finish() metrics = result.metrics().query() @@ -178,19 +178,19 @@ def process(self, context): metrics['counters'], hc.contains_inanyorder( MetricResult( - MetricKey('do', MetricName(namespace, 'elements')), + MetricKey('Do', MetricName(namespace, 'elements')), 5, 5), MetricResult( - MetricKey('do', MetricName(namespace, 'bundles')), + MetricKey('Do', MetricName(namespace, 'bundles')), 1, 1), MetricResult( - MetricKey('do', MetricName(namespace, 'finished_bundles')), + MetricKey('Do', MetricName(namespace, 'finished_bundles')), 1, 1))) hc.assert_that( metrics['distributions'], hc.contains_inanyorder( MetricResult( - MetricKey('do', MetricName(namespace, 'element_dist')), + MetricKey('Do', MetricName(namespace, 'element_dist')), DistributionResult(DistributionData(15, 5, 1, 5)), DistributionResult(DistributionData(15, 5, 1, 5))))) @@ -205,8 +205,7 @@ def test_no_group_by_key_directly_after_bigquery(self): '--temp_location=/dev/null', '--no_auth=True' ])) - rows = p | 'read' >> beam.io.Read( - beam.io.BigQuerySource('dataset.faketable')) + rows = p | beam.io.Read(beam.io.BigQuerySource('dataset.faketable')) with self.assertRaises(ValueError, msg=('Coder for the GroupByKey operation' '"GroupByKey" is not a key-value coder: ' diff --git a/sdks/python/apache_beam/transforms/ptransform_test.py b/sdks/python/apache_beam/transforms/ptransform_test.py index 13b963cf89ce..827bc8346ff4 100644 --- a/sdks/python/apache_beam/transforms/ptransform_test.py +++ b/sdks/python/apache_beam/transforms/ptransform_test.py @@ -54,8 +54,8 @@ def test_str(self): str(PTransform())) pa = TestPipeline() - res = pa | 'a_label' >> beam.Create([1, 2]) - self.assertEqual('AppliedPTransform(a_label, Create)', + res = pa | 'ALabel' >> beam.Create([1, 2]) + self.assertEqual('AppliedPTransform(ALabel, Create)', str(res.producer)) pc = TestPipeline() @@ -111,8 +111,8 @@ def process(self, context, addon): return [context.element + addon] pipeline = TestPipeline() - pcoll = pipeline | 'start' >> beam.Create([1, 2, 3]) - result = pcoll | 'do' >> beam.ParDo(AddNDoFn(), 10) + pcoll = pipeline | 'Start' >> beam.Create([1, 2, 3]) + result = pcoll | 'Do' >> beam.ParDo(AddNDoFn(), 10) assert_that(result, equal_to([11, 12, 13])) pipeline.run() @@ -123,39 +123,39 @@ def process(self, context): pass pipeline = TestPipeline() - pcoll = pipeline | 'start' >> beam.Create([1, 2, 3]) + pcoll = pipeline | 'Start' >> beam.Create([1, 2, 3]) with self.assertRaises(ValueError): - pcoll | 'do' >> beam.ParDo(MyDoFn) # Note the lack of ()'s + pcoll | 'Do' >> beam.ParDo(MyDoFn) # Note the lack of ()'s def test_do_with_callable(self): pipeline = TestPipeline() - pcoll = pipeline | 'start' >> beam.Create([1, 2, 3]) - result = pcoll | 'do' >> beam.FlatMap(lambda x, addon: [x + addon], 10) + pcoll = pipeline | 'Start' >> beam.Create([1, 2, 3]) + result = pcoll | 'Do' >> beam.FlatMap(lambda x, addon: [x + addon], 10) assert_that(result, equal_to([11, 12, 13])) pipeline.run() def test_do_with_side_input_as_arg(self): pipeline = TestPipeline() - side = pipeline | 'side' >> beam.Create([10]) - pcoll = pipeline | 'start' >> beam.Create([1, 2, 3]) - result = pcoll | beam.FlatMap( - 'do', lambda x, addon: [x + addon], pvalue.AsSingleton(side)) + side = pipeline | 'Side' >> beam.Create([10]) + pcoll = pipeline | 'Start' >> beam.Create([1, 2, 3]) + result = pcoll | 'Do' >> beam.FlatMap( + lambda x, addon: [x + addon], pvalue.AsSingleton(side)) assert_that(result, equal_to([11, 12, 13])) pipeline.run() def test_do_with_side_input_as_keyword_arg(self): pipeline = TestPipeline() - side = pipeline | 'side' >> beam.Create([10]) - pcoll = pipeline | 'start' >> beam.Create([1, 2, 3]) - result = pcoll | beam.FlatMap( - 'do', lambda x, addon: [x + addon], addon=pvalue.AsSingleton(side)) + side = pipeline | 'Side' >> beam.Create([10]) + pcoll = pipeline | 'Start' >> beam.Create([1, 2, 3]) + result = pcoll | 'Do' >> beam.FlatMap( + lambda x, addon: [x + addon], addon=pvalue.AsSingleton(side)) assert_that(result, equal_to([11, 12, 13])) pipeline.run() def test_do_with_do_fn_returning_string_raises_warning(self): pipeline = TestPipeline() - pcoll = pipeline | 'start' >> beam.Create(['2', '9', '3']) - pcoll | 'do' >> beam.FlatMap(lambda x: x + '1') + pcoll = pipeline | 'Start' >> beam.Create(['2', '9', '3']) + pcoll | 'Do' >> beam.FlatMap(lambda x: x + '1') # Since the DoFn directly returns a string we should get an error warning # us. @@ -168,8 +168,8 @@ def test_do_with_do_fn_returning_string_raises_warning(self): def test_do_with_do_fn_returning_dict_raises_warning(self): pipeline = TestPipeline() - pcoll = pipeline | 'start' >> beam.Create(['2', '9', '3']) - pcoll | 'do' >> beam.FlatMap(lambda x: {x: '1'}) + pcoll = pipeline | 'Start' >> beam.Create(['2', '9', '3']) + pcoll | 'Do' >> beam.FlatMap(lambda x: {x: '1'}) # Since the DoFn directly returns a dict we should get an error warning # us. @@ -182,9 +182,9 @@ def test_do_with_do_fn_returning_dict_raises_warning(self): def test_do_with_side_outputs_maintains_unique_name(self): pipeline = TestPipeline() - pcoll = pipeline | 'start' >> beam.Create([1, 2, 3]) - r1 = pcoll | 'a' >> beam.FlatMap(lambda x: [x + 1]).with_outputs(main='m') - r2 = pcoll | 'b' >> beam.FlatMap(lambda x: [x + 2]).with_outputs(main='m') + pcoll = pipeline | 'Start' >> beam.Create([1, 2, 3]) + r1 = pcoll | 'A' >> beam.FlatMap(lambda x: [x + 1]).with_outputs(main='m') + r2 = pcoll | 'B' >> beam.FlatMap(lambda x: [x + 2]).with_outputs(main='m') assert_that(r1.m, equal_to([2, 3, 4]), label='r1') assert_that(r2.m, equal_to([3, 4, 5]), label='r2') pipeline.run() @@ -195,8 +195,8 @@ def test_do_requires_do_fn_returning_iterable(self): def incorrect_par_do_fn(x): return x + 5 pipeline = TestPipeline() - pcoll = pipeline | 'start' >> beam.Create([2, 9, 3]) - pcoll | 'do' >> beam.FlatMap(incorrect_par_do_fn) + pcoll = pipeline | 'Start' >> beam.Create([2, 9, 3]) + pcoll | 'Do' >> beam.FlatMap(incorrect_par_do_fn) # It's a requirement that all user-defined functions to a ParDo return # an iterable. with self.assertRaises(typehints.TypeCheckError) as cm: @@ -216,8 +216,8 @@ def process(self, c): def finish_bundle(self, c): yield 'finish' pipeline = TestPipeline() - pcoll = pipeline | 'start' >> beam.Create([1, 2, 3]) - result = pcoll | 'do' >> beam.ParDo(MyDoFn()) + pcoll = pipeline | 'Start' >> beam.Create([1, 2, 3]) + result = pcoll | 'Do' >> beam.ParDo(MyDoFn()) # May have many bundles, but each has a start and finish. def matcher(): @@ -231,9 +231,8 @@ def match(actual): def test_filter(self): pipeline = TestPipeline() - pcoll = pipeline | 'start' >> beam.Create([1, 2, 3, 4]) - result = pcoll | beam.Filter( - 'filter', lambda x: x % 2 == 0) + pcoll = pipeline | 'Start' >> beam.Create([1, 2, 3, 4]) + result = pcoll | 'Filter' >> beam.Filter(lambda x: x % 2 == 0) assert_that(result, equal_to([2, 4])) pipeline.run() @@ -257,15 +256,15 @@ def extract_output(self, (sum_, count)): def test_combine_with_combine_fn(self): vals = [1, 2, 3, 4, 5, 6, 7] pipeline = TestPipeline() - pcoll = pipeline | 'start' >> beam.Create(vals) - result = pcoll | 'mean' >> beam.CombineGlobally(self._MeanCombineFn()) + pcoll = pipeline | 'Start' >> beam.Create(vals) + result = pcoll | 'Mean' >> beam.CombineGlobally(self._MeanCombineFn()) assert_that(result, equal_to([sum(vals) / len(vals)])) pipeline.run() def test_combine_with_callable(self): vals = [1, 2, 3, 4, 5, 6, 7] pipeline = TestPipeline() - pcoll = pipeline | 'start' >> beam.Create(vals) + pcoll = pipeline | 'Start' >> beam.Create(vals) result = pcoll | beam.CombineGlobally(sum) assert_that(result, equal_to([sum(vals)])) pipeline.run() @@ -273,10 +272,9 @@ def test_combine_with_callable(self): def test_combine_with_side_input_as_arg(self): values = [1, 2, 3, 4, 5, 6, 7] pipeline = TestPipeline() - pcoll = pipeline | 'start' >> beam.Create(values) - divisor = pipeline | 'divisor' >> beam.Create([2]) - result = pcoll | beam.CombineGlobally( - 'max', + pcoll = pipeline | 'Start' >> beam.Create(values) + divisor = pipeline | 'Divisor' >> beam.Create([2]) + result = pcoll | 'Max' >> beam.CombineGlobally( # Multiples of divisor only. lambda vals, d: max(v for v in vals if v % d == 0), pvalue.AsSingleton(divisor)).without_defaults() @@ -288,9 +286,9 @@ def test_combine_per_key_with_combine_fn(self): vals_1 = [1, 2, 3, 4, 5, 6, 7] vals_2 = [2, 4, 6, 8, 10, 12, 14] pipeline = TestPipeline() - pcoll = pipeline | 'start' >> beam.Create(([('a', x) for x in vals_1] + + pcoll = pipeline | 'Start' >> beam.Create(([('a', x) for x in vals_1] + [('b', x) for x in vals_2])) - result = pcoll | 'mean' >> beam.CombinePerKey(self._MeanCombineFn()) + result = pcoll | 'Mean' >> beam.CombinePerKey(self._MeanCombineFn()) assert_that(result, equal_to([('a', sum(vals_1) / len(vals_1)), ('b', sum(vals_2) / len(vals_2))])) pipeline.run() @@ -299,7 +297,7 @@ def test_combine_per_key_with_callable(self): vals_1 = [1, 2, 3, 4, 5, 6, 7] vals_2 = [2, 4, 6, 8, 10, 12, 14] pipeline = TestPipeline() - pcoll = pipeline | 'start' >> beam.Create(([('a', x) for x in vals_1] + + pcoll = pipeline | 'Start' >> beam.Create(([('a', x) for x in vals_1] + [('b', x) for x in vals_2])) result = pcoll | beam.CombinePerKey(sum) assert_that(result, equal_to([('a', sum(vals_1)), ('b', sum(vals_2))])) @@ -309,9 +307,9 @@ def test_combine_per_key_with_side_input_as_arg(self): vals_1 = [1, 2, 3, 4, 5, 6, 7] vals_2 = [2, 4, 6, 8, 10, 12, 14] pipeline = TestPipeline() - pcoll = pipeline | 'start' >> beam.Create(([('a', x) for x in vals_1] + + pcoll = pipeline | 'Start' >> beam.Create(([('a', x) for x in vals_1] + [('b', x) for x in vals_2])) - divisor = pipeline | 'divisor' >> beam.Create([2]) + divisor = pipeline | 'Divisor' >> beam.Create([2]) result = pcoll | beam.CombinePerKey( lambda vals, d: max(v for v in vals if v % d == 0), pvalue.AsSingleton(divisor)) # Multiples of divisor only. @@ -324,7 +322,7 @@ def test_group_by_key(self): pipeline = TestPipeline() pcoll = pipeline | beam.Create( 'start', [(1, 1), (2, 1), (3, 1), (1, 2), (2, 2), (1, 3)]) - result = pcoll | 'group' >> beam.GroupByKey() + result = pcoll | 'Group' >> beam.GroupByKey() assert_that(result, equal_to([(1, [1, 2, 3]), (2, [1, 2]), (3, [1])])) pipeline.run() @@ -336,9 +334,9 @@ def partition_for(self, context, num_partitions, offset): return (context.element % 3) + offset pipeline = TestPipeline() - pcoll = pipeline | 'start' >> beam.Create([0, 1, 2, 3, 4, 5, 6, 7, 8]) + pcoll = pipeline | 'Start' >> beam.Create([0, 1, 2, 3, 4, 5, 6, 7, 8]) # Attempt nominal partition operation. - partitions = pcoll | 'part1' >> beam.Partition(SomePartitionFn(), 4, 1) + partitions = pcoll | 'Part 1' >> beam.Partition(SomePartitionFn(), 4, 1) assert_that(partitions[0], equal_to([])) assert_that(partitions[1], equal_to([0, 3, 6]), label='p1') assert_that(partitions[2], equal_to([1, 4, 7]), label='p2') @@ -348,14 +346,14 @@ def partition_for(self, context, num_partitions, offset): # Check that a bad partition label will yield an error. For the # DirectRunner, this error manifests as an exception. pipeline = TestPipeline() - pcoll = pipeline | 'start' >> beam.Create([0, 1, 2, 3, 4, 5, 6, 7, 8]) - partitions = pcoll | 'part2' >> beam.Partition(SomePartitionFn(), 4, 10000) + pcoll = pipeline | 'Start' >> beam.Create([0, 1, 2, 3, 4, 5, 6, 7, 8]) + partitions = pcoll | 'Part 2' >> beam.Partition(SomePartitionFn(), 4, 10000) with self.assertRaises(ValueError): pipeline.run() def test_partition_with_callable(self): pipeline = TestPipeline() - pcoll = pipeline | 'start' >> beam.Create([0, 1, 2, 3, 4, 5, 6, 7, 8]) + pcoll = pipeline | 'Start' >> beam.Create([0, 1, 2, 3, 4, 5, 6, 7, 8]) partitions = ( pcoll | beam.Partition( 'part', @@ -380,48 +378,47 @@ def test_partition_followed_by_flatten_and_groupbykey(self): def test_flatten_pcollections(self): pipeline = TestPipeline() - pcoll_1 = pipeline | 'start_1' >> beam.Create([0, 1, 2, 3]) - pcoll_2 = pipeline | 'start_2' >> beam.Create([4, 5, 6, 7]) - result = (pcoll_1, pcoll_2) | 'flatten' >> beam.Flatten() + pcoll_1 = pipeline | 'Start 1' >> beam.Create([0, 1, 2, 3]) + pcoll_2 = pipeline | 'Start 2' >> beam.Create([4, 5, 6, 7]) + result = (pcoll_1, pcoll_2) | 'Flatten' >> beam.Flatten() assert_that(result, equal_to([0, 1, 2, 3, 4, 5, 6, 7])) pipeline.run() def test_flatten_no_pcollections(self): pipeline = TestPipeline() with self.assertRaises(ValueError): - () | 'pipeline arg missing' >> beam.Flatten() - result = () | 'empty' >> beam.Flatten(pipeline=pipeline) + () | 'PipelineArgMissing' >> beam.Flatten() + result = () | 'Empty' >> beam.Flatten(pipeline=pipeline) assert_that(result, equal_to([])) pipeline.run() def test_flatten_pcollections_in_iterable(self): pipeline = TestPipeline() - pcoll_1 = pipeline | 'start_1' >> beam.Create([0, 1, 2, 3]) - pcoll_2 = pipeline | 'start_2' >> beam.Create([4, 5, 6, 7]) - result = ([pcoll for pcoll in (pcoll_1, pcoll_2)] - | 'flatten' >> beam.Flatten()) + pcoll_1 = pipeline | 'Start 1' >> beam.Create([0, 1, 2, 3]) + pcoll_2 = pipeline | 'Start 2' >> beam.Create([4, 5, 6, 7]) + result = [pcoll for pcoll in (pcoll_1, pcoll_2)] | beam.Flatten() assert_that(result, equal_to([0, 1, 2, 3, 4, 5, 6, 7])) pipeline.run() def test_flatten_input_type_must_be_iterable(self): # Inputs to flatten *must* be an iterable. with self.assertRaises(ValueError): - 4 | 'flatten' >> beam.Flatten() + 4 | beam.Flatten() def test_flatten_input_type_must_be_iterable_of_pcolls(self): # Inputs to flatten *must* be an iterable of PCollections. with self.assertRaises(TypeError): - {'l': 'test'} | 'flatten' >> beam.Flatten() + {'l': 'test'} | beam.Flatten() with self.assertRaises(TypeError): - set([1, 2, 3]) | 'flatten' >> beam.Flatten() + set([1, 2, 3]) | beam.Flatten() def test_co_group_by_key_on_list(self): pipeline = TestPipeline() - pcoll_1 = pipeline | beam.Create( - 'start_1', [('a', 1), ('a', 2), ('b', 3), ('c', 4)]) - pcoll_2 = pipeline | beam.Create( - 'start_2', [('a', 5), ('a', 6), ('c', 7), ('c', 8)]) - result = (pcoll_1, pcoll_2) | 'cgbk' >> beam.CoGroupByKey() + pcoll_1 = pipeline | 'Start 1' >> beam.Create( + [('a', 1), ('a', 2), ('b', 3), ('c', 4)]) + pcoll_2 = pipeline | 'Start 2' >> beam.Create( + [('a', 5), ('a', 6), ('c', 7), ('c', 8)]) + result = (pcoll_1, pcoll_2) | beam.CoGroupByKey() assert_that(result, equal_to([('a', ([1, 2], [5, 6])), ('b', ([3], [])), ('c', ([4], [7, 8]))])) @@ -429,12 +426,11 @@ def test_co_group_by_key_on_list(self): def test_co_group_by_key_on_iterable(self): pipeline = TestPipeline() - pcoll_1 = pipeline | beam.Create( - 'start_1', [('a', 1), ('a', 2), ('b', 3), ('c', 4)]) - pcoll_2 = pipeline | beam.Create( - 'start_2', [('a', 5), ('a', 6), ('c', 7), ('c', 8)]) - result = ([pc for pc in (pcoll_1, pcoll_2)] - | 'cgbk' >> beam.CoGroupByKey()) + pcoll_1 = pipeline | 'Start 1' >> beam.Create( + [('a', 1), ('a', 2), ('b', 3), ('c', 4)]) + pcoll_2 = pipeline | 'Start 2' >> beam.Create( + [('a', 5), ('a', 6), ('c', 7), ('c', 8)]) + result = [pc for pc in (pcoll_1, pcoll_2)] | beam.CoGroupByKey() assert_that(result, equal_to([('a', ([1, 2], [5, 6])), ('b', ([3], [])), ('c', ([4], [7, 8]))])) @@ -442,11 +438,11 @@ def test_co_group_by_key_on_iterable(self): def test_co_group_by_key_on_dict(self): pipeline = TestPipeline() - pcoll_1 = pipeline | beam.Create( - 'start_1', [('a', 1), ('a', 2), ('b', 3), ('c', 4)]) - pcoll_2 = pipeline | beam.Create( - 'start_2', [('a', 5), ('a', 6), ('c', 7), ('c', 8)]) - result = {'X': pcoll_1, 'Y': pcoll_2} | 'cgbk' >> beam.CoGroupByKey() + pcoll_1 = pipeline | 'Start 1' >> beam.Create( + [('a', 1), ('a', 2), ('b', 3), ('c', 4)]) + pcoll_2 = pipeline | 'Start 2' >> beam.Create( + [('a', 5), ('a', 6), ('c', 7), ('c', 8)]) + result = {'X': pcoll_1, 'Y': pcoll_2} | beam.CoGroupByKey() assert_that(result, equal_to([('a', {'X': [1, 2], 'Y': [5, 6]}), ('b', {'X': [3], 'Y': []}), ('c', {'X': [4], 'Y': [7, 8]})])) @@ -478,8 +474,8 @@ def test_group_by_key_only_input_must_be_kv_pairs(self): def test_keys_and_values(self): pipeline = TestPipeline() - pcoll = pipeline | beam.Create( - 'start', [(3, 1), (2, 1), (1, 1), (3, 2), (2, 2), (3, 3)]) + pcoll = pipeline | 'Start' >> beam.Create( + [(3, 1), (2, 1), (1, 1), (3, 2), (2, 2), (3, 3)]) keys = pcoll.apply('keys', beam.Keys()) vals = pcoll.apply('vals', beam.Values()) assert_that(keys, equal_to([1, 2, 2, 3, 3, 3]), label='assert:keys') @@ -488,16 +484,16 @@ def test_keys_and_values(self): def test_kv_swap(self): pipeline = TestPipeline() - pcoll = pipeline | beam.Create( - 'start', [(6, 3), (1, 2), (7, 1), (5, 2), (3, 2)]) + pcoll = pipeline | 'Start' >> beam.Create( + [(6, 3), (1, 2), (7, 1), (5, 2), (3, 2)]) result = pcoll.apply('swap', beam.KvSwap()) assert_that(result, equal_to([(1, 7), (2, 1), (2, 3), (2, 5), (3, 6)])) pipeline.run() def test_remove_duplicates(self): pipeline = TestPipeline() - pcoll = pipeline | beam.Create( - 'start', [6, 3, 1, 1, 9, 'pleat', 'pleat', 'kazoo', 'navel']) + pcoll = pipeline | 'Start' >> beam.Create( + [6, 3, 1, 1, 9, 'pleat', 'pleat', 'kazoo', 'navel']) result = pcoll.apply('nodupes', beam.RemoveDuplicates()) assert_that(result, equal_to([1, 3, 6, 9, 'pleat', 'kazoo', 'navel'])) pipeline.run() @@ -507,15 +503,15 @@ def test_chained_ptransforms(self): t = (beam.Map(lambda x: (x, 1)) | beam.GroupByKey() | beam.Map(lambda (x, ones): (x, sum(ones)))) - result = pipeline | 'start' >> beam.Create(['a', 'a', 'b']) | t + result = pipeline | 'Start' >> beam.Create(['a', 'a', 'b']) | t assert_that(result, equal_to([('a', 2), ('b', 1)])) pipeline.run() def test_apply_to_list(self): self.assertItemsEqual( - [1, 2, 3], [0, 1, 2] | 'add_one' >> beam.Map(lambda x: x + 1)) + [1, 2, 3], [0, 1, 2] | 'AddOne' >> beam.Map(lambda x: x + 1)) self.assertItemsEqual([1], - [0, 1, 2] | 'odd' >> beam.Filter(lambda x: x % 2)) + [0, 1, 2] | 'Odd' >> beam.Filter(lambda x: x % 2)) self.assertItemsEqual([1, 2, 100, 3], ([1, 2, 3], [100]) | beam.Flatten()) join_input = ([('k', 'a')], @@ -575,47 +571,47 @@ class CustomTransform(beam.PTransform): pardo = None def expand(self, pcoll): - self.pardo = '*do*' >> beam.FlatMap(lambda x: [x + 1]) + self.pardo = '*Do*' >> beam.FlatMap(lambda x: [x + 1]) return pcoll | self.pardo def test_chained_ptransforms(self): """Tests that chaining gets proper nesting.""" pipeline = TestPipeline() - map1 = 'map1' >> beam.Map(lambda x: (x, 1)) - gbk = 'gbk' >> beam.GroupByKey() - map2 = 'map2' >> beam.Map(lambda (x, ones): (x, sum(ones))) + map1 = 'Map1' >> beam.Map(lambda x: (x, 1)) + gbk = 'Gbk' >> beam.GroupByKey() + map2 = 'Map2' >> beam.Map(lambda (x, ones): (x, sum(ones))) t = (map1 | gbk | map2) - result = pipeline | 'start' >> beam.Create(['a', 'a', 'b']) | t - self.assertTrue('map1|gbk|map2/map1' in pipeline.applied_labels) - self.assertTrue('map1|gbk|map2/gbk' in pipeline.applied_labels) - self.assertTrue('map1|gbk|map2/map2' in pipeline.applied_labels) + result = pipeline | 'Start' >> beam.Create(['a', 'a', 'b']) | t + self.assertTrue('Map1|Gbk|Map2/Map1' in pipeline.applied_labels) + self.assertTrue('Map1|Gbk|Map2/Gbk' in pipeline.applied_labels) + self.assertTrue('Map1|Gbk|Map2/Map2' in pipeline.applied_labels) assert_that(result, equal_to([('a', 2), ('b', 1)])) pipeline.run() def test_apply_custom_transform_without_label(self): pipeline = TestPipeline() - pcoll = pipeline | 'pcoll' >> beam.Create([1, 2, 3]) + pcoll = pipeline | 'PColl' >> beam.Create([1, 2, 3]) custom = PTransformLabelsTest.CustomTransform() result = pipeline.apply(custom, pcoll) self.assertTrue('CustomTransform' in pipeline.applied_labels) - self.assertTrue('CustomTransform/*do*' in pipeline.applied_labels) + self.assertTrue('CustomTransform/*Do*' in pipeline.applied_labels) assert_that(result, equal_to([2, 3, 4])) pipeline.run() def test_apply_custom_transform_with_label(self): pipeline = TestPipeline() - pcoll = pipeline | 'pcoll' >> beam.Create([1, 2, 3]) - custom = PTransformLabelsTest.CustomTransform('*custom*') + pcoll = pipeline | 'PColl' >> beam.Create([1, 2, 3]) + custom = PTransformLabelsTest.CustomTransform('*Custom*') result = pipeline.apply(custom, pcoll) - self.assertTrue('*custom*' in pipeline.applied_labels) - self.assertTrue('*custom*/*do*' in pipeline.applied_labels) + self.assertTrue('*Custom*' in pipeline.applied_labels) + self.assertTrue('*Custom*/*Do*' in pipeline.applied_labels) assert_that(result, equal_to([2, 3, 4])) pipeline.run() def test_combine_without_label(self): vals = [1, 2, 3, 4, 5, 6, 7] pipeline = TestPipeline() - pcoll = pipeline | 'start' >> beam.Create(vals) + pcoll = pipeline | 'Start' >> beam.Create(vals) combine = beam.CombineGlobally(sum) result = pcoll | combine self.assertTrue('CombineGlobally(sum)' in pipeline.applied_labels) @@ -624,28 +620,28 @@ def test_combine_without_label(self): def test_apply_ptransform_using_decorator(self): pipeline = TestPipeline() - pcoll = pipeline | 'pcoll' >> beam.Create([1, 2, 3]) - sample = SamplePTransform('*sample*') + pcoll = pipeline | 'PColl' >> beam.Create([1, 2, 3]) + sample = SamplePTransform('*Sample*') _ = pcoll | sample - self.assertTrue('*sample*' in pipeline.applied_labels) - self.assertTrue('*sample*/ToPairs' in pipeline.applied_labels) - self.assertTrue('*sample*/Group' in pipeline.applied_labels) - self.assertTrue('*sample*/RemoveDuplicates' in pipeline.applied_labels) + self.assertTrue('*Sample*' in pipeline.applied_labels) + self.assertTrue('*Sample*/ToPairs' in pipeline.applied_labels) + self.assertTrue('*Sample*/Group' in pipeline.applied_labels) + self.assertTrue('*Sample*/RemoveDuplicates' in pipeline.applied_labels) def test_combine_with_label(self): vals = [1, 2, 3, 4, 5, 6, 7] pipeline = TestPipeline() - pcoll = pipeline | 'start' >> beam.Create(vals) - combine = '*sum*' >> beam.CombineGlobally(sum) + pcoll = pipeline | 'Start' >> beam.Create(vals) + combine = '*Sum*' >> beam.CombineGlobally(sum) result = pcoll | combine - self.assertTrue('*sum*' in pipeline.applied_labels) + self.assertTrue('*Sum*' in pipeline.applied_labels) assert_that(result, equal_to([sum(vals)])) pipeline.run() def check_label(self, ptransform, expected_label): pipeline = TestPipeline() - pipeline | 'start' >> beam.Create([('a', 1)]) | ptransform - actual_label = sorted(pipeline.applied_labels - {'start'})[0] + pipeline | 'Start' >> beam.Create([('a', 1)]) | ptransform + actual_label = sorted(pipeline.applied_labels - {'Start'})[0] self.assertEqual(expected_label, re.sub(r'\d{3,}', '#', actual_label)) def test_default_labels(self): @@ -737,8 +733,8 @@ def process(self, context, five): return [context.element + five] d = (self.p - | 't' >> beam.Create([1, 2, 3]).with_output_types(int) - | 'add' >> beam.ParDo(AddWithFive(), 5)) + | 'T' >> beam.Create([1, 2, 3]).with_output_types(int) + | 'Add' >> beam.ParDo(AddWithFive(), 5)) assert_that(d, equal_to([6, 7, 8])) self.p.run() @@ -752,10 +748,10 @@ def process(self, context, prefix): with self.assertRaises(typehints.TypeCheckError) as e: (self.p - | 't' >> beam.Create([1, 2, 3]).with_output_types(int) - | 'upper' >> beam.ParDo(ToUpperCaseWithPrefix(), 'hello')) + | 'T' >> beam.Create([1, 2, 3]).with_output_types(int) + | 'Upper' >> beam.ParDo(ToUpperCaseWithPrefix(), 'hello')) - self.assertEqual("Type hint violation for 'upper': " + self.assertEqual("Type hint violation for 'Upper': " "requires but got for context", e.exception.message) @@ -769,8 +765,8 @@ def process(self, context, num): return [context.element + num] d = (self.p - | 't' >> beam.Create([1, 2, 3]).with_output_types(int) - | 'add' >> beam.ParDo(AddWithNum(), 5)) + | 'T' >> beam.Create([1, 2, 3]).with_output_types(int) + | 'Add' >> beam.ParDo(AddWithNum(), 5)) assert_that(d, equal_to([6, 7, 8])) self.p.run() @@ -786,11 +782,11 @@ def process(self, context, num): with self.assertRaises(typehints.TypeCheckError) as e: (self.p - | 't' >> beam.Create(['1', '2', '3']).with_output_types(str) - | 'add' >> beam.ParDo(AddWithNum(), 5)) + | 'T' >> beam.Create(['1', '2', '3']).with_output_types(str) + | 'Add' >> beam.ParDo(AddWithNum(), 5)) self.p.run() - self.assertEqual("Type hint violation for 'add': " + self.assertEqual("Type hint violation for 'Add': " "requires but got for context", e.exception.message) @@ -804,10 +800,10 @@ def int_to_str(a): # will receive a str instead, which should result in a raised exception. with self.assertRaises(typehints.TypeCheckError) as e: (self.p - | 's' >> beam.Create(['b', 'a', 'r']).with_output_types(str) - | 'to str' >> beam.FlatMap(int_to_str)) + | 'S' >> beam.Create(['b', 'a', 'r']).with_output_types(str) + | 'ToStr' >> beam.FlatMap(int_to_str)) - self.assertEqual("Type hint violation for 'to str': " + self.assertEqual("Type hint violation for 'ToStr': " "requires but got for a", e.exception.message) @@ -819,8 +815,8 @@ def to_all_upper_case(a): # If this type-checks than no error should be raised. d = (self.p - | 't' >> beam.Create(['t', 'e', 's', 't']).with_output_types(str) - | 'case' >> beam.FlatMap(to_all_upper_case)) + | 'T' >> beam.Create(['t', 'e', 's', 't']).with_output_types(str) + | 'Case' >> beam.FlatMap(to_all_upper_case)) assert_that(d, equal_to(['T', 'E', 'S', 'T'])) self.p.run() @@ -833,23 +829,23 @@ def test_pardo_does_not_type_check_using_type_hint_methods(self): # expecting pcoll's of type str instead. with self.assertRaises(typehints.TypeCheckError) as e: (self.p - | 's' >> beam.Create(['t', 'e', 's', 't']).with_output_types(str) - | ('score' >> beam.FlatMap(lambda x: [1] if x == 't' else [2]) + | 'S' >> beam.Create(['t', 'e', 's', 't']).with_output_types(str) + | ('Score' >> beam.FlatMap(lambda x: [1] if x == 't' else [2]) .with_input_types(str).with_output_types(int)) - | ('upper' >> beam.FlatMap(lambda x: [x.upper()]) + | ('Upper' >> beam.FlatMap(lambda x: [x.upper()]) .with_input_types(str).with_output_types(str))) - self.assertEqual("Type hint violation for 'upper': " + self.assertEqual("Type hint violation for 'Upper': " "requires but got for x", e.exception.message) def test_pardo_properly_type_checks_using_type_hint_methods(self): # Pipeline should be created successfully without an error d = (self.p - | 's' >> beam.Create(['t', 'e', 's', 't']).with_output_types(str) - | 'dup' >> beam.FlatMap(lambda x: [x + x]) + | 'S' >> beam.Create(['t', 'e', 's', 't']).with_output_types(str) + | 'Dup' >> beam.FlatMap(lambda x: [x + x]) .with_input_types(str).with_output_types(str) - | 'upper' >> beam.FlatMap(lambda x: [x.upper()]) + | 'Upper' >> beam.FlatMap(lambda x: [x.upper()]) .with_input_types(str).with_output_types(str)) assert_that(d, equal_to(['TT', 'EE', 'SS', 'TT'])) @@ -860,19 +856,19 @@ def test_map_does_not_type_check_using_type_hints_methods(self): # int's, while Map is expecting one of str. with self.assertRaises(typehints.TypeCheckError) as e: (self.p - | 's' >> beam.Create([1, 2, 3, 4]).with_output_types(int) - | 'upper' >> beam.Map(lambda x: x.upper()) + | 'S' >> beam.Create([1, 2, 3, 4]).with_output_types(int) + | 'Upper' >> beam.Map(lambda x: x.upper()) .with_input_types(str).with_output_types(str)) - self.assertEqual("Type hint violation for 'upper': " + self.assertEqual("Type hint violation for 'Upper': " "requires but got for x", e.exception.message) def test_map_properly_type_checks_using_type_hints_methods(self): # No error should be raised if this type-checks properly. d = (self.p - | 's' >> beam.Create([1, 2, 3, 4]).with_output_types(int) - | 'to_str' >> beam.Map(lambda x: str(x)) + | 'S' >> beam.Create([1, 2, 3, 4]).with_output_types(int) + | 'ToStr' >> beam.Map(lambda x: str(x)) .with_input_types(int).with_output_types(str)) assert_that(d, equal_to(['1', '2', '3', '4'])) self.p.run() @@ -887,10 +883,10 @@ def upper(s): # However, 'Map' should detect that Create has hinted an int instead. with self.assertRaises(typehints.TypeCheckError) as e: (self.p - | 's' >> beam.Create([1, 2, 3, 4]).with_output_types(int) - | 'upper' >> beam.Map(upper)) + | 'S' >> beam.Create([1, 2, 3, 4]).with_output_types(int) + | 'Upper' >> beam.Map(upper)) - self.assertEqual("Type hint violation for 'upper': " + self.assertEqual("Type hint violation for 'Upper': " "requires but got for s", e.exception.message) @@ -912,12 +908,12 @@ def test_filter_does_not_type_check_using_type_hints_method(self): # incoming. with self.assertRaises(typehints.TypeCheckError) as e: (self.p - | 'strs' >> beam.Create(['1', '2', '3', '4', '5']).with_output_types(str) - | 'lower' >> beam.Map(lambda x: x.lower()) + | 'Strs' >> beam.Create(['1', '2', '3', '4', '5']).with_output_types(str) + | 'Lower' >> beam.Map(lambda x: x.lower()) .with_input_types(str).with_output_types(str) - | 'below 3' >> beam.Filter(lambda x: x < 3).with_input_types(int)) + | 'Below 3' >> beam.Filter(lambda x: x < 3).with_input_types(int)) - self.assertEqual("Type hint violation for 'below 3': " + self.assertEqual("Type hint violation for 'Below 3': " "requires but got for x", e.exception.message) @@ -925,9 +921,9 @@ def test_filter_type_checks_using_type_hints_method(self): # No error should be raised if this type-checks properly. d = (self.p | beam.Create(['1', '2', '3', '4', '5']).with_output_types(str) - | 'to int' >> beam.Map(lambda x: int(x)) + | 'ToInt' >> beam.Map(lambda x: int(x)) .with_input_types(str).with_output_types(int) - | 'below 3' >> beam.Filter(lambda x: x < 3).with_input_types(int)) + | 'Below 3' >> beam.Filter(lambda x: x < 3).with_input_types(int)) assert_that(d, equal_to([1, 2])) self.p.run() @@ -939,10 +935,10 @@ def more_than_half(a): # Func above was hinted to only take a float, yet an int will be passed. with self.assertRaises(typehints.TypeCheckError) as e: (self.p - | 'ints' >> beam.Create([1, 2, 3, 4]).with_output_types(int) - | 'half' >> beam.Filter(more_than_half)) + | 'Ints' >> beam.Create([1, 2, 3, 4]).with_output_types(int) + | 'Half' >> beam.Filter(more_than_half)) - self.assertEqual("Type hint violation for 'half': " + self.assertEqual("Type hint violation for 'Half': " "requires but got for a", e.exception.message) @@ -954,15 +950,15 @@ def half(b): # Filter should deduce that it returns the same type that it takes. (self.p - | 'str' >> beam.Create(range(5)).with_output_types(int) - | 'half' >> beam.Filter(half) - | 'to bool' >> beam.Map(lambda x: bool(x)) + | 'Str' >> beam.Create(range(5)).with_output_types(int) + | 'Half' >> beam.Filter(half) + | 'ToBool' >> beam.Map(lambda x: bool(x)) .with_input_types(int).with_output_types(bool)) def test_group_by_key_only_output_type_deduction(self): d = (self.p - | 'str' >> beam.Create(['t', 'e', 's', 't']).with_output_types(str) - | ('pair' >> beam.Map(lambda x: (x, ord(x))) + | 'Str' >> beam.Create(['t', 'e', 's', 't']).with_output_types(str) + | ('Pair' >> beam.Map(lambda x: (x, ord(x))) .with_output_types(typehints.KV[str, str])) | beam.GroupByKeyOnly()) @@ -973,8 +969,8 @@ def test_group_by_key_only_output_type_deduction(self): def test_group_by_key_output_type_deduction(self): d = (self.p - | 'str' >> beam.Create(range(20)).with_output_types(int) - | ('pair negative' >> beam.Map(lambda x: (x % 5, -x)) + | 'Str' >> beam.Create(range(20)).with_output_types(int) + | ('PairNegative' >> beam.Map(lambda x: (x % 5, -x)) .with_output_types(typehints.KV[int, int])) | beam.GroupByKey()) @@ -1016,11 +1012,11 @@ def test_pipeline_checking_pardo_insufficient_type_information(self): # information to the ParDo. with self.assertRaises(typehints.TypeCheckError) as e: (self.p - | 'nums' >> beam.Create(range(5)) - | 'mod dup' >> beam.FlatMap(lambda x: (x % 2, x))) + | 'Nums' >> beam.Create(range(5)) + | 'ModDup' >> beam.FlatMap(lambda x: (x % 2, x))) self.assertEqual('Pipeline type checking is enabled, however no output ' - 'type-hint was found for the PTransform Create(nums)', + 'type-hint was found for the PTransform Create(Nums)', e.exception.message) def test_pipeline_checking_gbk_insufficient_type_information(self): @@ -1029,13 +1025,13 @@ def test_pipeline_checking_gbk_insufficient_type_information(self): # information to GBK-only. with self.assertRaises(typehints.TypeCheckError) as e: (self.p - | 'nums' >> beam.Create(range(5)).with_output_types(int) - | 'mod dup' >> beam.Map(lambda x: (x % 2, x)) + | 'Nums' >> beam.Create(range(5)).with_output_types(int) + | 'ModDup' >> beam.Map(lambda x: (x % 2, x)) | beam.GroupByKeyOnly()) self.assertEqual('Pipeline type checking is enabled, however no output ' 'type-hint was found for the PTransform ' - 'ParDo(mod dup)', + 'ParDo(ModDup)', e.exception.message) def test_disable_pipeline_type_check(self): @@ -1044,8 +1040,8 @@ def test_disable_pipeline_type_check(self): # The pipeline below should raise a TypeError, however pipeline type # checking was disabled above. (self.p - | 't' >> beam.Create([1, 2, 3]).with_output_types(int) - | 'lower' >> beam.Map(lambda x: x.lower()) + | 'T' >> beam.Create([1, 2, 3]).with_output_types(int) + | 'Lower' >> beam.Map(lambda x: x.lower()) .with_input_types(str).with_output_types(str)) def test_run_time_type_checking_enabled_type_violation(self): @@ -1060,14 +1056,14 @@ def int_to_string(x): # Function above has been type-hinted to only accept an int. But in the # pipeline execution it'll be passed a string due to the output of Create. (self.p - | 't' >> beam.Create(['some_string']) - | 'to str' >> beam.Map(int_to_string)) + | 'T' >> beam.Create(['some_string']) + | 'ToStr' >> beam.Map(int_to_string)) with self.assertRaises(typehints.TypeCheckError) as e: self.p.run() self.assertStartswith( e.exception.message, - "Runtime type violation detected within ParDo(to str): " + "Runtime type violation detected within ParDo(ToStr): " "Type-hint for argument: 'x' violated. " "Expected an instance of , " "instead found some_string, an instance of .") @@ -1084,9 +1080,9 @@ def group_with_upper_ord(x): # Pipeline checking is off, but the above function should satisfy types at # run-time. result = (self.p - | 't' >> beam.Create(['t', 'e', 's', 't', 'i', 'n', 'g']) + | 'T' >> beam.Create(['t', 'e', 's', 't', 'i', 'n', 'g']) .with_output_types(str) - | 'gen keys' >> beam.Map(group_with_upper_ord) + | 'GenKeys' >> beam.Map(group_with_upper_ord) | 'O' >> beam.GroupByKey()) assert_that(result, equal_to([(1, ['g']), @@ -1106,9 +1102,9 @@ def is_even_as_key(a): return (a % 2, a) (self.p - | 'nums' >> beam.Create(range(5)).with_output_types(int) - | 'is even' >> beam.Map(is_even_as_key) - | 'parity' >> beam.GroupByKey()) + | 'Nums' >> beam.Create(range(5)).with_output_types(int) + | 'IsEven' >> beam.Map(is_even_as_key) + | 'Parity' >> beam.GroupByKey()) # Although all the types appear to be correct when checked at pipeline # construction. Runtime type-checking should detect the 'is_even_as_key' is @@ -1118,7 +1114,7 @@ def is_even_as_key(a): self.assertStartswith( e.exception.message, - "Runtime type violation detected within ParDo(is even): " + "Runtime type violation detected within ParDo(IsEven): " "Tuple[bool, int] hint type-constraint violated. " "The type of element #0 in the passed tuple is incorrect. " "Expected an instance of type bool, " @@ -1135,9 +1131,9 @@ def is_even_as_key(a): return (a % 2 == 0, a) result = (self.p - | 'nums' >> beam.Create(range(5)).with_output_types(int) - | 'is even' >> beam.Map(is_even_as_key) - | 'parity' >> beam.GroupByKey()) + | 'Nums' >> beam.Create(range(5)).with_output_types(int) + | 'IsEven' >> beam.Map(is_even_as_key) + | 'Parity' >> beam.GroupByKey()) assert_that(result, equal_to([(False, [1, 3]), (True, [0, 2, 4])])) self.p.run() @@ -1152,13 +1148,13 @@ def test_pipeline_runtime_checking_violation_simple_type_input(self): with self.assertRaises(typehints.TypeCheckError) as e: (self.p | beam.Create([1, 2, 3]) - | ('to int' >> beam.FlatMap(lambda x: [int(x)]) + | ('ToInt' >> beam.FlatMap(lambda x: [int(x)]) .with_input_types(str).with_output_types(int))) self.p.run() self.assertStartswith( e.exception.message, - "Runtime type violation detected within ParDo(to int): " + "Runtime type violation detected within ParDo(ToInt): " "Type-hint for argument: 'x' violated. " "Expected an instance of , " "instead found 1, an instance of .") @@ -1170,14 +1166,14 @@ def test_pipeline_runtime_checking_violation_composite_type_input(self): with self.assertRaises(typehints.TypeCheckError) as e: (self.p | beam.Create([(1, 3.0), (2, 4.9), (3, 9.5)]) - | ('add' >> beam.FlatMap(lambda (x, y): [x + y]) + | ('Add' >> beam.FlatMap(lambda (x, y): [x + y]) .with_input_types(typehints.Tuple[int, int]).with_output_types(int)) ) self.p.run() self.assertStartswith( e.exception.message, - "Runtime type violation detected within ParDo(add): " + "Runtime type violation detected within ParDo(Add): " "Type-hint for argument: 'y' violated. " "Expected an instance of , " "instead found 3.0, an instance of .") @@ -1189,14 +1185,13 @@ def test_pipeline_runtime_checking_violation_simple_type_output(self): # The type-hinted applied via the 'returns()' method indicates the ParDo # should output an instance of type 'int', however a 'float' will be # generated instead. - print "HINTS", beam.FlatMap( - 'to int', + print "HINTS", ('ToInt' >> beam.FlatMap( lambda x: [float(x)]).with_input_types(int).with_output_types( - int).get_type_hints() + int)).get_type_hints() with self.assertRaises(typehints.TypeCheckError) as e: (self.p | beam.Create([1, 2, 3]) - | ('to int' >> beam.FlatMap(lambda x: [float(x)]) + | ('ToInt' >> beam.FlatMap(lambda x: [float(x)]) .with_input_types(int).with_output_types(int)) ) self.p.run() @@ -1204,7 +1199,7 @@ def test_pipeline_runtime_checking_violation_simple_type_output(self): self.assertStartswith( e.exception.message, "Runtime type violation detected within " - "ParDo(to int): " + "ParDo(ToInt): " "According to type-hint expected output should be " "of type . Instead, received '1.0', " "an instance of type .") @@ -1219,7 +1214,7 @@ def test_pipeline_runtime_checking_violation_composite_type_output(self): with self.assertRaises(typehints.TypeCheckError) as e: (self.p | beam.Create([(1, 3.0), (2, 4.9), (3, 9.5)]) - | ('swap' >> beam.FlatMap(lambda (x, y): [x + y]) + | ('Swap' >> beam.FlatMap(lambda (x, y): [x + y]) .with_input_types(typehints.Tuple[int, float]) .with_output_types(typehints.Tuple[float, int])) ) @@ -1228,7 +1223,7 @@ def test_pipeline_runtime_checking_violation_composite_type_output(self): self.assertStartswith( e.exception.message, "Runtime type violation detected within " - "ParDo(swap): Tuple type constraint violated. " + "ParDo(Swap): Tuple type constraint violated. " "Valid object instance must be of type 'tuple'. Instead, " "an instance of 'float' was received.") @@ -1242,12 +1237,12 @@ def add(a, b): return a + b with self.assertRaises(typehints.TypeCheckError) as e: - (self.p | beam.Create([1, 2, 3, 4]) | 'add 1' >> beam.Map(add, 1.0)) + (self.p | beam.Create([1, 2, 3, 4]) | 'Add 1' >> beam.Map(add, 1.0)) self.p.run() self.assertStartswith( e.exception.message, - "Runtime type violation detected within ParDo(add 1): " + "Runtime type violation detected within ParDo(Add 1): " "Type-hint for argument: 'b' violated. " "Expected an instance of , " "instead found 1.0, an instance of .") @@ -1259,14 +1254,14 @@ def test_pipline_runtime_checking_violation_with_side_inputs_via_method(self): with self.assertRaises(typehints.TypeCheckError) as e: (self.p | beam.Create([1, 2, 3, 4]) - | ('add 1' >> beam.Map(lambda x, one: x + one, 1.0) + | ('Add 1' >> beam.Map(lambda x, one: x + one, 1.0) .with_input_types(int, int) .with_output_types(float))) self.p.run() self.assertStartswith( e.exception.message, - "Runtime type violation detected within ParDo(add 1): " + "Runtime type violation detected within ParDo(Add 1): " "Type-hint for argument: 'one' violated. " "Expected an instance of , " "instead found 1.0, an instance of .") @@ -1278,8 +1273,8 @@ def sum_ints(ints): return sum(ints) d = (self.p - | 't' >> beam.Create([1, 2, 3]).with_output_types(int) - | 'sum' >> beam.CombineGlobally(sum_ints)) + | 'T' >> beam.Create([1, 2, 3]).with_output_types(int) + | 'Sum' >> beam.CombineGlobally(sum_ints)) self.assertEqual(int, d.element_type) assert_that(d, equal_to([6])) @@ -1293,8 +1288,8 @@ def bad_combine(a): with self.assertRaises(typehints.TypeCheckError) as e: (self.p - | 'm' >> beam.Create([1, 2, 3]).with_output_types(int) - | 'add' >> beam.CombineGlobally(bad_combine)) + | 'M' >> beam.Create([1, 2, 3]).with_output_types(int) + | 'Add' >> beam.CombineGlobally(bad_combine)) self.assertEqual( "All functions for a Combine PTransform must accept a " @@ -1314,9 +1309,9 @@ def range_from_zero(n): return list(range(n+1)) d = (self.p - | 't' >> beam.Create([1, 2, 3]).with_output_types(int) - | 'sum' >> beam.CombineGlobally(sum_ints) - | 'range' >> beam.ParDo(range_from_zero)) + | 'T' >> beam.Create([1, 2, 3]).with_output_types(int) + | 'Sum' >> beam.CombineGlobally(sum_ints) + | 'Range' >> beam.ParDo(range_from_zero)) self.assertEqual(int, d.element_type) assert_that(d, equal_to([0, 1, 2, 3, 4, 5, 6])) @@ -1331,8 +1326,8 @@ def iter_mul(ints): return reduce(operator.mul, ints, 1) d = (self.p - | 'k' >> beam.Create([5, 5, 5, 5]).with_output_types(int) - | 'mul' >> beam.CombineGlobally(iter_mul)) + | 'K' >> beam.Create([5, 5, 5, 5]).with_output_types(int) + | 'Mul' >> beam.CombineGlobally(iter_mul)) assert_that(d, equal_to([625])) self.p.run() @@ -1349,14 +1344,14 @@ def iter_mul(ints): with self.assertRaises(typehints.TypeCheckError) as e: (self.p - | 'k' >> beam.Create([5, 5, 5, 5]).with_output_types(int) - | 'mul' >> beam.CombineGlobally(iter_mul)) + | 'K' >> beam.Create([5, 5, 5, 5]).with_output_types(int) + | 'Mul' >> beam.CombineGlobally(iter_mul)) self.p.run() self.assertStartswith( e.exception.message, "Runtime type violation detected within " - "ParDo(mul/CombinePerKey/Combine/ParDo(CombineValuesDoFn)): " + "ParDo(Mul/CombinePerKey/Combine/ParDo(CombineValuesDoFn)): " "Tuple[TypeVariable[K], int] hint type-constraint violated. " "The type of element #1 in the passed tuple is incorrect. " "Expected an instance of type int, " @@ -1381,7 +1376,7 @@ def test_combine_runtime_type_check_using_methods(self): d = (self.p | beam.Create(range(5)).with_output_types(int) - | ('sum' >> beam.CombineGlobally(lambda s: sum(s)) + | ('Sum' >> beam.CombineGlobally(lambda s: sum(s)) .with_input_types(int).with_output_types(int))) assert_that(d, equal_to([10])) @@ -1391,10 +1386,10 @@ def test_combine_pipeline_type_check_violation_using_methods(self): with self.assertRaises(typehints.TypeCheckError) as e: (self.p | beam.Create(range(3)).with_output_types(int) - | ('sort join' >> beam.CombineGlobally(lambda s: ''.join(sorted(s))) + | ('SortJoin' >> beam.CombineGlobally(lambda s: ''.join(sorted(s))) .with_input_types(str).with_output_types(str))) - self.assertEqual("Input type hint violation at sort join: " + self.assertEqual("Input type hint violation at SortJoin: " "expected , got ", e.exception.message) @@ -1405,14 +1400,14 @@ def test_combine_runtime_type_check_violation_using_methods(self): with self.assertRaises(typehints.TypeCheckError) as e: (self.p | beam.Create(range(3)).with_output_types(int) - | ('sort join' >> beam.CombineGlobally(lambda s: ''.join(sorted(s))) + | ('SortJoin' >> beam.CombineGlobally(lambda s: ''.join(sorted(s))) .with_input_types(str).with_output_types(str))) self.p.run() self.assertStartswith( e.exception.message, "Runtime type violation detected within " - "ParDo(sort join/KeyWithVoid): " + "ParDo(SortJoin/KeyWithVoid): " "Type-hint for argument: 'v' violated. " "Expected an instance of , " "instead found 0, an instance of .") @@ -1422,20 +1417,20 @@ def test_combine_insufficient_type_hint_information(self): with self.assertRaises(typehints.TypeCheckError) as e: (self.p - | 'e' >> beam.Create(range(3)).with_output_types(int) - | 'sort join' >> beam.CombineGlobally(lambda s: ''.join(sorted(s))) - | 'f' >> beam.Map(lambda x: x + 1)) + | 'E' >> beam.Create(range(3)).with_output_types(int) + | 'SortJoin' >> beam.CombineGlobally(lambda s: ''.join(sorted(s))) + | 'F' >> beam.Map(lambda x: x + 1)) self.assertEqual( 'Pipeline type checking is enabled, ' 'however no output type-hint was found for the PTransform ' - 'ParDo(sort join/CombinePerKey/Combine/ParDo(CombineValuesDoFn))', + 'ParDo(SortJoin/CombinePerKey/Combine/ParDo(CombineValuesDoFn))', e.exception.message) def test_mean_globally_pipeline_checking_satisfied(self): d = (self.p - | 'c' >> beam.Create(range(5)).with_output_types(int) - | 'mean' >> combine.Mean.Globally()) + | 'C' >> beam.Create(range(5)).with_output_types(int) + | 'Mean' >> combine.Mean.Globally()) self.assertTrue(d.element_type is float) assert_that(d, equal_to([2.0])) @@ -1444,8 +1439,8 @@ def test_mean_globally_pipeline_checking_satisfied(self): def test_mean_globally_pipeline_checking_violated(self): with self.assertRaises(typehints.TypeCheckError) as e: (self.p - | 'c' >> beam.Create(['test']).with_output_types(str) - | 'mean' >> combine.Mean.Globally()) + | 'C' >> beam.Create(['test']).with_output_types(str) + | 'Mean' >> combine.Mean.Globally()) self.assertEqual("Type hint violation for 'ParDo(CombineValuesDoFn)': " "requires Tuple[TypeVariable[K], " @@ -1457,8 +1452,8 @@ def test_mean_globally_runtime_checking_satisfied(self): self.p.options.view_as(TypeOptions).runtime_type_check = True d = (self.p - | 'c' >> beam.Create(range(5)).with_output_types(int) - | 'mean' >> combine.Mean.Globally()) + | 'C' >> beam.Create(range(5)).with_output_types(int) + | 'Mean' >> combine.Mean.Globally()) self.assertTrue(d.element_type is float) assert_that(d, equal_to([2.0])) @@ -1470,8 +1465,8 @@ def test_mean_globally_runtime_checking_violated(self): with self.assertRaises(typehints.TypeCheckError) as e: (self.p - | 'c' >> beam.Create(['t', 'e', 's', 't']).with_output_types(str) - | 'mean' >> combine.Mean.Globally()) + | 'C' >> beam.Create(['t', 'e', 's', 't']).with_output_types(str) + | 'Mean' >> combine.Mean.Globally()) self.p.run() self.assertEqual("Runtime type violation detected for transform input " "when executing ParDoFlatMap(Combine): Tuple[Any, " @@ -1487,9 +1482,9 @@ def test_mean_globally_runtime_checking_violated(self): def test_mean_per_key_pipeline_checking_satisfied(self): d = (self.p | beam.Create(range(5)).with_output_types(int) - | ('even group' >> beam.Map(lambda x: (not x % 2, x)) + | ('EvenGroup' >> beam.Map(lambda x: (not x % 2, x)) .with_output_types(typehints.KV[bool, int])) - | 'even mean' >> combine.Mean.PerKey()) + | 'EvenMean' >> combine.Mean.PerKey()) self.assertCompatible(typehints.KV[bool, float], d.element_type) assert_that(d, equal_to([(False, 2.0), (True, 2.0)])) @@ -1499,9 +1494,9 @@ def test_mean_per_key_pipeline_checking_violated(self): with self.assertRaises(typehints.TypeCheckError) as e: (self.p | beam.Create(map(str, range(5))).with_output_types(str) - | ('upper pair' >> beam.Map(lambda x: (x.upper(), x)) + | ('UpperPair' >> beam.Map(lambda x: (x.upper(), x)) .with_output_types(typehints.KV[str, str])) - | 'even mean' >> combine.Mean.PerKey()) + | 'EvenMean' >> combine.Mean.PerKey()) self.p.run() self.assertEqual("Type hint violation for 'ParDo(CombineValuesDoFn)': " @@ -1515,9 +1510,9 @@ def test_mean_per_key_runtime_checking_satisfied(self): d = (self.p | beam.Create(range(5)).with_output_types(int) - | ('odd group' >> beam.Map(lambda x: (bool(x % 2), x)) + | ('OddGroup' >> beam.Map(lambda x: (bool(x % 2), x)) .with_output_types(typehints.KV[bool, int])) - | 'odd mean' >> combine.Mean.PerKey()) + | 'OddMean' >> combine.Mean.PerKey()) self.assertCompatible(typehints.KV[bool, float], d.element_type) assert_that(d, equal_to([(False, 2.0), (True, 2.0)])) @@ -1530,15 +1525,15 @@ def test_mean_per_key_runtime_checking_violated(self): with self.assertRaises(typehints.TypeCheckError) as e: (self.p | beam.Create(range(5)).with_output_types(int) - | ('odd group' >> beam.Map(lambda x: (x, str(bool(x % 2)))) + | ('OddGroup' >> beam.Map(lambda x: (x, str(bool(x % 2)))) .with_output_types(typehints.KV[int, str])) - | 'odd mean' >> combine.Mean.PerKey()) + | 'OddMean' >> combine.Mean.PerKey()) self.p.run() self.assertStartswith( e.exception.message, "Runtime type violation detected within " - "ParDo(odd mean/CombinePerKey(MeanCombineFn)/" + "ParDo(OddMean/CombinePerKey(MeanCombineFn)/" "Combine/ParDo(CombineValuesDoFn)): " "Type-hint for argument: 'p_context' violated: " "Tuple[TypeVariable[K], Iterable[Union[float, int, long]]]" @@ -1553,8 +1548,8 @@ def test_mean_per_key_runtime_checking_violated(self): def test_count_globally_pipeline_type_checking_satisfied(self): d = (self.p - | 'p' >> beam.Create(range(5)).with_output_types(int) - | 'count int' >> combine.Count.Globally()) + | 'P' >> beam.Create(range(5)).with_output_types(int) + | 'CountInt' >> combine.Count.Globally()) self.assertTrue(d.element_type is int) assert_that(d, equal_to([5])) @@ -1564,8 +1559,8 @@ def test_count_globally_runtime_type_checking_satisfied(self): self.p.options.view_as(TypeOptions).runtime_type_check = True d = (self.p - | 'p' >> beam.Create(range(5)).with_output_types(int) - | 'count int' >> combine.Count.Globally()) + | 'P' >> beam.Create(range(5)).with_output_types(int) + | 'CountInt' >> combine.Count.Globally()) self.assertTrue(d.element_type is int) assert_that(d, equal_to([5])) @@ -1574,9 +1569,9 @@ def test_count_globally_runtime_type_checking_satisfied(self): def test_count_perkey_pipeline_type_checking_satisfied(self): d = (self.p | beam.Create(range(5)).with_output_types(int) - | ('even group' >> beam.Map(lambda x: (not x % 2, x)) + | ('EvenGroup' >> beam.Map(lambda x: (not x % 2, x)) .with_output_types(typehints.KV[bool, int])) - | 'count int' >> combine.Count.PerKey()) + | 'CountInt' >> combine.Count.PerKey()) self.assertCompatible(typehints.KV[bool, int], d.element_type) assert_that(d, equal_to([(False, 2), (True, 3)])) @@ -1586,7 +1581,7 @@ def test_count_perkey_pipeline_type_checking_violated(self): with self.assertRaises(typehints.TypeCheckError) as e: (self.p | beam.Create(range(5)).with_output_types(int) - | 'count int' >> combine.Count.PerKey()) + | 'CountInt' >> combine.Count.PerKey()) self.assertEqual("Input type hint violation at GroupByKey: " "expected Tuple[TypeVariable[K], TypeVariable[V]], " @@ -1598,9 +1593,9 @@ def test_count_perkey_runtime_type_checking_satisfied(self): d = (self.p | beam.Create(['t', 'e', 's', 't']).with_output_types(str) - | 'dup key' >> beam.Map(lambda x: (x, x)) + | 'DupKey' >> beam.Map(lambda x: (x, x)) .with_output_types(typehints.KV[str, str]) - | 'count dups' >> combine.Count.PerKey()) + | 'CountDups' >> combine.Count.PerKey()) self.assertCompatible(typehints.KV[str, int], d.element_type) assert_that(d, equal_to([('e', 1), ('s', 1), ('t', 2)])) @@ -1609,7 +1604,7 @@ def test_count_perkey_runtime_type_checking_satisfied(self): def test_count_perelement_pipeline_type_checking_satisfied(self): d = (self.p | beam.Create([1, 1, 2, 3]).with_output_types(int) - | 'count elems' >> combine.Count.PerElement()) + | 'CountElems' >> combine.Count.PerElement()) self.assertCompatible(typehints.KV[int, int], d.element_type) assert_that(d, equal_to([(1, 2), (2, 1), (3, 1)])) @@ -1621,7 +1616,7 @@ def test_count_perelement_pipeline_type_checking_violated(self): with self.assertRaises(typehints.TypeCheckError) as e: (self.p | 'f' >> beam.Create([1, 1, 2, 3]) - | 'count elems' >> combine.Count.PerElement()) + | 'CountElems' >> combine.Count.PerElement()) self.assertEqual('Pipeline type checking is enabled, however no output ' 'type-hint was found for the PTransform ' @@ -1634,7 +1629,7 @@ def test_count_perelement_runtime_type_checking_satisfied(self): d = (self.p | beam.Create([True, True, False, True, True]) .with_output_types(bool) - | 'count elems' >> combine.Count.PerElement()) + | 'CountElems' >> combine.Count.PerElement()) self.assertCompatible(typehints.KV[bool, int], d.element_type) assert_that(d, equal_to([(False, 1), (True, 4)])) @@ -1643,7 +1638,7 @@ def test_count_perelement_runtime_type_checking_satisfied(self): def test_top_of_pipeline_checking_satisfied(self): d = (self.p | beam.Create(range(5, 11)).with_output_types(int) - | 'top 3' >> combine.Top.Of(3, lambda x, y: x < y)) + | 'Top 3' >> combine.Top.Of(3, lambda x, y: x < y)) self.assertCompatible(typehints.Iterable[int], d.element_type) @@ -1655,7 +1650,7 @@ def test_top_of_runtime_checking_satisfied(self): d = (self.p | beam.Create(list('testing')).with_output_types(str) - | 'acii top' >> combine.Top.Of(3, lambda x, y: x < y)) + | 'AciiTop' >> combine.Top.Of(3, lambda x, y: x < y)) self.assertCompatible(typehints.Iterable[str], d.element_type) assert_that(d, equal_to([['t', 't', 's']])) @@ -1665,8 +1660,8 @@ def test_per_key_pipeline_checking_violated(self): with self.assertRaises(typehints.TypeCheckError) as e: (self.p | beam.Create(range(100)).with_output_types(int) - | 'num + 1' >> beam.Map(lambda x: x + 1).with_output_types(int) - | 'top mod' >> combine.Top.PerKey(1, lambda a, b: a < b)) + | 'Num + 1' >> beam.Map(lambda x: x + 1).with_output_types(int) + | 'TopMod' >> combine.Top.PerKey(1, lambda a, b: a < b)) self.assertEqual("Input type hint violation at GroupByKey: " "expected Tuple[TypeVariable[K], TypeVariable[V]], " @@ -1676,9 +1671,9 @@ def test_per_key_pipeline_checking_violated(self): def test_per_key_pipeline_checking_satisfied(self): d = (self.p | beam.Create(range(100)).with_output_types(int) - | ('group mod 3' >> beam.Map(lambda x: (x % 3, x)) + | ('GroupMod 3' >> beam.Map(lambda x: (x % 3, x)) .with_output_types(typehints.KV[int, int])) - | 'top mod' >> combine.Top.PerKey(1, lambda a, b: a < b)) + | 'TopMod' >> combine.Top.PerKey(1, lambda a, b: a < b)) self.assertCompatible(typehints.Tuple[int, typehints.Iterable[int]], d.element_type) @@ -1690,9 +1685,9 @@ def test_per_key_runtime_checking_satisfied(self): d = (self.p | beam.Create(range(21)) - | ('group mod 3' >> beam.Map(lambda x: (x % 3, x)) + | ('GroupMod 3' >> beam.Map(lambda x: (x % 3, x)) .with_output_types(typehints.KV[int, int])) - | 'top mod' >> combine.Top.PerKey(1, lambda a, b: a < b)) + | 'TopMod' >> combine.Top.PerKey(1, lambda a, b: a < b)) self.assertCompatible(typehints.KV[int, typehints.Iterable[int]], d.element_type) @@ -1702,7 +1697,7 @@ def test_per_key_runtime_checking_satisfied(self): def test_sample_globally_pipeline_satisfied(self): d = (self.p | beam.Create([2, 2, 3, 3]).with_output_types(int) - | 'sample' >> combine.Sample.FixedSizeGlobally(3)) + | 'Sample' >> combine.Sample.FixedSizeGlobally(3)) self.assertCompatible(typehints.Iterable[int], d.element_type) @@ -1718,7 +1713,7 @@ def test_sample_globally_runtime_satisfied(self): d = (self.p | beam.Create([2, 2, 3, 3]).with_output_types(int) - | 'sample' >> combine.Sample.FixedSizeGlobally(2)) + | 'Sample' >> combine.Sample.FixedSizeGlobally(2)) self.assertCompatible(typehints.Iterable[int], d.element_type) @@ -1733,7 +1728,7 @@ def test_sample_per_key_pipeline_satisfied(self): d = (self.p | (beam.Create([(1, 2), (1, 2), (2, 3), (2, 3)]) .with_output_types(typehints.KV[int, int])) - | 'sample' >> combine.Sample.FixedSizePerKey(2)) + | 'Sample' >> combine.Sample.FixedSizePerKey(2)) self.assertCompatible(typehints.KV[int, typehints.Iterable[int]], d.element_type) @@ -1752,7 +1747,7 @@ def test_sample_per_key_runtime_satisfied(self): d = (self.p | (beam.Create([(1, 2), (1, 2), (2, 3), (2, 3)]) .with_output_types(typehints.KV[int, int])) - | 'sample' >> combine.Sample.FixedSizePerKey(1)) + | 'Sample' >> combine.Sample.FixedSizePerKey(1)) self.assertCompatible(typehints.KV[int, typehints.Iterable[int]], d.element_type) @@ -1835,13 +1830,13 @@ def test_runtime_type_check_python_type_error(self): with self.assertRaises(TypeError) as e: (self.p | beam.Create([1, 2, 3]).with_output_types(int) - | 'len' >> beam.Map(lambda x: len(x)).with_output_types(int)) + | 'Len' >> beam.Map(lambda x: len(x)).with_output_types(int)) self.p.run() # Our special type-checking related TypeError shouldn't have been raised. # Instead the above pipeline should have triggered a regular Python runtime # TypeError. - self.assertEqual("object of type 'int' has no len() [while running 'len']", + self.assertEqual("object of type 'int' has no len() [while running 'Len']", e.exception.message) self.assertFalse(isinstance(e, typehints.TypeCheckError)) @@ -1869,7 +1864,7 @@ def test_inferred_bad_kv_type(self): with self.assertRaises(typehints.TypeCheckError) as e: _ = (self.p | beam.Create(['a', 'b', 'c']) - | 'ungroupable' >> beam.Map(lambda x: (x, 0, 1.0)) + | 'Ungroupable' >> beam.Map(lambda x: (x, 0, 1.0)) | beam.GroupByKey()) self.assertEqual('Input type hint violation at GroupByKey: ' @@ -1879,11 +1874,11 @@ def test_inferred_bad_kv_type(self): def test_type_inference_command_line_flag_toggle(self): self.p.options.view_as(TypeOptions).pipeline_type_check = False - x = self.p | 'c1' >> beam.Create([1, 2, 3, 4]) + x = self.p | 'C1' >> beam.Create([1, 2, 3, 4]) self.assertIsNone(x.element_type) self.p.options.view_as(TypeOptions).pipeline_type_check = True - x = self.p | 'c2' >> beam.Create([1, 2, 3, 4]) + x = self.p | 'C2' >> beam.Create([1, 2, 3, 4]) self.assertEqual(int, x.element_type)