From c3fcb69999ae4c1e9ccc84999f4b124f0ab3764e Mon Sep 17 00:00:00 2001
From: Vaibhav Nivargi
Date: Mon, 7 Apr 2014 17:08:34 -0700
Subject: [PATCH 1/2] adding full outer join support

Conflicts:
	core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
	streaming/src/main/scala/org/apache/spark/streaming/PairDStreamFunctions.scala
---
 .../apache/spark/api/java/JavaPairRDD.scala   | 31 ++++++++++++++
 .../apache/spark/rdd/PairRDDFunctions.scala   | 36 ++++++++++++++++
 .../org/apache/spark/PartitioningSuite.scala  |  3 ++
 .../spark/rdd/PairRDDFunctionsSuite.scala     | 15 +++++++
 .../scala/org/apache/spark/rdd/RDDSuite.scala |  1 +
 python/pyspark/join.py                        | 16 +++++++
 python/pyspark/rdd.py                         | 17 ++++++++
 .../streaming/api/java/JavaPairDStream.scala  | 42 +++++++++++++++++++
 .../dstream/PairDStreamFunctions.scala        | 36 ++++++++++++++++
 .../streaming/BasicOperationsSuite.scala      | 15 +++++++
 10 files changed, 212 insertions(+)

diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
index f430a33db1e4a..0f4f1e9bc53d9 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
@@ -315,6 +315,19 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kClassTag: ClassTag[K
   }
 
   /**
+   * Perform a full outer join of `this` and `other`. Output will have
+   * each row from both RDDs or `None` where missing, i.e. one of
+   * (k, (Some(v), Some(w))), (k, (Some(v), None)), or (k, (None, Some(w))),
+   * depending on the presence of (k, v) and/or (k, w) in `this` and `other`.
+   * Uses the given Partitioner to partition the output RDD.
+   */
+  def fullOuterJoin[W](other: JavaPairRDD[K, W], partitioner: Partitioner)
+  : JavaPairRDD[K, (Optional[V], Optional[W])] = {
+    val joinResult = rdd.fullOuterJoin(other, partitioner)
+    fromRDD(joinResult.mapValues{case (v, w) => (JavaUtils.optionToOptional(v), JavaUtils.optionToOptional(w))})
+  }
+
+  /**
    * Simplified version of combineByKey that hash-partitions the resulting RDD using the existing
    * partitioner/parallelism level.
    */
@@ -402,6 +415,24 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kClassTag: ClassTag[K
     fromRDD(joinResult.mapValues{case (v, w) => (JavaUtils.optionToOptional(v), w)})
   }
 
+  /**
+   * Perform a full outer join of `this` and `other`. Hash-partitions the resulting
+   * RDD using the existing partitioner/parallelism level.
+   */
+  def fullOuterJoin[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (Optional[V], Optional[W])] = {
+    val joinResult = rdd.fullOuterJoin(other)
+    fromRDD(joinResult.mapValues{case (v, w) => (JavaUtils.optionToOptional(v), JavaUtils.optionToOptional(w))})
+  }
+
+  /**
+   * Perform a full outer join of `this` and `other`. Hash-partitions the resulting
+   * RDD into the given number of partitions.
+   */
+  def fullOuterJoin[W](other: JavaPairRDD[K, W], numPartitions: Int): JavaPairRDD[K, (Optional[V], Optional[W])] = {
+    val joinResult = rdd.fullOuterJoin(other, numPartitions)
+    fromRDD(joinResult.mapValues{case (v, w) => (JavaUtils.optionToOptional(v), JavaUtils.optionToOptional(w))})
+  }
+
   /**
    * Return the key-value pairs in this RDD to the master as a Map.
   */
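
For illustration only (not part of the patch), a minimal sketch of how the Java-facing API
above behaves, assuming an existing SparkContext `sc`; missing sides come back as absent
`Optional` values rather than Scala `None`:

    import org.apache.spark.api.java.JavaPairRDD

    // Wrap two small pair RDDs in the Java API and full-outer-join them.
    val left  = JavaPairRDD.fromRDD(sc.parallelize(Seq((1, "a"), (2, "b"))))
    val right = JavaPairRDD.fromRDD(sc.parallelize(Seq((2, "x"), (3, "y"))))
    val joined = left.fullOuterJoin(right)  // JavaPairRDD[Int, (Optional[String], Optional[String])]
    // key 1 -> (Optional.of("a"), Optional.absent())   // only in `left`
    // key 2 -> (Optional.of("b"), Optional.of("x"))    // in both
    // key 3 -> (Optional.absent(), Optional.of("y"))   // only in `right`
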
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index 0bc09b31fe14e..c5db82aa578e5 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -337,6 +337,26 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
     }
   }
 
+  /**
+   * Perform a full outer join of `this` and `other`. Output will have
+   * each row from both RDDs or `None` where missing, i.e. one of
+   * (k, (Some(v), Some(w))), (k, (Some(v), None)), or (k, (None, Some(w))),
+   * depending on the presence of (k, v) and/or (k, w) in `this` and `other`.
+   * Uses the given Partitioner to partition the output RDD.
+   */
+  def fullOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner)
+  : RDD[(K, (Option[V], Option[W]))] = {
+    this.cogroup(other, partitioner).flatMapValues { case (vs, ws) =>
+      if (vs.isEmpty && !ws.isEmpty) {
+        ws.iterator.map(w => (None, Some(w)))
+      } else if (ws.isEmpty && !vs.isEmpty) {
+        vs.iterator.map(v => (Some(v), None))
+      } else {
+        for (v <- vs.iterator; w <- ws.iterator) yield (Some(v), Some(w))
+      }
+    }
+  }
+
   /**
    * Simplified version of combineByKey that hash-partitions the resulting RDD using the
    * existing partitioner/parallelism level.
@@ -422,6 +442,22 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
     rightOuterJoin(other, new HashPartitioner(numPartitions))
   }
 
+  /**
+   * Perform a full outer join of `this` and `other`. Hash-partitions the resulting
+   * RDD using the existing partitioner/parallelism level.
+   */
+  def fullOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], Option[W]))] = {
+    fullOuterJoin(other, defaultPartitioner(self, other))
+  }
+
+  /**
+   * Perform a full outer join of `this` and `other`. Hash-partitions the resulting
+   * RDD into the given number of partitions.
+   */
+  def fullOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Option[V], Option[W]))] = {
+    fullOuterJoin(other, new HashPartitioner(numPartitions))
+  }
+
   /**
    * Return the key-value pairs in this RDD to the master as a Map.
   */
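
The cogroup-based implementation above pads the missing side with `None` and otherwise emits
the per-key cross product of values. A minimal sketch of the resulting behaviour (illustrative
only, not part of the patch; assumes an existing SparkContext `sc` and
`import org.apache.spark.SparkContext._` for the pair-RDD implicits):

    val left  = sc.parallelize(Seq((1, "a1"), (1, "a2"), (2, "b")))
    val right = sc.parallelize(Seq((2, "x"), (3, "y")))

    left.fullOuterJoin(right).collect().toSet
    // Set((1, (Some("a1"), None)),      // key only on the left
    //     (1, (Some("a2"), None)),
    //     (2, (Some("b"), Some("x"))),  // key on both sides
    //     (3, (None, Some("y"))))       // key only on the right
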
diff --git a/core/src/test/scala/org/apache/spark/PartitioningSuite.scala b/core/src/test/scala/org/apache/spark/PartitioningSuite.scala
index 1374d01774693..1b6a29acfa13d 100644
--- a/core/src/test/scala/org/apache/spark/PartitioningSuite.scala
+++ b/core/src/test/scala/org/apache/spark/PartitioningSuite.scala
@@ -102,11 +102,13 @@ class PartitioningSuite extends FunSuite with SharedSparkContext {
     assert(grouped2.join(grouped4).partitioner === grouped4.partitioner)
     assert(grouped2.leftOuterJoin(grouped4).partitioner === grouped4.partitioner)
     assert(grouped2.rightOuterJoin(grouped4).partitioner === grouped4.partitioner)
+    assert(grouped2.fullOuterJoin(grouped4).partitioner === grouped4.partitioner)
     assert(grouped2.cogroup(grouped4).partitioner === grouped4.partitioner)
 
     assert(grouped2.join(reduced2).partitioner === grouped2.partitioner)
     assert(grouped2.leftOuterJoin(reduced2).partitioner === grouped2.partitioner)
     assert(grouped2.rightOuterJoin(reduced2).partitioner === grouped2.partitioner)
+    assert(grouped2.fullOuterJoin(reduced2).partitioner === grouped2.partitioner)
     assert(grouped2.cogroup(reduced2).partitioner === grouped2.partitioner)
 
     assert(grouped2.map(_ => 1).partitioner === None)
@@ -127,6 +129,7 @@ class PartitioningSuite extends FunSuite with SharedSparkContext {
     assert(intercept[SparkException]{ arrPairs.join(arrPairs) }.getMessage.contains("array"))
     assert(intercept[SparkException]{ arrPairs.leftOuterJoin(arrPairs) }.getMessage.contains("array"))
     assert(intercept[SparkException]{ arrPairs.rightOuterJoin(arrPairs) }.getMessage.contains("array"))
+    assert(intercept[SparkException]{ arrPairs.fullOuterJoin(arrPairs) }.getMessage.contains("array"))
     assert(intercept[SparkException]{ arrPairs.groupByKey() }.getMessage.contains("array"))
     assert(intercept[SparkException]{ arrPairs.countByKey() }.getMessage.contains("array"))
     assert(intercept[SparkException]{ arrPairs.countByKeyApprox(1) }.getMessage.contains("array"))
diff --git a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
index 9c78630944277..792f5f016a71f 100644
--- a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
@@ -202,6 +202,21 @@ class PairRDDFunctionsSuite extends FunSuite with SharedSparkContext {
     ))
   }
 
+  test("fullOuterJoin") {
+    val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)))
+    val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w')))
+    val joined = rdd1.fullOuterJoin(rdd2).collect()
+    assert(joined.size === 6)
+    assert(joined.toSet === Set(
+      (1, (Some(1), Some('x'))),
+      (1, (Some(2), Some('x'))),
+      (2, (Some(1), Some('y'))),
+      (2, (Some(1), Some('z'))),
+      (3, (Some(1), None)),
+      (4, (None, Some('w')))
+    ))
+  }
+
   test("join with no matches") {
     val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)))
     val rdd2 = sc.parallelize(Array((4, 'x'), (5, 'y'), (5, 'z'), (6, 'w')))
diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
index ac9df34afe6ee..fdfb4c7f91f9e 100644
--- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -175,6 +175,7 @@ class RDDSuite extends FunSuite with SharedSparkContext {
     assert(rdd.join(emptyKv).collect().size === 0)
     assert(rdd.rightOuterJoin(emptyKv).collect().size === 0)
     assert(rdd.leftOuterJoin(emptyKv).collect().size === 2)
+    assert(rdd.fullOuterJoin(emptyKv).collect().size === 2)
     assert(rdd.cogroup(emptyKv).collect().size === 2)
     assert(rdd.union(emptyKv).collect().size === 2)
   }
diff --git a/python/pyspark/join.py b/python/pyspark/join.py
index 5f4294fb1b777..96992db240a11 100644
--- a/python/pyspark/join.py
+++ b/python/pyspark/join.py
@@ -78,6 +78,22 @@ def dispatch(seq):
     return _do_python_join(rdd, other, numPartitions, dispatch)
 
 
+def python_full_outer_join(rdd, other, numPartitions):
+    def dispatch(seq):
+        vbuf, wbuf = [], []
+        for (n, v) in seq:
+            if n == 1:
+                vbuf.append(v)
+            elif n == 2:
+                wbuf.append(v)
+        if not vbuf:
+            vbuf.append(None)
+        if not wbuf:
+            wbuf.append(None)
+        return [(v, w) for v in vbuf for w in wbuf]
+    return _do_python_join(rdd, other, numPartitions, dispatch)
+
+
 def python_cogroup(rdd, other, numPartitions):
     vs = rdd.map(lambda (k, v): (k, (1, v)))
     ws = other.map(lambda (k, v): (k, (2, v)))
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index ace8476c59f05..a85ca59c94ae1 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -925,6 +925,23 @@ def rightOuterJoin(self, other, numPartitions=None):
         """
         return python_right_outer_join(self, other, numPartitions)
 
+    def fullOuterJoin(self, other, numPartitions=None):
+        """
+        Perform a full outer join of C{self} and C{other}.
+
+        Output will have each row from both RDDs or None where missing, i.e.
+        one of (k, (v, w)), (k, (v, None)), or (k, (None, w)) depending on
+        the presence of (k, v) and/or (k, w) in C{self} and C{other}.
+
+        Hash-partitions the resulting RDD into the given number of partitions.
+
+        >>> x = sc.parallelize([("a", 1), ("b", 4)])
+        >>> y = sc.parallelize([("a", 2)])
+        >>> sorted(y.fullOuterJoin(x).collect())
+        [('a', (2, 1)), ('b', (None, 4))]
+        """
+        return python_full_outer_join(self, other, numPartitions)
+
     # TODO: add option to control map-side combining
     def partitionBy(self, numPartitions, partitionFunc=hash):
         """
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
index 62cfa0a229db1..6c8af831291b5 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
@@ -669,6 +669,48 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
     joinResult.mapValues{case (v, w) => (JavaUtils.optionToOptional(v), w)}
   }
 
+  /**
+   * Return a new DStream by applying 'full outer join' between RDDs of `this` DStream and
+   * `other` DStream. Hash partitioning is used to generate the RDDs with Spark's default
+   * number of partitions.
+   */
+  def fullOuterJoin[W](other: JavaPairDStream[K, W]): JavaPairDStream[K, (Optional[V], Optional[W])] = {
+    implicit val cm: ClassTag[W] =
+      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]]
+    val joinResult = dstream.fullOuterJoin(other.dstream)
+    joinResult.mapValues{case (v, w) => (JavaUtils.optionToOptional(v), JavaUtils.optionToOptional(w))}
+  }
+
+  /**
+   * Return a new DStream by applying 'full outer join' between RDDs of `this` DStream and
+   * `other` DStream. Hash partitioning is used to generate the RDDs with `numPartitions`
+   * partitions.
+   */
+  def fullOuterJoin[W](
+      other: JavaPairDStream[K, W],
+      numPartitions: Int
+    ): JavaPairDStream[K, (Optional[V], Optional[W])] = {
+    implicit val cm: ClassTag[W] =
+      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]]
+    val joinResult = dstream.fullOuterJoin(other.dstream, numPartitions)
+    joinResult.mapValues{case (v, w) => (JavaUtils.optionToOptional(v), JavaUtils.optionToOptional(w))}
+  }
+
+  /**
+   * Return a new DStream by applying 'full outer join' between RDDs of `this` DStream and
+   * `other` DStream. The supplied [[org.apache.spark.Partitioner]] is used to control
+   * the partitioning of each RDD.
+   */
+  def fullOuterJoin[W](
+      other: JavaPairDStream[K, W],
+      partitioner: Partitioner
+    ): JavaPairDStream[K, (Optional[V], Optional[W])] = {
+    implicit val cm: ClassTag[W] =
+      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]]
+    val joinResult = dstream.fullOuterJoin(other.dstream, partitioner)
+    joinResult.mapValues{case (v, w) => (JavaUtils.optionToOptional(v), JavaUtils.optionToOptional(w))}
+  }
+
   /**
    * Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is
    * generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix".
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
index 7c7992d688aeb..f6f45f2de48f8 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
@@ -558,6 +558,42 @@ extends Serializable {
     )
   }
 
+  /**
+   * Return a new DStream by applying 'full outer join' between RDDs of `this` DStream and
+   * `other` DStream. Hash partitioning is used to generate the RDDs with Spark's default
+   * number of partitions.
+   */
+  def fullOuterJoin[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (Option[V], Option[W]))] = {
+    fullOuterJoin[W](other, defaultPartitioner())
+  }
+
+  /**
+   * Return a new DStream by applying 'full outer join' between RDDs of `this` DStream and
+   * `other` DStream. Hash partitioning is used to generate the RDDs with `numPartitions`
+   * partitions.
+   */
+  def fullOuterJoin[W: ClassTag](
+      other: DStream[(K, W)],
+      numPartitions: Int
+    ): DStream[(K, (Option[V], Option[W]))] = {
+    fullOuterJoin[W](other, defaultPartitioner(numPartitions))
+  }
+
+  /**
+   * Return a new DStream by applying 'full outer join' between RDDs of `this` DStream and
+   * `other` DStream. The supplied [[org.apache.spark.Partitioner]] is used to control
+   * the partitioning of each RDD.
+   */
+  def fullOuterJoin[W: ClassTag](
+      other: DStream[(K, W)],
+      partitioner: Partitioner
+    ): DStream[(K, (Option[V], Option[W]))] = {
+    self.transformWith(
+      other,
+      (rdd1: RDD[(K, V)], rdd2: RDD[(K, W)]) => rdd1.fullOuterJoin(rdd2, partitioner)
+    )
+  }
+
   /**
    * Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval
    * is generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix"
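
Since the DStream variant simply applies the RDD-level fullOuterJoin to each pair of batches
via `transformWith`, its per-batch output follows the same Option-padding rules. A rough sketch
(illustrative only, not part of the patch; assumes an existing StreamingContext with two
pre-existing string DStreams `lines1` and `lines2`):

    import org.apache.spark.streaming.StreamingContext._  // pair-DStream implicits

    val hits = lines1.map(word => (word, 1))
    val tags = lines2.map(word => (word, "x"))

    // DStream[(String, (Option[Int], Option[String]))], computed independently per batch
    val joined = hits.fullOuterJoin(tags)
    joined.print()
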
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
index bcb0c28bf07a0..95f1588b86195 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
@@ -301,6 +301,21 @@ class BasicOperationsSuite extends TestSuiteBase {
     testOperation(inputData1, inputData2, operation, outputData, true)
   }
 
+  test("fullOuterJoin") {
+    val inputData1 = Seq( Seq("a", "b"), Seq("a", ""), Seq(""), Seq() )
+    val inputData2 = Seq( Seq("a", "b"), Seq("b", ""), Seq(), Seq("") )
+    val outputData = Seq(
+      Seq( ("a", (Some(1), Some("x"))), ("b", (Some(1), Some("x"))) ),
+      Seq( ("", (Some(1), Some("x"))), ("b", (None, Some("x"))), ("a", (Some(1), None)) ),
+      Seq( ("", (Some(1), None)) ),
+      Seq( ("", (None, Some("x"))) )
+    )
+    val operation = (s1: DStream[String], s2: DStream[String]) => {
+      s1.map(x => (x, 1)).fullOuterJoin(s2.map(x => (x, "x")))
+    }
+    testOperation(inputData1, inputData2, operation, outputData, true)
+  }
+
   test("updateStateByKey") {
     val inputData =
       Seq(

From 3e566bc8f417e9e4ef27e6fa6e8474113da7755f Mon Sep 17 00:00:00 2001
From: Mark Hamstra
Date: Thu, 24 Apr 2014 13:54:06 -0700
Subject: [PATCH 2/2] update comment in pyspark code

---
 python/pyspark/rdd.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index a85ca59c94ae1..a26ee00aadc7e 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -936,9 +936,9 @@ def fullOuterJoin(self, other, numPartitions=None):
         Hash-partitions the resulting RDD into the given number of partitions.
 
         >>> x = sc.parallelize([("a", 1), ("b", 4)])
-        >>> y = sc.parallelize([("a", 2)])
+        >>> y = sc.parallelize([("a", 2), ("c", 3)])
         >>> sorted(y.fullOuterJoin(x).collect())
-        [('a', (2, 1)), ('b', (None, 4))]
+        [('a', (2, 1)), ('b', (None, 4)), ('c', (3, None))]
         """
         return python_full_outer_join(self, other, numPartitions)