From e4cc54b0d22b11d4c902c05d25403d87080ff960 Mon Sep 17 00:00:00 2001
From: Alex Behrens
Date: Mon, 11 Jan 2016 15:44:58 -0600
Subject: [PATCH] [pyspark] adding disjunction and difference functions for rdds

---
 python/pyspark/rdd.py               | 30 +++++++++++++++++++++++++++++
 python/pyspark/streaming/dstream.py | 18 +++++++++++++++++
 python/pyspark/streaming/tests.py   | 20 +++++++++++++++++++
 3 files changed, 68 insertions(+)

diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index a019c05862549..6fec2b2ef75a8 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -1926,6 +1926,36 @@ def cogroup(self, other, numPartitions=None):
         """
         return python_cogroup((self, other), numPartitions)
 
+    def disjunction(self, other, numPartitions=None):
+        """
+        Return the (key, value) pairs of C{self} and C{other} whose keys
+        appear in only one of the two RDDs.
+
+        >>> x = sc.parallelize([("a", 1), ("b", 4)])
+        >>> y = sc.parallelize([("a", 2), ("d", 5)])
+        >>> sorted(x.disjunction(y).collect())
+        [('b', 4), ('d', 5)]
+        """
+        filtered = self.cogroup(other, numPartitions) \
+            .filter(lambda k_vs: not all(k_vs[1]))
+        # keep the original pairs from whichever RDD held the key
+        return filtered.flatMapValues(lambda vs: list(vs[0]) + list(vs[1]))
+
+    def difference(self, other, numPartitions=None):
+        """
+        Return each (key, value) pair in C{self} whose key does not
+        appear in C{other}.
+
+        >>> x = sc.parallelize([("a", 1), ("b", 4)])
+        >>> y = sc.parallelize([("a", 2)])
+        >>> sorted(x.difference(y).collect())
+        [('b', 4)]
+        """
+        filtered = self.cogroup(other, numPartitions) \
+            .filter(lambda k_vs: not len(k_vs[1][1]))
+        # keep only the pairs from self whose key is absent from other
+        return filtered.flatMapValues(lambda vs: vs[0])
+
     def sampleByKey(self, withReplacement, fractions, seed=None):
         """
         Return a subset of this RDD sampled by key (via stratified sampling).
diff --git a/python/pyspark/streaming/dstream.py b/python/pyspark/streaming/dstream.py
index 86447f5e58ecb..fe7429110ca1e 100644
--- a/python/pyspark/streaming/dstream.py
+++ b/python/pyspark/streaming/dstream.py
@@ -349,6 +349,24 @@ def cogroup(self, other, numPartitions=None):
             numPartitions = self._sc.defaultParallelism
         return self.transformWith(lambda a, b: a.cogroup(b, numPartitions), other)
 
+    def disjunction(self, other, numPartitions=None):
+        """
+        Return a new DStream by applying 'disjunction' between RDDs of this
+        DStream and `other` DStream.
+        """
+        if numPartitions is None:
+            numPartitions = self._sc.defaultParallelism
+        return self.transformWith(lambda a, b: a.disjunction(b, numPartitions), other)
+
+    def difference(self, other, numPartitions=None):
+        """
+        Return a new DStream by applying 'difference' between RDDs of this
+        DStream and `other` DStream.
+        """
+        if numPartitions is None:
+            numPartitions = self._sc.defaultParallelism
+        return self.transformWith(lambda a, b: a.difference(b, numPartitions), other)
+
     def join(self, other, numPartitions=None):
         """
         Return a new DStream by applying 'join' between RDDs of this DStream and
diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py
index 86b05d9fd2424..fa313d51bb5a2 100644
--- a/python/pyspark/streaming/tests.py
+++ b/python/pyspark/streaming/tests.py
@@ -348,6 +348,26 @@ def func(d1, d2):
                     [("a", ([1, 1], [1, 1])), ("b", ([1], [1])), ("", ([1, 1], [1, 2]))]]
         self._test_func(input, func, expected, sort=True, input2=input2)
 
+    def test_disjunction(self):
+        input = [[(1, 1), (2, 1), (3, 1)]]
+        input2 = [[(1, 1), (3, 1), (4, 1)]]
+
+        def func(d1, d2):
+            return d1.disjunction(d2)
+
+        expected = [[(2, 1), (4, 1)]]
+        self._test_func(input, func, expected, True, input2)
+
+    def test_difference(self):
+        input = [[(1, 1), (2, 1), (3, 1)]]
+        input2 = [[(1, 1), (3, 1), (4, 1)]]
+
+        def func(d1, d2):
+            return d1.difference(d2)
+
+        expected = [[(2, 1)]]
+        self._test_func(input, func, expected, True, input2)
+
     def test_join(self):
         input = [[('a', 1), ('b', 2)]]
         input2 = [[('b', 3), ('c', 4)]]
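
A rough usage sketch for the new DStream operations, assuming the patch above has been applied to the PySpark build in use; the application name, batch interval, and sample data below are illustrative only, not part of the patch:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext(appName="disjunction-difference-demo")  # illustrative name
ssc = StreamingContext(sc, batchDuration=1)

# One single-batch stream per side, built from the same data as the tests above.
stream1 = ssc.queueStream([sc.parallelize([(1, 1), (2, 1), (3, 1)])])
stream2 = ssc.queueStream([sc.parallelize([(1, 1), (3, 1), (4, 1)])])

# Per batch: pairs whose key is seen in exactly one of the two streams -> [(2, 1), (4, 1)]
stream1.disjunction(stream2).pprint()
# Per batch: pairs of stream1 whose key is absent from stream2 -> [(2, 1)]
stream1.difference(stream2).pprint()

ssc.start()
ssc.awaitTerminationOrTimeout(5)
ssc.stop(stopSparkContext=True, stopGraceFully=False)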