diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
index 2142fd73278ac..b2614fcaa23e3 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
@@ -315,6 +315,19 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif
     fromRDD(joinResult.mapValues{case (v, w) => (JavaUtils.optionToOptional(v), w)})
   }
 
+  /**
+   * Perform a full outer join of `this` and `other`. Output will have
+   * each row from both RDDs or `None` where missing, i.e. one of
+   * (k, (Some(v), Some(w))), (k, (Some(v), None)) or (k, (None, Some(w)))
+   * depending on the presence of (k, v) and/or (k, w) in `this` and `other`.
+   * Uses the given Partitioner to partition the output RDD.
+   */
+  def fullOuterJoin[W](other: JavaPairRDD[K, W], partitioner: Partitioner)
+  : JavaPairRDD[K, (Optional[V], Optional[W])] = {
+    val joinResult = rdd.fullOuterJoin(other, partitioner)
+    fromRDD(joinResult.mapValues{case (v, w) => (JavaUtils.optionToOptional(v), JavaUtils.optionToOptional(w))})
+  }
+
   /**
    * Simplified version of combineByKey that hash-partitions the resulting RDD using the existing
    * partitioner/parallelism level.
@@ -404,6 +417,24 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif
     fromRDD(joinResult.mapValues{case (v, w) => (JavaUtils.optionToOptional(v), w)})
   }
 
+  /**
+   * Perform a full outer join of `this` and `other`. Hash-partitions the resulting
+   * RDD using the existing partitioner/parallelism level.
+   */
+  def fullOuterJoin[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (Optional[V], Optional[W])] = {
+    val joinResult = rdd.fullOuterJoin(other)
+    fromRDD(joinResult.mapValues{case (v, w) => (JavaUtils.optionToOptional(v), JavaUtils.optionToOptional(w))})
+  }
+
+  /**
+   * Perform a full outer join of `this` and `other`. Hash-partitions the resulting
+   * RDD into the given number of partitions.
+   */
+  def fullOuterJoin[W](other: JavaPairRDD[K, W], numPartitions: Int): JavaPairRDD[K, (Optional[V], Optional[W])] = {
+    val joinResult = rdd.fullOuterJoin(other, numPartitions)
+    fromRDD(joinResult.mapValues{case (v, w) => (JavaUtils.optionToOptional(v), JavaUtils.optionToOptional(w))})
+  }
+
   /**
    * Return the key-value pairs in this RDD to the master as a Map.
    */
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index 93b78e123267c..28fe08e6796b6 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -292,6 +292,26 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)])
     }
   }
 
+  /**
+   * Perform a full outer join of `this` and `other`. Output will have
+   * each row from both RDDs or `None` where missing, i.e. one of
+   * (k, (Some(v), Some(w))), (k, (Some(v), None)) or (k, (None, Some(w)))
+   * depending on the presence of (k, v) and/or (k, w) in `this` and `other`.
+   * Uses the given Partitioner to partition the output RDD.
+   */
+  def fullOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner)
+      : RDD[(K, (Option[V], Option[W]))] = {
+    this.cogroup(other, partitioner).flatMapValues { case (vs, ws) =>
+      if (vs.isEmpty && !ws.isEmpty) {
+        ws.iterator.map(w => (None, Some(w)))
+      } else if (ws.isEmpty && !vs.isEmpty) {
+        vs.iterator.map(v => (Some(v), None))
+      } else {
+        for (v <- vs.iterator; w <- ws.iterator) yield (Some(v), Some(w))
+      }
+    }
+  }
+
   /**
    * Simplified version of combineByKey that hash-partitions the resulting RDD using the
    * existing partitioner/parallelism level.
@@ -377,6 +397,22 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)])
     rightOuterJoin(other, new HashPartitioner(numPartitions))
   }
 
+  /**
+   * Perform a full outer join of `this` and `other`. Hash-partitions the resulting
+   * RDD using the existing partitioner/parallelism level.
+   */
+  def fullOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], Option[W]))] = {
+    fullOuterJoin(other, defaultPartitioner(self, other))
+  }
+
+  /**
+   * Perform a full outer join of `this` and `other`. Hash-partitions the resulting
+   * RDD into the given number of partitions.
+   */
+  def fullOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Option[V], Option[W]))] = {
+    fullOuterJoin(other, new HashPartitioner(numPartitions))
+  }
+
   /**
    * Return the key-value pairs in this RDD to the master as a Map.
    */
diff --git a/core/src/test/scala/org/apache/spark/PartitioningSuite.scala b/core/src/test/scala/org/apache/spark/PartitioningSuite.scala
index 1374d01774693..1b6a29acfa13d 100644
--- a/core/src/test/scala/org/apache/spark/PartitioningSuite.scala
+++ b/core/src/test/scala/org/apache/spark/PartitioningSuite.scala
@@ -102,11 +102,13 @@ class PartitioningSuite extends FunSuite with SharedSparkContext {
     assert(grouped2.join(grouped4).partitioner === grouped4.partitioner)
     assert(grouped2.leftOuterJoin(grouped4).partitioner === grouped4.partitioner)
     assert(grouped2.rightOuterJoin(grouped4).partitioner === grouped4.partitioner)
+    assert(grouped2.fullOuterJoin(grouped4).partitioner === grouped4.partitioner)
     assert(grouped2.cogroup(grouped4).partitioner === grouped4.partitioner)
 
     assert(grouped2.join(reduced2).partitioner === grouped2.partitioner)
     assert(grouped2.leftOuterJoin(reduced2).partitioner === grouped2.partitioner)
     assert(grouped2.rightOuterJoin(reduced2).partitioner === grouped2.partitioner)
+    assert(grouped2.fullOuterJoin(reduced2).partitioner === grouped2.partitioner)
     assert(grouped2.cogroup(reduced2).partitioner === grouped2.partitioner)
 
     assert(grouped2.map(_ => 1).partitioner === None)
@@ -127,6 +129,7 @@ class PartitioningSuite extends FunSuite with SharedSparkContext {
     assert(intercept[SparkException]{ arrPairs.join(arrPairs) }.getMessage.contains("array"))
     assert(intercept[SparkException]{ arrPairs.leftOuterJoin(arrPairs) }.getMessage.contains("array"))
     assert(intercept[SparkException]{ arrPairs.rightOuterJoin(arrPairs) }.getMessage.contains("array"))
+    assert(intercept[SparkException]{ arrPairs.fullOuterJoin(arrPairs) }.getMessage.contains("array"))
     assert(intercept[SparkException]{ arrPairs.groupByKey() }.getMessage.contains("array"))
     assert(intercept[SparkException]{ arrPairs.countByKey() }.getMessage.contains("array"))
     assert(intercept[SparkException]{ arrPairs.countByKeyApprox(1) }.getMessage.contains("array"))
diff --git a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
index 57d3382ed0b3f..e3d7c955295fa 100644
--- a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
@@ -165,6 +165,21 @@ class PairRDDFunctionsSuite extends FunSuite with SharedSparkContext {
     ))
   }
 
+  test("fullOuterJoin") {
+    val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)))
+    val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w')))
+    val joined = rdd1.fullOuterJoin(rdd2).collect()
+    assert(joined.size === 6)
+    assert(joined.toSet === Set(
+      (1, (Some(1), Some('x'))),
+      (1, (Some(2), Some('x'))),
+      (2, (Some(1), Some('y'))),
+      (2, (Some(1), Some('z'))),
+      (3, (Some(1), None)),
+      (4, (None, Some('w')))
+    ))
+  }
+
   test("join with no matches") {
     val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)))
     val rdd2 = sc.parallelize(Array((4, 'x'), (5, 'y'), (5, 'z'), (6, 'w')))
diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
index 673be9101f568..8df36e1dc85d8 100644
--- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -135,6 +135,7 @@ class RDDSuite extends FunSuite with SharedSparkContext {
     assert(rdd.join(emptyKv).collect().size === 0)
     assert(rdd.rightOuterJoin(emptyKv).collect().size === 0)
     assert(rdd.leftOuterJoin(emptyKv).collect().size === 2)
+    assert(rdd.fullOuterJoin(emptyKv).collect().size === 2)
     assert(rdd.cogroup(emptyKv).collect().size === 2)
     assert(rdd.union(emptyKv).collect().size === 2)
   }
diff --git a/python/pyspark/join.py b/python/pyspark/join.py
index 5f4294fb1b777..96992db240a11 100644
--- a/python/pyspark/join.py
+++ b/python/pyspark/join.py
@@ -78,6 +78,22 @@ def dispatch(seq):
     return _do_python_join(rdd, other, numPartitions, dispatch)
 
 
+def python_full_outer_join(rdd, other, numPartitions):
+    def dispatch(seq):
+        vbuf, wbuf = [], []
+        for (n, v) in seq:
+            if n == 1:
+                vbuf.append(v)
+            elif n == 2:
+                wbuf.append(v)
+        if not vbuf:
+            vbuf.append(None)
+        if not wbuf:
+            wbuf.append(None)
+        return [(v, w) for v in vbuf for w in wbuf]
+    return _do_python_join(rdd, other, numPartitions, dispatch)
+
+
 def python_cogroup(rdd, other, numPartitions):
     vs = rdd.map(lambda (k, v): (k, (1, v)))
     ws = other.map(lambda (k, v): (k, (2, v)))
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 3f84312db435f..5f6d39a1e7957 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -728,6 +728,23 @@ def rightOuterJoin(self, other, numPartitions=None):
         """
         return python_right_outer_join(self, other, numPartitions)
 
+    def fullOuterJoin(self, other, numPartitions=None):
+        """
+        Perform a full outer join of C{self} and C{other}.
+
+        Output will have each row from both RDDs or None where missing, i.e.
+        one of (k, (v, w)), (k, (v, None)), or (k, (None, w)) depending on
+        the presence of (k, v) and/or (k, w) in C{self} and C{other}.
+
+        Hash-partitions the resulting RDD into the given number of partitions.
+
+        >>> x = sc.parallelize([("a", 1), ("b", 4)])
+        >>> y = sc.parallelize([("a", 2), ("c", 3)])
+        >>> sorted(y.fullOuterJoin(x).collect())
+        [('a', (2, 1)), ('b', (None, 4)), ('c', (3, None))]
+        """
+        return python_full_outer_join(self, other, numPartitions)
+
     # TODO: add option to control map-side combining
     def partitionBy(self, numPartitions, partitionFunc=hash):
         """
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/PairDStreamFunctions.scala b/streaming/src/main/scala/org/apache/spark/streaming/PairDStreamFunctions.scala
index 8c12fd11efcaf..26820778a06bc 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/PairDStreamFunctions.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/PairDStreamFunctions.scala
@@ -549,6 +549,42 @@ extends Serializable {
     )
   }
 
+  /**
+   * Return a new DStream by applying 'full outer join' between RDDs of `this` DStream and
+   * `other` DStream. Hash partitioning is used to generate the RDDs with Spark's default
+   * number of partitions.
+   */
+  def fullOuterJoin[W: ClassManifest](other: DStream[(K, W)]): DStream[(K, (Option[V], Option[W]))] = {
+    fullOuterJoin[W](other, defaultPartitioner())
+  }
+
+  /**
+   * Return a new DStream by applying 'full outer join' between RDDs of `this` DStream and
+   * `other` DStream. Hash partitioning is used to generate the RDDs with `numPartitions`
+   * partitions.
+   */
+  def fullOuterJoin[W: ClassManifest](
+      other: DStream[(K, W)],
+      numPartitions: Int
+    ): DStream[(K, (Option[V], Option[W]))] = {
+    fullOuterJoin[W](other, defaultPartitioner(numPartitions))
+  }
+
+  /**
+   * Return a new DStream by applying 'full outer join' between RDDs of `this` DStream and
+   * `other` DStream. The supplied [[org.apache.spark.Partitioner]] is used to control
+   * the partitioning of each RDD.
+   */
+  def fullOuterJoin[W: ClassManifest](
+      other: DStream[(K, W)],
+      partitioner: Partitioner
+    ): DStream[(K, (Option[V], Option[W]))] = {
+    self.transformWith(
+      other,
+      (rdd1: RDD[(K, V)], rdd2: RDD[(K, W)]) => rdd1.fullOuterJoin(rdd2, partitioner)
+    )
+  }
+
   /**
    * Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval
    * is generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix"
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
index c6cd635afa0c8..9975a3cb72eef 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
@@ -647,6 +647,48 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
     joinResult.mapValues{case (v, w) => (JavaUtils.optionToOptional(v), w)}
   }
 
+  /**
+   * Return a new DStream by applying 'full outer join' between RDDs of `this` DStream and
+   * `other` DStream. Hash partitioning is used to generate the RDDs with Spark's default
+   * number of partitions.
+   */
+  def fullOuterJoin[W](other: JavaPairDStream[K, W]): JavaPairDStream[K, (Optional[V], Optional[W])] = {
+    implicit val cm: ClassManifest[W] =
+      implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]]
+    val joinResult = dstream.fullOuterJoin(other.dstream)
+    joinResult.mapValues{case (v, w) => (JavaUtils.optionToOptional(v), JavaUtils.optionToOptional(w))}
+  }
+
+  /**
+   * Return a new DStream by applying 'full outer join' between RDDs of `this` DStream and
+   * `other` DStream. Hash partitioning is used to generate the RDDs with `numPartitions`
+   * partitions.
+   */
+  def fullOuterJoin[W](
+      other: JavaPairDStream[K, W],
+      numPartitions: Int
+    ): JavaPairDStream[K, (Optional[V], Optional[W])] = {
+    implicit val cm: ClassManifest[W] =
+      implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]]
+    val joinResult = dstream.fullOuterJoin(other.dstream, numPartitions)
+    joinResult.mapValues{case (v, w) => (JavaUtils.optionToOptional(v), JavaUtils.optionToOptional(w))}
+  }
+
+  /**
+   * Return a new DStream by applying 'full outer join' between RDDs of `this` DStream and
+   * `other` DStream. The supplied [[org.apache.spark.Partitioner]] is used to control
+   * the partitioning of each RDD.
+   */
+  def fullOuterJoin[W](
+      other: JavaPairDStream[K, W],
+      partitioner: Partitioner
+    ): JavaPairDStream[K, (Optional[V], Optional[W])] = {
+    implicit val cm: ClassManifest[W] =
+      implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]]
+    val joinResult = dstream.fullOuterJoin(other.dstream, partitioner)
+    joinResult.mapValues{case (v, w) => (JavaUtils.optionToOptional(v), JavaUtils.optionToOptional(w))}
+  }
+
   /**
    * Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is
    * generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix".
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
index 259ef1608cbc5..650d83261ec76 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
@@ -310,6 +310,21 @@ class BasicOperationsSuite extends TestSuiteBase {
     testOperation(inputData1, inputData2, operation, outputData, true)
   }
 
+  test("fullOuterJoin") {
+    val inputData1 = Seq( Seq("a", "b"), Seq("a", ""), Seq(""), Seq() )
+    val inputData2 = Seq( Seq("a", "b"), Seq("b", ""), Seq(), Seq("") )
+    val outputData = Seq(
+      Seq( ("a", (Some(1), Some("x"))), ("b", (Some(1), Some("x"))) ),
+      Seq( ("", (Some(1), Some("x"))), ("b", (None, Some("x"))), ("a", (Some(1), None)) ),
+      Seq( ("", (Some(1), None)) ),
+      Seq( ("", (None, Some("x"))) )
+    )
+    val operation = (s1: DStream[String], s2: DStream[String]) => {
+      s1.map(x => (x, 1)).fullOuterJoin(s2.map(x => (x, "x")))
+    }
+    testOperation(inputData1, inputData2, operation, outputData, true)
+  }
+
   test("updateStateByKey") {
     val inputData =
       Seq(
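
Usage sketch (illustrative only, not part of the patch): how the new RDD-level fullOuterJoin is expected to be called from the Scala API, assuming an already-initialized SparkContext named `sc`; the data mirrors the PairRDDFunctionsSuite test above.

    import org.apache.spark.SparkContext._  // implicit conversion to PairRDDFunctions

    val rdd1 = sc.parallelize(Seq((1, 1), (1, 2), (2, 1), (3, 1)))
    val rdd2 = sc.parallelize(Seq((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w')))
    // Keys present on only one side are padded with None on the other, so
    // (3, (Some(1), None)) and (4, (None, Some('w'))) both appear in the result.
    rdd1.fullOuterJoin(rdd2).collect().foreach(println)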