[BEAM-3208] Add Distinct PTransform to mirror Java SDK
The Java SDK introduced a Distinct PTransform to replace the
RemoveDuplicates PTransform, but the Python SDK still only had
RemoveDuplicates. This adds a Distinct PTransform that does the
same thing as RemoveDuplicates; RemoveDuplicates is now a
deprecated alias for Distinct.
ttanay committed Mar 8, 2019
1 parent ae243ad commit d96e913
Showing 4 changed files with 26 additions and 8 deletions.
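
For orientation (not part of the commit itself), a minimal sketch of how the renamed transform reads in a pipeline; the element values and labels here are illustrative:

import apache_beam as beam

with beam.Pipeline() as pipeline:
  _ = (pipeline
       | 'Create' >> beam.Create(['a', 'b', 'a', 'c', 'b'])
       | 'Dedup' >> beam.Distinct()   # previously beam.RemoveDuplicates()
       | 'Print' >> beam.Map(print))  # emits each distinct element once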
4 changes: 2 additions & 2 deletions sdks/python/apache_beam/examples/complete/tfidf.py
@@ -63,7 +63,7 @@ def expand(self, uri_to_content):
total_documents = (
uri_to_content
| 'GetUris 1' >> beam.Keys()
-| 'GetUniqueUris' >> beam.RemoveDuplicates()
+| 'GetUniqueUris' >> beam.Distinct()
| 'CountUris' >> beam.combiners.Count.Globally())

# Create a collection of pairs mapping a URI to each of the words
@@ -81,7 +81,7 @@ def split_into_words(uri_line):
# in which it appears.
word_to_doc_count = (
uri_to_words
-| 'GetUniqueWordsPerDoc' >> beam.RemoveDuplicates()
+| 'GetUniqueWordsPerDoc' >> beam.Distinct()
| 'GetWords' >> beam.Values()
| 'CountDocsPerWord' >> beam.combiners.Count.PerElement())

3 changes: 2 additions & 1 deletion sdks/python/apache_beam/transforms/ptransform.py
@@ -47,6 +47,7 @@ class and wrapper class that allows lambda functions to be used as
from builtins import object
from builtins import zip
from functools import reduce
+from functools import wraps

from google.protobuf import message

@@ -858,7 +859,7 @@ def expand(self, pcoll):
(first argument if no label was specified and second argument otherwise).
"""
# TODO(robertwb): Consider removing staticmethod to allow for self parameter.
-
+@wraps(fn)
def callable_ptransform_factory(*args, **kwargs):
return _PTransformFnPTransform(fn, *args, **kwargs)
return callable_ptransform_factory
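
The functools.wraps addition matters because ptransform_fn returns callable_ptransform_factory in place of the decorated function; without wraps, the factory would not carry fn's __name__ and __doc__, which anything stacked on top of @ptransform_fn (for example, the new @deprecated wrapper on RemoveDuplicates below) can rely on. A minimal sketch of the general pattern, using toy names rather than Beam's internals:

from functools import wraps

def my_decorator(fn):
  @wraps(fn)  # copies fn.__name__, __doc__, etc. onto the wrapper
  def wrapper(*args, **kwargs):
    return fn(*args, **kwargs)
  return wrapper

@my_decorator
def Distinct(pcoll):
  """Docstring that survives decoration."""
  return pcoll

print(Distinct.__name__)  # prints 'Distinct', not 'wrapper'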
12 changes: 10 additions & 2 deletions sdks/python/apache_beam/transforms/ptransform_test.py
@@ -648,6 +648,14 @@ def test_kv_swap(self):
assert_that(result, equal_to([(1, 7), (2, 1), (2, 3), (2, 5), (3, 6)]))
pipeline.run()

+def test_distinct(self):
+pipeline = TestPipeline()
+pcoll = pipeline | 'Start' >> beam.Create(
+[6, 3, 1, 1, 9, 'pleat', 'pleat', 'kazoo', 'navel'])
+result = pcoll.apply(beam.Distinct())
+assert_that(result, equal_to([1, 3, 6, 9, 'pleat', 'kazoo', 'navel']))
+pipeline.run()

def test_remove_duplicates(self):
pipeline = TestPipeline()
pcoll = pipeline | 'Start' >> beam.Create(
@@ -742,7 +750,7 @@ def SamplePTransform(pcoll):
"""Sample transform using the @ptransform_fn decorator."""
map_transform = 'ToPairs' >> beam.Map(lambda v: (v, None))
combine_transform = 'Group' >> beam.CombinePerKey(lambda vs: None)
-keys_transform = 'RemoveDuplicates' >> beam.Keys()
+keys_transform = 'Distinct' >> beam.Keys()
return pcoll | map_transform | combine_transform | keys_transform


@@ -807,7 +815,7 @@ def test_apply_ptransform_using_decorator(self):
self.assertTrue('*Sample*' in pipeline.applied_labels)
self.assertTrue('*Sample*/ToPairs' in pipeline.applied_labels)
self.assertTrue('*Sample*/Group' in pipeline.applied_labels)
-self.assertTrue('*Sample*/RemoveDuplicates' in pipeline.applied_labels)
+self.assertTrue('*Sample*/Distinct' in pipeline.applied_labels)

def test_combine_with_label(self):
vals = [1, 2, 3, 4, 5, 6, 7]
15 changes: 12 additions & 3 deletions sdks/python/apache_beam/transforms/util.py
@@ -51,10 +51,12 @@
from apache_beam.transforms.window import TimestampCombiner
from apache_beam.transforms.window import TimestampedValue
from apache_beam.utils import windowed_value
+from apache_beam.utils.annotations import deprecated

__all__ = [
'BatchElements',
'CoGroupByKey',
+'Distinct',
'Keys',
'KvSwap',
'RemoveDuplicates',
@@ -193,12 +195,19 @@ def KvSwap(label='KvSwap'):  # pylint: disable=invalid-name


@ptransform_fn
-def RemoveDuplicates(pcoll):  # pylint: disable=invalid-name
-"""Produces a PCollection containing the unique elements of a PCollection."""
+def Distinct(pcoll):  # pylint: disable=invalid-name
+"""Produces a PCollection containing distinct elements of a PCollection."""
return (pcoll
| 'ToPairs' >> Map(lambda v: (v, None))
| 'Group' >> CombinePerKey(lambda vs: None)
-| 'RemoveDuplicates' >> Keys())
+| 'Distinct' >> Keys())


+@deprecated(since='2.12', current='Distinct')
+@ptransform_fn
+def RemoveDuplicates(pcoll):
+"""Produces a PCollection containing distinct elements of a PCollection."""
+return pcoll | 'RemoveDuplicates' >> Distinct()


class _BatchSizeEstimator(object):
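
With the deprecated alias above in place, existing pipelines that use RemoveDuplicates keep working and are steered toward the new name. A hedged sketch of the expected behavior; the exact warning text and category come from apache_beam.utils.annotations.deprecated and are assumptions here, not spelled out in this commit:

import warnings
import apache_beam as beam

with warnings.catch_warnings(record=True) as caught:
  warnings.simplefilter('always')
  with beam.Pipeline() as pipeline:
    _ = (pipeline
         | beam.Create([1, 1, 2, 3, 3])
         | beam.RemoveDuplicates())  # still works; now expands to Distinct

# Assumption: the @deprecated wrapper warns when the old name is constructed.
assert any(issubclass(w.category, DeprecationWarning) for w in caught)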