Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion sdks/python/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ import re
import apache_beam as beam
p = beam.Pipeline('DirectPipelineRunner')
class MyCountTransform(beam.PTransform):
def apply(self, pcoll):
def expand(self, pcoll):
return (pcoll
| 'one word' >> beam.Map(lambda word: (word, 1))
# GroupByKey accepts a PCollection of (word, 1) elements and
Expand Down
2 changes: 1 addition & 1 deletion sdks/python/apache_beam/examples/complete/autocomplete.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ def __init__(self, count):
super(TopPerPrefix, self).__init__()
self._count = count

def apply(self, words):
def expand(self, words):
"""Compute the most common words for each possible prefixes.

Args:
Expand Down
2 changes: 1 addition & 1 deletion sdks/python/apache_beam/examples/complete/estimate_pi.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ def encode(self, x):
class EstimatePiTransform(beam.PTransform):
"""Runs 10M trials, and combine the results to estimate pi."""

def apply(self, pcoll):
def expand(self, pcoll):
# A hundred work items of a hundred thousand tries each.
return (pcoll
| 'Initialize' >> beam.Create([100000] * 100).with_output_types(int)
Expand Down
2 changes: 1 addition & 1 deletion sdks/python/apache_beam/examples/complete/tfidf.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ class TfIdf(beam.PTransform):
The output is mapping from terms to scores for each document URI.
"""

def apply(self, uri_to_content):
def expand(self, uri_to_content):

# Compute the total number of documents, and prepare a singleton
# PCollection to use as side input.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ class ComputeSessions(beam.PTransform):
def __init__(self):
super(ComputeSessions, self).__init__()

def apply(self, pcoll):
def expand(self, pcoll):
return (pcoll
| beam.WindowInto('ComputeSessionsWindow',
window.Sessions(gap_size=ONE_HOUR_IN_SECONDS))
Expand All @@ -89,7 +89,7 @@ class TopPerMonth(beam.PTransform):
def __init__(self):
super(TopPerMonth, self).__init__()

def apply(self, pcoll):
def expand(self, pcoll):
return (pcoll
| beam.WindowInto('TopPerMonthWindow',
window.FixedWindows(
Expand Down Expand Up @@ -127,7 +127,7 @@ def __init__(self, sampling_threshold):
super(ComputeTopSessions, self).__init__()
self.sampling_threshold = sampling_threshold

def apply(self, pcoll):
def expand(self, pcoll):
return (pcoll
| beam.ParDo('ExtractUserAndTimestamp',
ExtractUserAndTimestampDoFn())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
class Count1(beam.PTransform):
"""Count as a subclass of PTransform, with an apply method."""

def apply(self, pcoll):
def expand(self, pcoll):
return (
pcoll
| 'ParWithOne' >> beam.Map(lambda v: (v, 1))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ class CountWords(beam.PTransform):
of "word: count" strings.
"""

def apply(self, pcoll):
def expand(self, pcoll):
return (pcoll
| 'pair_with_one' >> beam.Map(lambda x: (x, 1))
| 'group' >> beam.GroupByKey()
Expand Down
16 changes: 8 additions & 8 deletions sdks/python/apache_beam/examples/snippets/snippets.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ def construct_pipeline(renames):
class ReverseWords(beam.PTransform):
"""A PTransform that reverses individual elements in a PCollection."""

def apply(self, pcoll):
def expand(self, pcoll):
return pcoll | beam.Map(lambda e: e[::-1])

def filter_words(unused_x):
Expand Down Expand Up @@ -387,7 +387,7 @@ def process(self, context):
# The CountWords Composite Transform inside the WordCount pipeline.
class CountWords(beam.PTransform):

def apply(self, pcoll):
def expand(self, pcoll):
return (pcoll
# Convert lines of text into individual words.
| 'ExtractWords' >> beam.ParDo(ExtractWordsFn())
Expand Down Expand Up @@ -508,7 +508,7 @@ def _add_argparse_args(cls, parser):
# [START examples_wordcount_wordcount_composite]
class CountWords(beam.PTransform):

def apply(self, pcoll):
def expand(self, pcoll):
return (pcoll
# Convert lines of text into individual words.
| beam.FlatMap(
Expand Down Expand Up @@ -705,7 +705,7 @@ def __init__(self, count, **kwargs):
super(ReadFromCountingSource, self).__init__(**kwargs)
self._count = count

def apply(self, pcoll):
def expand(self, pcoll):
return pcoll | iobase.Read(_CountingSource(count))
# [END model_custom_source_new_ptransform]

Expand Down Expand Up @@ -838,7 +838,7 @@ def __init__(self, label, url, final_table_name, **kwargs):
self._url = url
self._final_table_name = final_table_name

def apply(self, pcoll):
def expand(self, pcoll):
return pcoll | iobase.Write(_SimpleKVSink(self._url,
self._final_table_name))
# [END model_custom_sink_new_ptransform]
Expand Down Expand Up @@ -1001,7 +1001,7 @@ def model_composite_transform_example(contents, output_path):
class CountWords(beam.PTransform):
# [END composite_ptransform_declare]

def apply(self, pcoll):
def expand(self, pcoll):
return (pcoll
| beam.FlatMap(lambda x: re.findall(r'\w+', x))
| beam.combiners.Count.PerElement()
Expand Down Expand Up @@ -1197,7 +1197,7 @@ def join_info(name, emails, phone_numbers):
# [START model_library_transforms_keys]
class Keys(beam.PTransform):

def apply(self, pcoll):
def expand(self, pcoll):
return pcoll | 'Keys' >> beam.Map(lambda (k, v): k)
# [END model_library_transforms_keys]
# pylint: enable=invalid-name
Expand All @@ -1206,7 +1206,7 @@ def apply(self, pcoll):
# [START model_library_transforms_count]
class Count(beam.PTransform):

def apply(self, pcoll):
def expand(self, pcoll):
return (
pcoll
| 'PairWithOne' >> beam.Map(lambda v: (v, 1))
Expand Down
2 changes: 1 addition & 1 deletion sdks/python/apache_beam/examples/snippets/snippets_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ def process(self, context):
@beam.typehints.with_input_types(T)
@beam.typehints.with_output_types(beam.typehints.Tuple[int, T])
class MyTransform(beam.PTransform):
def apply(self, pcoll):
def expand(self, pcoll):
return pcoll | beam.Map(lambda x: (len(x), x))

words_with_lens = words | MyTransform()
Expand Down
2 changes: 1 addition & 1 deletion sdks/python/apache_beam/examples/wordcount_debugging.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ class CountWords(beam.PTransform):
def __init__(self):
super(CountWords, self).__init__()

def apply(self, pcoll):
def expand(self, pcoll):
return (pcoll
| 'split' >> (beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
.with_output_types(unicode))
Expand Down
4 changes: 2 additions & 2 deletions sdks/python/apache_beam/io/avroio.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ def __init__(self, file_pattern=None, min_bundle_size=0, validate=True):
super(ReadFromAvro, self).__init__()
self._source = _AvroSource(file_pattern, min_bundle_size, validate=validate)

def apply(self, pvalue):
def expand(self, pvalue):
return pvalue.pipeline | Read(self._source)

def display_data(self):
Expand Down Expand Up @@ -294,7 +294,7 @@ def __init__(self,
self._sink = _AvroSink(file_path_prefix, schema, codec, file_name_suffix,
num_shards, shard_name_template, mime_type)

def apply(self, pcoll):
def expand(self, pcoll):
return pcoll | beam.io.iobase.Write(self._sink)

def display_data(self):
Expand Down
4 changes: 2 additions & 2 deletions sdks/python/apache_beam/io/datastore/v1/datastoreio.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ def __init__(self, project, query, namespace=None, num_splits=0):
self._query = query
self._num_splits = num_splits

def apply(self, pcoll):
def expand(self, pcoll):
# This is a composite transform involves the following:
# 1. Create a singleton of the user provided `query` and apply a ``ParDo``
# that splits the query into `num_splits` and assign each split query a
Expand Down Expand Up @@ -312,7 +312,7 @@ def __init__(self, project, mutation_fn):
self._mutation_fn = mutation_fn
logging.warning('datastoreio write transform is experimental.')

def apply(self, pcoll):
def expand(self, pcoll):
return (pcoll
| 'Convert to Mutation' >> Map(self._mutation_fn)
| 'Write Mutation to Datastore' >> ParDo(_Mutate.DatastoreWriteFn(
Expand Down
6 changes: 3 additions & 3 deletions sdks/python/apache_beam/io/iobase.py
Original file line number Diff line number Diff line change
Expand Up @@ -658,7 +658,7 @@ def __init__(self, *args, **kwargs):
super(Read, self).__init__(label)
self.source = source

def apply(self, pbegin):
def expand(self, pbegin):
assert isinstance(pbegin, pvalue.PBegin)
self.pipeline = pbegin.pipeline
return pvalue.PCollection(self.pipeline)
Expand Down Expand Up @@ -723,7 +723,7 @@ def display_data(self):
return {'sink': self.sink.__class__,
'sink_dd': self.sink}

def apply(self, pcoll):
def expand(self, pcoll):
from apache_beam.runners.dataflow.native_io import iobase as dataflow_io
if isinstance(self.sink, dataflow_io.NativeSink):
# A native sink
Expand All @@ -746,7 +746,7 @@ def __init__(self, sink):
super(WriteImpl, self).__init__()
self.sink = sink

def apply(self, pcoll):
def expand(self, pcoll):
do_once = pcoll.pipeline | 'DoOnce' >> core.Create([None])
init_result_coll = do_once | core.Map(
'initialize_write', lambda _, sink: sink.initialize_write(), self.sink)
Expand Down
4 changes: 2 additions & 2 deletions sdks/python/apache_beam/io/textio.py
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ def __init__(
strip_trailing_newlines, coder,
validate=validate)

def apply(self, pvalue):
def expand(self, pvalue):
return pvalue.pipeline | Read(self._source)

def display_data(self):
Expand Down Expand Up @@ -297,7 +297,7 @@ def __init__(
append_trailing_newlines, num_shards,
shard_name_template, coder, compression_type)

def apply(self, pcoll):
def expand(self, pcoll):
return pcoll | Write(self._sink)

def display_data(self):
Expand Down
4 changes: 2 additions & 2 deletions sdks/python/apache_beam/pipeline_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ def custom_callable(pcoll):

class CustomTransform(PTransform):

def apply(self, pcoll):
def expand(self, pcoll):
return pcoll | '+1' >> FlatMap(lambda x: [x + 1])

class Visitor(PipelineVisitor):
Expand Down Expand Up @@ -174,7 +174,7 @@ def __init__(self, suffix):
# No call to super(...).__init__
self.suffix = suffix

def apply(self, pcoll):
def expand(self, pcoll):
return pcoll | Map(lambda x: x + self.suffix)

self.assertEqual(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,6 @@ def __init__(self, *args, **kwargs):
super(_NativeWrite, self).__init__(label)
self.sink = sink

def apply(self, pcoll):
def expand(self, pcoll):
self._check_pcollection(pcoll)
return pvalue.PDone(pcoll.pipeline)
2 changes: 1 addition & 1 deletion sdks/python/apache_beam/runners/direct/direct_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ def cache(self):

def apply(self, transform, input): # pylint: disable=redefined-builtin
"""Runner callback for a pipeline.apply call."""
return transform.apply(input)
return transform.expand(input)


class BufferingInMemoryCache(object):
Expand Down
4 changes: 2 additions & 2 deletions sdks/python/apache_beam/runners/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,8 +148,8 @@ def apply(self, transform, input):
'Execution of [%s] not implemented in runner %s.' % (transform, self))

def apply_PTransform(self, transform, input):
# The base case of apply is to call the transform's apply.
return transform.apply(input)
# The base case of apply is to call the transform's expand.
return transform.expand(input)

def run_transform(self, transform_node):
"""Runner callback for a pipeline.run call.
Expand Down
14 changes: 7 additions & 7 deletions sdks/python/apache_beam/transforms/combiners.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,13 +58,13 @@ class Mean(object):
class Globally(ptransform.PTransform):
"""combiners.Mean.Globally computes the arithmetic mean of the elements."""

def apply(self, pcoll):
def expand(self, pcoll):
return pcoll | core.CombineGlobally(MeanCombineFn())

class PerKey(ptransform.PTransform):
"""combiners.Mean.PerKey finds the means of the values for each key."""

def apply(self, pcoll):
def expand(self, pcoll):
return pcoll | core.CombinePerKey(MeanCombineFn())


Expand Down Expand Up @@ -105,19 +105,19 @@ class Count(object):
class Globally(ptransform.PTransform):
"""combiners.Count.Globally counts the total number of elements."""

def apply(self, pcoll):
def expand(self, pcoll):
return pcoll | core.CombineGlobally(CountCombineFn())

class PerKey(ptransform.PTransform):
"""combiners.Count.PerKey counts how many elements each unique key has."""

def apply(self, pcoll):
def expand(self, pcoll):
return pcoll | core.CombinePerKey(CountCombineFn())

class PerElement(ptransform.PTransform):
"""combiners.Count.PerElement counts how many times each element occurs."""

def apply(self, pcoll):
def expand(self, pcoll):
paired_with_void_type = KV[pcoll.element_type, Any]
return (pcoll
| (core.Map('%s:PairWithVoid' % self.label, lambda x: (x, None))
Expand Down Expand Up @@ -475,7 +475,7 @@ class ToList(ptransform.PTransform):
def __init__(self, label='ToList'):
super(ToList, self).__init__(label)

def apply(self, pcoll):
def expand(self, pcoll):
return pcoll | core.CombineGlobally(self.label, ToListCombineFn())


Expand Down Expand Up @@ -509,7 +509,7 @@ class ToDict(ptransform.PTransform):
def __init__(self, label='ToDict'):
super(ToDict, self).__init__(label)

def apply(self, pcoll):
def expand(self, pcoll):
return pcoll | core.CombineGlobally(self.label, ToDictCombineFn())


Expand Down
2 changes: 1 addition & 1 deletion sdks/python/apache_beam/transforms/combiners_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,7 @@ def test_combine_globally_without_default(self):

def test_combine_globally_with_default_side_input(self):
class CombineWithSideInput(PTransform):
def apply(self, pcoll):
def expand(self, pcoll):
side = pcoll | CombineGlobally(sum).as_singleton_view()
main = pcoll.pipeline | Create([None])
return main | Map(lambda _, s: s, side)
Expand Down
Loading