Skip to content

Commit

Permalink
[BEAM-3208] Add Distinct transform to mirror Java SDK
Browse files Browse the repository at this point in the history
The Java SDK introduced a Distinct transform to replace the
RemoveDuplicates transform, but the Python SDK still only had
RemoveDuplicates. This commit adds a Distinct transform to the
Python SDK that does the same thing as RemoveDuplicates.
RemoveDuplicates is now a deprecated alias of the Distinct transform.
  • Loading branch information
ttanay committed Feb 22, 2019
1 parent 67e1e25 commit 6ae5f5d
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 8 deletions.
4 changes: 2 additions & 2 deletions sdks/python/apache_beam/examples/complete/tfidf.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ def expand(self, uri_to_content):
total_documents = (
uri_to_content
| 'GetUris 1' >> beam.Keys()
| 'GetUniqueUris' >> beam.RemoveDuplicates()
| 'GetUniqueUris' >> beam.Distinct()
| 'CountUris' >> beam.combiners.Count.Globally())

# Create a collection of pairs mapping a URI to each of the words
Expand All @@ -81,7 +81,7 @@ def split_into_words(uri_line):
# in which it appears.
word_to_doc_count = (
uri_to_words
| 'GetUniqueWordsPerDoc' >> beam.RemoveDuplicates()
| 'GetUniqueWordsPerDoc' >> beam.Distinct()
| 'GetWords' >> beam.Values()
| 'CountDocsPerWord' >> beam.combiners.Count.PerElement())

Expand Down
3 changes: 2 additions & 1 deletion sdks/python/apache_beam/transforms/ptransform.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ class and wrapper class that allows lambda functions to be used as
from builtins import hex
from builtins import object
from builtins import zip
from functools import reduce, wraps
from functools import reduce
from functools import wraps

from google.protobuf import message

Expand Down
12 changes: 10 additions & 2 deletions sdks/python/apache_beam/transforms/ptransform_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -648,6 +648,14 @@ def test_kv_swap(self):
assert_that(result, equal_to([(1, 7), (2, 1), (2, 3), (2, 5), (3, 6)]))
pipeline.run()

def test_distinct(self):
  """Checks that beam.Distinct() yields each input element exactly once."""
  # The input repeats both an int (1) and a string ('pleat').
  elements = [6, 3, 1, 1, 9, 'pleat', 'pleat', 'kazoo', 'navel']
  expected = [1, 3, 6, 9, 'pleat', 'kazoo', 'navel']

  p = TestPipeline()
  source = p | 'Start' >> beam.Create(elements)
  deduped = source.apply(beam.Distinct())
  assert_that(deduped, equal_to(expected))
  p.run()

def test_remove_duplicates(self):
pipeline = TestPipeline()
pcoll = pipeline | 'Start' >> beam.Create(
Expand Down Expand Up @@ -742,7 +750,7 @@ def SamplePTransform(pcoll):
"""Sample transform using the @ptransform_fn decorator."""
map_transform = 'ToPairs' >> beam.Map(lambda v: (v, None))
combine_transform = 'Group' >> beam.CombinePerKey(lambda vs: None)
keys_transform = 'RemoveDuplicates' >> beam.Keys()
keys_transform = 'Distinct' >> beam.Keys()
return pcoll | map_transform | combine_transform | keys_transform


Expand Down Expand Up @@ -807,7 +815,7 @@ def test_apply_ptransform_using_decorator(self):
self.assertTrue('*Sample*' in pipeline.applied_labels)
self.assertTrue('*Sample*/ToPairs' in pipeline.applied_labels)
self.assertTrue('*Sample*/Group' in pipeline.applied_labels)
self.assertTrue('*Sample*/RemoveDuplicates' in pipeline.applied_labels)
self.assertTrue('*Sample*/Distinct' in pipeline.applied_labels)

def test_combine_with_label(self):
vals = [1, 2, 3, 4, 5, 6, 7]
Expand Down
11 changes: 8 additions & 3 deletions sdks/python/apache_beam/transforms/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,12 @@
from apache_beam.transforms.window import TimestampCombiner
from apache_beam.transforms.window import TimestampedValue
from apache_beam.utils import windowed_value
from apache_beam.utils.annotations import deprecated

__all__ = [
'BatchElements',
'CoGroupByKey',
'Distinct',
'Keys',
'KvSwap',
'RemoveDuplicates',
Expand Down Expand Up @@ -193,12 +195,15 @@ def KvSwap(label='KvSwap'): # pylint: disable=invalid-name


@ptransform_fn
def RemoveDuplicates(pcoll): # pylint: disable=invalid-name
"""Produces a PCollection containing the unique elements of a PCollection."""
def Distinct(pcoll): # pylint: disable=invalid-name
"""Produces a PCollection containing distinct elements of a PCollection."""
return (pcoll
| 'ToPairs' >> Map(lambda v: (v, None))
| 'Group' >> CombinePerKey(lambda vs: None)
| 'RemoveDuplicates' >> Keys())
| 'Distinct' >> Keys())


RemoveDuplicates = deprecated(since='2.11', current='Distinct')(Distinct)


class _BatchSizeEstimator(object):
Expand Down

0 comments on commit 6ae5f5d

Please sign in to comment.