From b6a0974375e5c7414a52cd3ffd2a9e9fe8d1889f Mon Sep 17 00:00:00 2001
From: Ahmet Altay
Date: Thu, 15 Dec 2016 14:27:08 -0800
Subject: [PATCH] Rename PTransform.apply() to PTransform.expand()

---
 sdks/python/README.md                              |  2 +-
 .../examples/complete/autocomplete.py              |  2 +-
 .../examples/complete/estimate_pi.py               |  2 +-
 sdks/python/apache_beam/examples/complete/tfidf.py |  2 +-
 .../complete/top_wikipedia_sessions.py             |  6 ++---
 .../examples/cookbook/custom_ptransform.py         |  2 +-
 .../cookbook/multiple_output_pardo.py              |  2 +-
 .../apache_beam/examples/snippets/snippets.py      | 16 ++++++-------
 .../examples/snippets/snippets_test.py             |  2 +-
 .../examples/wordcount_debugging.py                |  2 +-
 sdks/python/apache_beam/io/avroio.py               |  4 ++--
 .../io/datastore/v1/datastoreio.py                 |  4 ++--
 sdks/python/apache_beam/io/iobase.py               |  6 ++---
 sdks/python/apache_beam/io/textio.py               |  4 ++--
 sdks/python/apache_beam/pipeline_test.py           |  4 ++--
 .../runners/dataflow/native_io/iobase.py           |  2 +-
 .../runners/direct/direct_runner.py                |  2 +-
 sdks/python/apache_beam/runners/runner.py          |  4 ++--
 .../apache_beam/transforms/combiners.py            | 14 +++++------
 .../apache_beam/transforms/combiners_test.py       |  2 +-
 sdks/python/apache_beam/transforms/core.py         | 24 +++++++++----------
 .../apache_beam/transforms/ptransform.py           | 10 ++++----
 .../apache_beam/transforms/ptransform_test.py      |  6 ++---
 .../apache_beam/transforms/sideinputs.py           | 10 ++++----
 sdks/python/apache_beam/transforms/util.py         |  4 ++--
 .../transforms/write_ptransform_test.py            |  2 +-
 .../typehints/typed_pipeline_test.py               |  2 +-
 27 files changed, 71 insertions(+), 71 deletions(-)

diff --git a/sdks/python/README.md b/sdks/python/README.md
index 820084d7f90e5..5ea2a60a51748 100644
--- a/sdks/python/README.md
+++ b/sdks/python/README.md
@@ -262,7 +262,7 @@ import re
 import apache_beam as beam
 p = beam.Pipeline('DirectPipelineRunner')
 class MyCountTransform(beam.PTransform):
-  def apply(self, pcoll):
+  def expand(self, pcoll):
     return (pcoll
             | 'one word' >> beam.Map(lambda word: (word, 1))
             # GroupByKey accepts a PCollection of (word, 1) elements and
diff --git a/sdks/python/apache_beam/examples/complete/autocomplete.py b/sdks/python/apache_beam/examples/complete/autocomplete.py
index c3cd88f7d28e7..eaa5ca2fc5b2f 100644
--- a/sdks/python/apache_beam/examples/complete/autocomplete.py
+++ b/sdks/python/apache_beam/examples/complete/autocomplete.py
@@ -60,7 +60,7 @@ def __init__(self, count):
     super(TopPerPrefix, self).__init__()
     self._count = count

-  def apply(self, words):
+  def expand(self, words):
     """Compute the most common words for each possible prefixes.

     Args:
diff --git a/sdks/python/apache_beam/examples/complete/estimate_pi.py b/sdks/python/apache_beam/examples/complete/estimate_pi.py
index 37c1aadd5308b..682c6d20a9268 100644
--- a/sdks/python/apache_beam/examples/complete/estimate_pi.py
+++ b/sdks/python/apache_beam/examples/complete/estimate_pi.py
@@ -90,7 +90,7 @@ def encode(self, x):

 class EstimatePiTransform(beam.PTransform):
   """Runs 10M trials, and combine the results to estimate pi."""

-  def apply(self, pcoll):
+  def expand(self, pcoll):
     # A hundred work items of a hundred thousand tries each.
     return (pcoll
             | 'Initialize' >> beam.Create([100000] * 100).with_output_types(int)
diff --git a/sdks/python/apache_beam/examples/complete/tfidf.py b/sdks/python/apache_beam/examples/complete/tfidf.py
index 043d5f64931ee..59b29006d5942 100644
--- a/sdks/python/apache_beam/examples/complete/tfidf.py
+++ b/sdks/python/apache_beam/examples/complete/tfidf.py
@@ -53,7 +53,7 @@ class TfIdf(beam.PTransform):

   The output is mapping from terms to scores for each document URI.
   """

-  def apply(self, uri_to_content):
+  def expand(self, uri_to_content):
     # Compute the total number of documents, and prepare a singleton
     # PCollection to use as side input.
diff --git a/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py b/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py
index a48a3835fcbfd..2d66d7ff91d0f 100644
--- a/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py
+++ b/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py
@@ -76,7 +76,7 @@ class ComputeSessions(beam.PTransform):
   def __init__(self):
     super(ComputeSessions, self).__init__()

-  def apply(self, pcoll):
+  def expand(self, pcoll):
     return (pcoll
             | beam.WindowInto('ComputeSessionsWindow',
                               window.Sessions(gap_size=ONE_HOUR_IN_SECONDS))
@@ -89,7 +89,7 @@ class TopPerMonth(beam.PTransform):
   def __init__(self):
     super(TopPerMonth, self).__init__()

-  def apply(self, pcoll):
+  def expand(self, pcoll):
     return (pcoll
             | beam.WindowInto('TopPerMonthWindow',
                               window.FixedWindows(
@@ -127,7 +127,7 @@ def __init__(self, sampling_threshold):
     super(ComputeTopSessions, self).__init__()
     self.sampling_threshold = sampling_threshold

-  def apply(self, pcoll):
+  def expand(self, pcoll):
     return (pcoll
             | beam.ParDo('ExtractUserAndTimestamp',
                          ExtractUserAndTimestampDoFn())
diff --git a/sdks/python/apache_beam/examples/cookbook/custom_ptransform.py b/sdks/python/apache_beam/examples/cookbook/custom_ptransform.py
index ca13bbfeef2bb..b9d64cf846b7b 100644
--- a/sdks/python/apache_beam/examples/cookbook/custom_ptransform.py
+++ b/sdks/python/apache_beam/examples/cookbook/custom_ptransform.py
@@ -36,7 +36,7 @@ class Count1(beam.PTransform):
   """Count as a subclass of PTransform, with an apply method."""

-  def apply(self, pcoll):
+  def expand(self, pcoll):
     return (
         pcoll
         | 'ParWithOne' >> beam.Map(lambda v: (v, 1))
diff --git a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py
index d24170e73bab7..167e709eb78b0 100644
--- a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py
+++ b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py
@@ -112,7 +112,7 @@ class CountWords(beam.PTransform):
   of "word: count" strings.
""" - def apply(self, pcoll): + def expand(self, pcoll): return (pcoll | 'pair_with_one' >> beam.Map(lambda x: (x, 1)) | 'group' >> beam.GroupByKey() diff --git a/sdks/python/apache_beam/examples/snippets/snippets.py b/sdks/python/apache_beam/examples/snippets/snippets.py index 6dcf05e75f53d..f78ecd84989cd 100644 --- a/sdks/python/apache_beam/examples/snippets/snippets.py +++ b/sdks/python/apache_beam/examples/snippets/snippets.py @@ -80,7 +80,7 @@ def construct_pipeline(renames): class ReverseWords(beam.PTransform): """A PTransform that reverses individual elements in a PCollection.""" - def apply(self, pcoll): + def expand(self, pcoll): return pcoll | beam.Map(lambda e: e[::-1]) def filter_words(unused_x): @@ -387,7 +387,7 @@ def process(self, context): # The CountWords Composite Transform inside the WordCount pipeline. class CountWords(beam.PTransform): - def apply(self, pcoll): + def expand(self, pcoll): return (pcoll # Convert lines of text into individual words. | 'ExtractWords' >> beam.ParDo(ExtractWordsFn()) @@ -508,7 +508,7 @@ def _add_argparse_args(cls, parser): # [START examples_wordcount_wordcount_composite] class CountWords(beam.PTransform): - def apply(self, pcoll): + def expand(self, pcoll): return (pcoll # Convert lines of text into individual words. | beam.FlatMap( @@ -705,7 +705,7 @@ def __init__(self, count, **kwargs): super(ReadFromCountingSource, self).__init__(**kwargs) self._count = count - def apply(self, pcoll): + def expand(self, pcoll): return pcoll | iobase.Read(_CountingSource(count)) # [END model_custom_source_new_ptransform] @@ -838,7 +838,7 @@ def __init__(self, label, url, final_table_name, **kwargs): self._url = url self._final_table_name = final_table_name - def apply(self, pcoll): + def expand(self, pcoll): return pcoll | iobase.Write(_SimpleKVSink(self._url, self._final_table_name)) # [END model_custom_sink_new_ptransform] @@ -1001,7 +1001,7 @@ def model_composite_transform_example(contents, output_path): class CountWords(beam.PTransform): # [END composite_ptransform_declare] - def apply(self, pcoll): + def expand(self, pcoll): return (pcoll | beam.FlatMap(lambda x: re.findall(r'\w+', x)) | beam.combiners.Count.PerElement() @@ -1197,7 +1197,7 @@ def join_info(name, emails, phone_numbers): # [START model_library_transforms_keys] class Keys(beam.PTransform): - def apply(self, pcoll): + def expand(self, pcoll): return pcoll | 'Keys' >> beam.Map(lambda (k, v): k) # [END model_library_transforms_keys] # pylint: enable=invalid-name @@ -1206,7 +1206,7 @@ def apply(self, pcoll): # [START model_library_transforms_count] class Count(beam.PTransform): - def apply(self, pcoll): + def expand(self, pcoll): return ( pcoll | 'PairWithOne' >> beam.Map(lambda v: (v, 1)) diff --git a/sdks/python/apache_beam/examples/snippets/snippets_test.py b/sdks/python/apache_beam/examples/snippets/snippets_test.py index 09b4ba4caa1ac..db2ea81827920 100644 --- a/sdks/python/apache_beam/examples/snippets/snippets_test.py +++ b/sdks/python/apache_beam/examples/snippets/snippets_test.py @@ -279,7 +279,7 @@ def process(self, context): @beam.typehints.with_input_types(T) @beam.typehints.with_output_types(beam.typehints.Tuple[int, T]) class MyTransform(beam.PTransform): - def apply(self, pcoll): + def expand(self, pcoll): return pcoll | beam.Map(lambda x: (len(x), x)) words_with_lens = words | MyTransform() diff --git a/sdks/python/apache_beam/examples/wordcount_debugging.py b/sdks/python/apache_beam/examples/wordcount_debugging.py index 473a4864a483d..cdf4e0cc4c145 100644 --- 
+++ b/sdks/python/apache_beam/examples/wordcount_debugging.py
@@ -95,7 +95,7 @@ class CountWords(beam.PTransform):
   def __init__(self):
     super(CountWords, self).__init__()

-  def apply(self, pcoll):
+  def expand(self, pcoll):
     return (pcoll
             | 'split' >> (beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
                           .with_output_types(unicode))
diff --git a/sdks/python/apache_beam/io/avroio.py b/sdks/python/apache_beam/io/avroio.py
index 35d0e9436a070..3663bdbf25720 100644
--- a/sdks/python/apache_beam/io/avroio.py
+++ b/sdks/python/apache_beam/io/avroio.py
@@ -77,7 +77,7 @@ def __init__(self, file_pattern=None, min_bundle_size=0, validate=True):
     super(ReadFromAvro, self).__init__()
     self._source = _AvroSource(file_pattern, min_bundle_size, validate=validate)

-  def apply(self, pvalue):
+  def expand(self, pvalue):
     return pvalue.pipeline | Read(self._source)

   def display_data(self):
@@ -294,7 +294,7 @@ def __init__(self,
     self._sink = _AvroSink(file_path_prefix, schema, codec, file_name_suffix,
                            num_shards, shard_name_template, mime_type)

-  def apply(self, pcoll):
+  def expand(self, pcoll):
     return pcoll | beam.io.iobase.Write(self._sink)

   def display_data(self):
diff --git a/sdks/python/apache_beam/io/datastore/v1/datastoreio.py b/sdks/python/apache_beam/io/datastore/v1/datastoreio.py
index a86bb0b292f57..93c592d66fe20 100644
--- a/sdks/python/apache_beam/io/datastore/v1/datastoreio.py
+++ b/sdks/python/apache_beam/io/datastore/v1/datastoreio.py
@@ -102,7 +102,7 @@ def __init__(self, project, query, namespace=None, num_splits=0):
     self._query = query
     self._num_splits = num_splits

-  def apply(self, pcoll):
+  def expand(self, pcoll):
     # This is a composite transform involves the following:
     #   1. Create a singleton of the user provided `query` and apply a ``ParDo``
     #   that splits the query into `num_splits` and assign each split query a
@@ -312,7 +312,7 @@ def __init__(self, project, mutation_fn):
     self._mutation_fn = mutation_fn
     logging.warning('datastoreio write transform is experimental.')

-  def apply(self, pcoll):
+  def expand(self, pcoll):
     return (pcoll
             | 'Convert to Mutation' >> Map(self._mutation_fn)
             | 'Write Mutation to Datastore' >> ParDo(_Mutate.DatastoreWriteFn(
diff --git a/sdks/python/apache_beam/io/iobase.py b/sdks/python/apache_beam/io/iobase.py
index fd6ae575e5aed..8fb52383c0173 100644
--- a/sdks/python/apache_beam/io/iobase.py
+++ b/sdks/python/apache_beam/io/iobase.py
@@ -658,7 +658,7 @@ def __init__(self, *args, **kwargs):
     super(Read, self).__init__(label)
     self.source = source

-  def apply(self, pbegin):
+  def expand(self, pbegin):
     assert isinstance(pbegin, pvalue.PBegin)
     self.pipeline = pbegin.pipeline
     return pvalue.PCollection(self.pipeline)
@@ -723,7 +723,7 @@ def display_data(self):
     return {'sink': self.sink.__class__,
             'sink_dd': self.sink}

-  def apply(self, pcoll):
+  def expand(self, pcoll):
     from apache_beam.runners.dataflow.native_io import iobase as dataflow_io
     if isinstance(self.sink, dataflow_io.NativeSink):
       # A native sink
@@ -746,7 +746,7 @@ def __init__(self, sink):
     super(WriteImpl, self).__init__()
     self.sink = sink

-  def apply(self, pcoll):
+  def expand(self, pcoll):
     do_once = pcoll.pipeline | 'DoOnce' >> core.Create([None])
     init_result_coll = do_once | core.Map(
         'initialize_write', lambda _, sink: sink.initialize_write(), self.sink)
diff --git a/sdks/python/apache_beam/io/textio.py b/sdks/python/apache_beam/io/textio.py
index ebadf693419e1..09cf02427efaf 100644
--- a/sdks/python/apache_beam/io/textio.py
+++ b/sdks/python/apache_beam/io/textio.py
@@ -239,7 +239,7 @@ def __init__(
         strip_trailing_newlines,
         coder,
         validate=validate)

-  def apply(self, pvalue):
+  def expand(self, pvalue):
     return pvalue.pipeline | Read(self._source)

   def display_data(self):
@@ -297,7 +297,7 @@ def __init__(
         append_trailing_newlines,
         num_shards,
         shard_name_template,
         coder,
         compression_type)

-  def apply(self, pcoll):
+  def expand(self, pcoll):
     return pcoll | Write(self._sink)

   def display_data(self):
diff --git a/sdks/python/apache_beam/pipeline_test.py b/sdks/python/apache_beam/pipeline_test.py
index c50f04d95337c..5af4811dfc230 100644
--- a/sdks/python/apache_beam/pipeline_test.py
+++ b/sdks/python/apache_beam/pipeline_test.py
@@ -73,7 +73,7 @@ def custom_callable(pcoll):

 class CustomTransform(PTransform):

-  def apply(self, pcoll):
+  def expand(self, pcoll):
     return pcoll | '+1' >> FlatMap(lambda x: [x + 1])

 class Visitor(PipelineVisitor):
@@ -174,7 +174,7 @@ def __init__(self, suffix):
         # No call to super(...).__init__
         self.suffix = suffix

-      def apply(self, pcoll):
+      def expand(self, pcoll):
         return pcoll | Map(lambda x: x + self.suffix)

     self.assertEqual(
diff --git a/sdks/python/apache_beam/runners/dataflow/native_io/iobase.py b/sdks/python/apache_beam/runners/dataflow/native_io/iobase.py
index 32da3a2855951..b6eb2888c57d2 100644
--- a/sdks/python/apache_beam/runners/dataflow/native_io/iobase.py
+++ b/sdks/python/apache_beam/runners/dataflow/native_io/iobase.py
@@ -306,6 +306,6 @@ def __init__(self, *args, **kwargs):
     super(_NativeWrite, self).__init__(label)
     self.sink = sink

-  def apply(self, pcoll):
+  def expand(self, pcoll):
     self._check_pcollection(pcoll)
     return pvalue.PDone(pcoll.pipeline)
diff --git a/sdks/python/apache_beam/runners/direct/direct_runner.py b/sdks/python/apache_beam/runners/direct/direct_runner.py
index 1afd486965c50..fa78902db995f 100644
--- a/sdks/python/apache_beam/runners/direct/direct_runner.py
+++ b/sdks/python/apache_beam/runners/direct/direct_runner.py
@@ -92,7 +92,7 @@ def cache(self):

   def apply(self, transform, input):  # pylint: disable=redefined-builtin
     """Runner callback for a pipeline.apply call."""
-    return transform.apply(input)
+    return transform.expand(input)


 class BufferingInMemoryCache(object):
diff --git a/sdks/python/apache_beam/runners/runner.py b/sdks/python/apache_beam/runners/runner.py
index 0f53d65787415..ec15beed8d66a 100644
--- a/sdks/python/apache_beam/runners/runner.py
+++ b/sdks/python/apache_beam/runners/runner.py
@@ -148,8 +148,8 @@ def apply(self, transform, input):
         'Execution of [%s] not implemented in runner %s.' % (transform, self))

   def apply_PTransform(self, transform, input):
-    # The base case of apply is to call the transform's apply.
-    return transform.apply(input)
+    # The base case of apply is to call the transform's expand.
+    return transform.expand(input)

   def run_transform(self, transform_node):
     """Runner callback for a pipeline.run call.
diff --git a/sdks/python/apache_beam/transforms/combiners.py b/sdks/python/apache_beam/transforms/combiners.py
index 22d2b3e67bedd..96fcdddb76ee5 100644
--- a/sdks/python/apache_beam/transforms/combiners.py
+++ b/sdks/python/apache_beam/transforms/combiners.py
@@ -58,13 +58,13 @@ class Mean(object):
   class Globally(ptransform.PTransform):
     """combiners.Mean.Globally computes the arithmetic mean of the elements."""

-    def apply(self, pcoll):
+    def expand(self, pcoll):
       return pcoll | core.CombineGlobally(MeanCombineFn())

   class PerKey(ptransform.PTransform):
     """combiners.Mean.PerKey finds the means of the values for each key."""

-    def apply(self, pcoll):
+    def expand(self, pcoll):
       return pcoll | core.CombinePerKey(MeanCombineFn())

@@ -105,19 +105,19 @@ class Count(object):
   class Globally(ptransform.PTransform):
     """combiners.Count.Globally counts the total number of elements."""

-    def apply(self, pcoll):
+    def expand(self, pcoll):
       return pcoll | core.CombineGlobally(CountCombineFn())

   class PerKey(ptransform.PTransform):
     """combiners.Count.PerKey counts how many elements each unique key has."""

-    def apply(self, pcoll):
+    def expand(self, pcoll):
       return pcoll | core.CombinePerKey(CountCombineFn())

   class PerElement(ptransform.PTransform):
     """combiners.Count.PerElement counts how many times each element occurs."""

-    def apply(self, pcoll):
+    def expand(self, pcoll):
       paired_with_void_type = KV[pcoll.element_type, Any]
       return (pcoll
               | (core.Map('%s:PairWithVoid' % self.label, lambda x: (x, None))
@@ -475,7 +475,7 @@ class ToList(ptransform.PTransform):
   def __init__(self, label='ToList'):
     super(ToList, self).__init__(label)

-  def apply(self, pcoll):
+  def expand(self, pcoll):
     return pcoll | core.CombineGlobally(self.label, ToListCombineFn())

@@ -509,7 +509,7 @@ class ToDict(ptransform.PTransform):
   def __init__(self, label='ToDict'):
     super(ToDict, self).__init__(label)

-  def apply(self, pcoll):
+  def expand(self, pcoll):
     return pcoll | core.CombineGlobally(self.label, ToDictCombineFn())

diff --git a/sdks/python/apache_beam/transforms/combiners_test.py b/sdks/python/apache_beam/transforms/combiners_test.py
index 8dc274ecd4b60..6113ea21314d8 100644
--- a/sdks/python/apache_beam/transforms/combiners_test.py
+++ b/sdks/python/apache_beam/transforms/combiners_test.py
@@ -318,7 +318,7 @@ def test_combine_globally_without_default(self):
   def test_combine_globally_with_default_side_input(self):
     class CombineWithSideInput(PTransform):

-      def apply(self, pcoll):
+      def expand(self, pcoll):
         side = pcoll | CombineGlobally(sum).as_singleton_view()
         main = pcoll.pipeline | Create([None])
         return main | Map(lambda _, s: s, side)
diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py
index 523c5a6ab323e..0ba1c62c8fbdf 100644
--- a/sdks/python/apache_beam/transforms/core.py
+++ b/sdks/python/apache_beam/transforms/core.py
@@ -598,7 +598,7 @@ def display_data(self):
                                       label='Transform Function'),
             'fn_dd': self.fn}

-  def apply(self, pcoll):
+  def expand(self, pcoll):
     self.side_output_tags = set()
     # TODO(robertwb): Change all uses of the dofn attribute to use fn instead.
     self.dofn = self.fn
@@ -641,7 +641,7 @@ def __init__(self, do_transform, tags, main_tag):
     self._tags = tags
     self._main_tag = main_tag

-  def apply(self, pcoll):
+  def expand(self, pcoll):
     _ = pcoll | self._do_transform
     return pvalue.DoOutputsTuple(
         pcoll.pipeline, self._do_transform, self._tags, self._main_tag)
@@ -854,7 +854,7 @@ def without_defaults(self):
   def as_singleton_view(self):
     return self.clone(as_view=True)

-  def apply(self, pcoll):
+  def expand(self, pcoll):
     def add_input_types(transform):
       type_hints = self.get_type_hints()
       if type_hints.input_types:
@@ -939,7 +939,7 @@ def default_label(self):
   def process_argspec_fn(self):
     return self.fn._fn  # pylint: disable=protected-access

-  def apply(self, pcoll):
+  def expand(self, pcoll):
     args, kwargs = util.insert_values_in_args(
         self.args, self.kwargs, self.side_inputs)
     return pcoll | GroupByKey() | CombineValues('Combine',
@@ -952,7 +952,7 @@ class CombineValues(PTransformWithSideInputs):
   def make_fn(self, fn):
     return fn if isinstance(fn, CombineFn) else CombineFn.from_callable(fn)

-  def apply(self, pcoll):
+  def expand(self, pcoll):
     args, kwargs = util.insert_values_in_args(
         self.args, self.kwargs, self.side_inputs)
@@ -1083,7 +1083,7 @@ def process(self, context):
                    timer_window, name, time_domain, fire_time, state):
     yield wvalue.with_value((k, wvalue.value))

-  def apply(self, pcoll):
+  def expand(self, pcoll):
     # This code path is only used in the local direct runner. For Dataflow
     # runner execution, the GroupByKey transform is expanded on the service.
     input_type = pcoll.element_type
@@ -1132,7 +1132,7 @@ def infer_output_type(self, input_type):
     key_type, value_type = trivial_inference.key_value_types(input_type)
     return KV[key_type, Iterable[value_type]]

-  def apply(self, pcoll):
+  def expand(self, pcoll):
     self._check_pcollection(pcoll)
     return pvalue.PCollection(pcoll.pipeline)
@@ -1170,7 +1170,7 @@ def process(self, context, partitionfn, n, *args, **kwargs):
   def make_fn(self, fn):
     return fn if isinstance(fn, PartitionFn) else CallableWrapperPartitionFn(fn)

-  def apply(self, pcoll):
+  def expand(self, pcoll):
     n = int(self.args[0])
     return pcoll | ParDo(
         self.ApplyPartitionFnFn(), self.fn, *self.args,
@@ -1261,14 +1261,14 @@ def get_windowing(self, unused_inputs):
   def infer_output_type(self, input_type):
     return input_type

-  def apply(self, pcoll):
+  def expand(self, pcoll):
     input_type = pcoll.element_type

     if input_type is not None:
       output_type = input_type
       self.with_input_types(input_type)
       self.with_output_types(output_type)
-    return super(WindowInto, self).apply(pcoll)
+    return super(WindowInto, self).expand(pcoll)


 # Python's pickling is broken for nested classes.
@@ -1305,7 +1305,7 @@ def _extract_input_pvalues(self, pvalueish):
       raise ValueError('Input to Flatten must be an iterable.')
     return pvalueish, pvalueish

-  def apply(self, pcolls):
+  def expand(self, pcolls):
     for pcoll in pcolls:
       self._check_pcollection(pcoll)
     return pvalue.PCollection(self.pipeline)
@@ -1345,7 +1345,7 @@ def infer_output_type(self, unused_input_type):
     else:
       return Union[[trivial_inference.instance_to_type(v) for v in self.value]]

-  def apply(self, pbegin):
+  def expand(self, pbegin):
     assert isinstance(pbegin, pvalue.PBegin)
     self.pipeline = pbegin.pipeline
     return pvalue.PCollection(self.pipeline)
diff --git a/sdks/python/apache_beam/transforms/ptransform.py b/sdks/python/apache_beam/transforms/ptransform.py
index 2212d008d476f..1bd7fb46b2c96 100644
--- a/sdks/python/apache_beam/transforms/ptransform.py
+++ b/sdks/python/apache_beam/transforms/ptransform.py
@@ -359,7 +359,7 @@ def clone(self, new_label):
     transform.label = new_label
     return transform

-  def apply(self, input_or_inputs):
+  def expand(self, input_or_inputs):
     raise NotImplementedError

   def __str__(self):
@@ -493,7 +493,7 @@ def __or__(self, right):
     else:
       return NotImplemented

-  def apply(self, pval):
+  def expand(self, pval):
     return reduce(operator.or_, self._parts, pval)

@@ -650,7 +650,7 @@ def __call__(self, *args, **kwargs):
       super(CallablePTransform, self).__init__(label=label)
     return self

-  def apply(self, pcoll):
+  def expand(self, pcoll):
     # Since the PTransform will be implemented entirely as a function
     # (once called), we need to pass through any type-hinting information that
     # may have been annotated via the .with_input_types() and
@@ -700,7 +700,7 @@ def __init__(self, mapfn):
         super(CustomMapper, self).__init__()
         self.mapfn = mapfn

-      def apply(self, pcoll):
+      def expand(self, pcoll):
         return pcoll | ParDo(self.mapfn)

   With either method the custom PTransform can be used in pipelines as if
@@ -738,5 +738,5 @@ def __init__(self, transform, label):
   def __ror__(self, pvalueish):
     return self.transform.__ror__(pvalueish, self.label)

-  def apply(self, pvalue):
+  def expand(self, pvalue):
     raise RuntimeError("Should never be applied directly.")
diff --git a/sdks/python/apache_beam/transforms/ptransform_test.py b/sdks/python/apache_beam/transforms/ptransform_test.py
index e3b102601537b..9118feef6b552 100644
--- a/sdks/python/apache_beam/transforms/ptransform_test.py
+++ b/sdks/python/apache_beam/transforms/ptransform_test.py
@@ -526,7 +526,7 @@ def test_apply_to_list(self):

   def test_multi_input_ptransform(self):
     class DisjointUnion(PTransform):

-      def apply(self, pcollections):
+      def expand(self, pcollections):
        return (pcollections
                | beam.Flatten()
                | beam.Map(lambda x: (x, None))
@@ -545,7 +545,7 @@ def _extract_input_pvalues(self, pvalueish):
         pvalueish = list(pvalueish)
       return pvalueish, sum([list(p.values()) for p in pvalueish], [])

-      def apply(self, pcoll_dicts):
+      def expand(self, pcoll_dicts):
        keys = reduce(operator.or_, [set(p.keys()) for p in pcoll_dicts])
        res = {}
        for k in keys:
@@ -575,7 +575,7 @@ class CustomTransform(beam.PTransform):

       pardo = None

-      def apply(self, pcoll):
+      def expand(self, pcoll):
         self.pardo = beam.FlatMap('*do*', lambda x: [x + 1])
         return pcoll | self.pardo
diff --git a/sdks/python/apache_beam/transforms/sideinputs.py b/sdks/python/apache_beam/transforms/sideinputs.py
index 05ba6ab6dc25e..46731bf6ac910 100644
--- a/sdks/python/apache_beam/transforms/sideinputs.py
+++ b/sdks/python/apache_beam/transforms/sideinputs.py
@@ -50,7 +50,7 @@ def infer_output_type(self, input_type):
     # typehints.View[...].
     return input_type

-  def apply(self, pcoll):
+  def expand(self, pcoll):
     return self.view

@@ -68,7 +68,7 @@ def __init__(self, has_default, default_value, label=None):
     self.has_default = has_default
     self.default_value = default_value

-  def apply(self, pcoll):
+  def expand(self, pcoll):
     self._check_pcollection(pcoll)
     input_type = pcoll.element_type
     output_type = input_type
@@ -93,7 +93,7 @@ def __init__(self, label=None):
       label = 'ViewAsIterable(%s)' % label
     super(ViewAsIterable, self).__init__(label=label)

-  def apply(self, pcoll):
+  def expand(self, pcoll):
     self._check_pcollection(pcoll)
     input_type = pcoll.element_type
     output_type = typehints.Iterable[input_type]
@@ -118,7 +118,7 @@ def __init__(self, label=None):
      label = 'ViewAsList(%s)' % label
     super(ViewAsList, self).__init__(label=label)

-  def apply(self, pcoll):
+  def expand(self, pcoll):
     self._check_pcollection(pcoll)
     input_type = pcoll.element_type
     output_type = typehints.List[input_type]
@@ -144,7 +144,7 @@ def __init__(self, label=None):
       label = 'ViewAsDict(%s)' % label
     super(ViewAsDict, self).__init__(label=label)

-  def apply(self, pcoll):
+  def expand(self, pcoll):
     self._check_pcollection(pcoll)
     input_type = pcoll.element_type
     key_type, value_type = (
diff --git a/sdks/python/apache_beam/transforms/util.py b/sdks/python/apache_beam/transforms/util.py
index ebe6ba98549e9..98159965e3b55 100644
--- a/sdks/python/apache_beam/transforms/util.py
+++ b/sdks/python/apache_beam/transforms/util.py
@@ -99,7 +99,7 @@ def _extract_input_pvalues(self, pvalueish):
       pcolls = tuple(pvalueish)
     return pcolls, pcolls

-  def apply(self, pcolls):
+  def expand(self, pcolls):
     """Performs CoGroupByKey on argument pcolls; see class docstring."""
     # For associating values in K-V pairs with the PCollections they came from.
     def _pair_tag_with_value((key, value), tag):
@@ -222,7 +222,7 @@ def match(_, actual):

   class AssertThat(PTransform):

-    def apply(self, pipeline):
+    def expand(self, pipeline):
       return pipeline | 'singleton' >> Create([None]) | Map(
           match, AsList(actual | core.WindowInto(window.GlobalWindows())))
diff --git a/sdks/python/apache_beam/transforms/write_ptransform_test.py b/sdks/python/apache_beam/transforms/write_ptransform_test.py
index e7cdbd413f1ba..9a1a7de2aca83 100644
--- a/sdks/python/apache_beam/transforms/write_ptransform_test.py
+++ b/sdks/python/apache_beam/transforms/write_ptransform_test.py
@@ -84,7 +84,7 @@ def __init__(self, return_init_result=True, return_write_results=True):
     self.last_sink = None
     self.label = 'write_to_test_sink'

-  def apply(self, pcoll):
+  def expand(self, pcoll):
     self.last_sink = _TestSink(return_init_result=self.return_init_result,
                                return_write_results=self.return_write_results)
     return pcoll | beam.io.Write(self.last_sink)
diff --git a/sdks/python/apache_beam/typehints/typed_pipeline_test.py b/sdks/python/apache_beam/typehints/typed_pipeline_test.py
index f2e8f12420a48..329d6571bd7fb 100644
--- a/sdks/python/apache_beam/typehints/typed_pipeline_test.py
+++ b/sdks/python/apache_beam/typehints/typed_pipeline_test.py
@@ -202,7 +202,7 @@ class CustomTransform(beam.PTransform):

       def _extract_input_pvalues(self, pvalueish):
         return pvalueish, (pvalueish['in0'], pvalueish['in1'])

-      def apply(self, pvalueish):
+      def expand(self, pvalueish):
         return {'out0': pvalueish['in0'], 'out1': pvalueish['in1']}

   # TODO(robertwb): (typecheck) Make these the default?
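
For user code, the migration this patch implies is mechanical: a PTransform subclass renames its apply() override to expand(), and nothing else changes, because the pipeline-side apply() call (see the runner hooks patched above) is what now invokes the transform's expand(). The following sketch is illustrative only, not part of the patch: the CountWords transform and its input data are invented for this example, and it assumes the Beam Python SDK of this era (hence the Python 2 style and the 'DirectPipelineRunner' name that appears in the patched README).

    import re

    import apache_beam as beam


    class CountWords(beam.PTransform):
      """A composite transform; subclasses now override expand(), not apply()."""

      def expand(self, pcoll):
        # expand() receives the input PCollection and returns the output of
        # the composed sub-transforms, exactly as apply() did before the rename.
        return (pcoll
                | 'ExtractWords' >> beam.FlatMap(lambda x: re.findall(r'\w+', x))
                | 'CountPerWord' >> beam.combiners.Count.PerElement())


    # Applying the transform is unchanged: the | operator still routes through
    # the runner's apply() hook, which calls expand() on the transform.
    p = beam.Pipeline('DirectPipelineRunner')
    counts = (p
              | 'CreateLines' >> beam.Create(['to be or not to be'])
              | 'CountWords' >> CountWords())

The rename resolves the ambiguity between the two previous meanings of apply(): applying a transform to a pipeline (the runner-side hook, which keeps the name apply) and expanding a composite into its parts (now expand() on the transform itself).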