Merge pull request #12 from markhamstra/FullOuterJoin
SKIPME Full outer join
jhartlaub committed May 31, 2014
2 parents 923c5ba + 3e566bc commit af198bf
Showing 10 changed files with 212 additions and 0 deletions.
31 changes: 31 additions & 0 deletions core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
@@ -315,6 +315,19 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kClassTag: ClassTag[K
}

/**
* Perform a full outer join of `this` and `other`. The output will have
* one row for each key from either RDD, with `None` where a side is missing,
* i.e. one of (k, (Some(v), Some(w))), (k, (Some(v), None)) or (k, (None, Some(w))),
* depending on the presence of (k, v) and/or (k, w) in `this` and `other`.
* Uses the given Partitioner to partition the output RDD.
*/
def fullOuterJoin[W](other: JavaPairRDD[K, W], partitioner: Partitioner)
: JavaPairRDD[K, (Optional[V], Optional[W])] = {
val joinResult = rdd.fullOuterJoin(other, partitioner)
fromRDD(joinResult.mapValues{case (v, w) => (JavaUtils.optionToOptional(v), JavaUtils.optionToOptional(w))})
}

/**
* Simplified version of combineByKey that hash-partitions the resulting RDD using the existing
* partitioner/parallelism level.
*/
@@ -402,6 +415,24 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kClassTag: ClassTag[K
fromRDD(joinResult.mapValues{case (v, w) => (JavaUtils.optionToOptional(v), w)})
}

/**
* Perform a full outer join of `this` and `other`. Hash-partitions the resulting
* RDD using the existing partitioner/parallelism level.
*/
def fullOuterJoin[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (Optional[V], Optional[W])] = {
val joinResult = rdd.fullOuterJoin(other)
fromRDD(joinResult.mapValues{case (v, w) => (JavaUtils.optionToOptional(v), JavaUtils.optionToOptional(w))})
}

/**
* Perform a full outer join of `this` and `other`. Hash-partitions the resulting
* RDD into the given number of partitions.
*/
def fullOuterJoin[W](other: JavaPairRDD[K, W], numPartitions: Int): JavaPairRDD[K, (Optional[V], Optional[W])] = {
val joinResult = rdd.fullOuterJoin(other, numPartitions)
fromRDD(joinResult.mapValues{case (v, w) => (JavaUtils.optionToOptional(v), JavaUtils.optionToOptional(w))})
}

/**
* Return the key-value pairs in this RDD to the master as a Map.
*/
36 changes: 36 additions & 0 deletions core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -337,6 +337,26 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
}
}

/**
* Perform a full outer join of `this` and `other`. The output will have
* one row for each key from either RDD, with `None` where a side is missing,
* i.e. one of (k, (Some(v), Some(w))), (k, (Some(v), None)) or (k, (None, Some(w))),
* depending on the presence of (k, v) and/or (k, w) in `this` and `other`.
* Uses the given Partitioner to partition the output RDD.
*/
def fullOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner)
: RDD[(K, (Option[V], Option[W]))] = {
this.cogroup(other, partitioner).flatMapValues { case (vs, ws) =>
if (vs.isEmpty && !ws.isEmpty) {
ws.iterator.map(w => (None, Some(w)))
} else if (ws.isEmpty && !vs.isEmpty) {
vs.iterator.map(v => (Some(v), None))
} else {
for (v <- vs.iterator; w <- ws.iterator) yield (Some(v), Some(w))
}
}
}

/**
* Simplified version of combineByKey that hash-partitions the resulting RDD using the
* existing partitioner/parallelism level.
@@ -422,6 +442,22 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
rightOuterJoin(other, new HashPartitioner(numPartitions))
}

/**
* Perform a full outer join of `this` and `other`. Hash-partitions the resulting
* RDD using the existing partitioner/parallelism level.
*/
def fullOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], Option[W]))] = {
fullOuterJoin(other, defaultPartitioner(self, other))
}

/**
* Perform a full outer join of `this` and `other`. Hash-partitions the resulting
* RDD into the given number of partitions.
*/
def fullOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Option[V], Option[W]))] = {
fullOuterJoin(other, new HashPartitioner(numPartitions))
}

/**
* Return the key-value pairs in this RDD to the master as a Map.
*/
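To make the new RDD-level semantics concrete, here is a minimal, hypothetical usage sketch (the RDD names and sample data are ours, not part of the patch); it assumes an existing SparkContext `sc` and mirrors what the scaladoc above and the test suite below describe:

import org.apache.spark.SparkContext._  // brings the PairRDDFunctions implicits into scope

// Hypothetical sample data; `sc` is an existing SparkContext.
val left  = sc.parallelize(Seq((1, "a"), (2, "b")))
val right = sc.parallelize(Seq((2, "y"), (3, "z")))

// Keys present on only one side are padded with None on the other side.
val joined = left.fullOuterJoin(right).collect().toSet
// joined == Set(
//   (1, (Some("a"), None)),
//   (2, (Some("b"), Some("y"))),
//   (3, (None, Some("z"))))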
3 changes: 3 additions & 0 deletions core/src/test/scala/org/apache/spark/PartitioningSuite.scala
@@ -102,11 +102,13 @@ class PartitioningSuite extends FunSuite with SharedSparkContext {
assert(grouped2.join(grouped4).partitioner === grouped4.partitioner)
assert(grouped2.leftOuterJoin(grouped4).partitioner === grouped4.partitioner)
assert(grouped2.rightOuterJoin(grouped4).partitioner === grouped4.partitioner)
assert(grouped2.fullOuterJoin(grouped4).partitioner === grouped4.partitioner)
assert(grouped2.cogroup(grouped4).partitioner === grouped4.partitioner)

assert(grouped2.join(reduced2).partitioner === grouped2.partitioner)
assert(grouped2.leftOuterJoin(reduced2).partitioner === grouped2.partitioner)
assert(grouped2.rightOuterJoin(reduced2).partitioner === grouped2.partitioner)
assert(grouped2.fullOuterJoin(reduced2).partitioner === grouped2.partitioner)
assert(grouped2.cogroup(reduced2).partitioner === grouped2.partitioner)

assert(grouped2.map(_ => 1).partitioner === None)
@@ -127,6 +129,7 @@ class PartitioningSuite extends FunSuite with SharedSparkContext {
assert(intercept[SparkException]{ arrPairs.join(arrPairs) }.getMessage.contains("array"))
assert(intercept[SparkException]{ arrPairs.leftOuterJoin(arrPairs) }.getMessage.contains("array"))
assert(intercept[SparkException]{ arrPairs.rightOuterJoin(arrPairs) }.getMessage.contains("array"))
assert(intercept[SparkException]{ arrPairs.fullOuterJoin(arrPairs) }.getMessage.contains("array"))
assert(intercept[SparkException]{ arrPairs.groupByKey() }.getMessage.contains("array"))
assert(intercept[SparkException]{ arrPairs.countByKey() }.getMessage.contains("array"))
assert(intercept[SparkException]{ arrPairs.countByKeyApprox(1) }.getMessage.contains("array"))
@@ -202,6 +202,21 @@ class PairRDDFunctionsSuite extends FunSuite with SharedSparkContext {
))
}

test("fullOuterJoin") {
val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)))
val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w')))
val joined = rdd1.fullOuterJoin(rdd2).collect()
assert(joined.size === 6)
assert(joined.toSet === Set(
(1, (Some(1), Some('x'))),
(1, (Some(2), Some('x'))),
(2, (Some(1), Some('y'))),
(2, (Some(1), Some('z'))),
(3, (Some(1), None)),
(4, (None, Some('w')))
))
}

test("join with no matches") {
val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)))
val rdd2 = sc.parallelize(Array((4, 'x'), (5, 'y'), (5, 'z'), (6, 'w')))
1 change: 1 addition & 0 deletions core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -175,6 +175,7 @@ class RDDSuite extends FunSuite with SharedSparkContext {
assert(rdd.join(emptyKv).collect().size === 0)
assert(rdd.rightOuterJoin(emptyKv).collect().size === 0)
assert(rdd.leftOuterJoin(emptyKv).collect().size === 2)
assert(rdd.fullOuterJoin(emptyKv).collect().size === 2)
assert(rdd.cogroup(emptyKv).collect().size === 2)
assert(rdd.union(emptyKv).collect().size === 2)
}
16 changes: 16 additions & 0 deletions python/pyspark/join.py
@@ -78,6 +78,22 @@ def dispatch(seq):
return _do_python_join(rdd, other, numPartitions, dispatch)


def python_full_outer_join(rdd, other, numPartitions):
def dispatch(seq):
vbuf, wbuf = [], []
for (n, v) in seq:
if n == 1:
vbuf.append(v)
elif n == 2:
wbuf.append(v)
if not vbuf:
vbuf.append(None)
if not wbuf:
wbuf.append(None)
return [(v, w) for v in vbuf for w in wbuf]
return _do_python_join(rdd, other, numPartitions, dispatch)


def python_cogroup(rdd, other, numPartitions):
vs = rdd.map(lambda (k, v): (k, (1, v)))
ws = other.map(lambda (k, v): (k, (2, v)))
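The `dispatch` helper added to join.py above pads whichever buffer is empty with a single `None` before taking the cross product, so a key present on only one side still produces an output row. The following standalone sketch re-expresses that padding idea in Scala purely for illustration (the helper name `padAndCross` is ours; it is not part of the patch):

// Illustration only: if one side of a key has no values, substitute a single
// None so the cross product still emits a row for that key.
def padAndCross[V, W](vs: Seq[V], ws: Seq[W]): Seq[(Option[V], Option[W])] = {
  val vOpts: Seq[Option[V]] = if (vs.isEmpty) Seq(None) else vs.map(Some(_))
  val wOpts: Seq[Option[W]] = if (ws.isEmpty) Seq(None) else ws.map(Some(_))
  for (v <- vOpts; w <- wOpts) yield (v, w)
}

// padAndCross(Seq(1, 2), Seq.empty[String]) == Seq((Some(1), None), (Some(2), None))
// padAndCross(Seq(1), Seq("x", "y")) == Seq((Some(1), Some("x")), (Some(1), Some("y")))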
17 changes: 17 additions & 0 deletions python/pyspark/rdd.py
@@ -925,6 +925,23 @@ def rightOuterJoin(self, other, numPartitions=None):
"""
return python_right_outer_join(self, other, numPartitions)

def fullOuterJoin(self, other, numPartitions=None):
"""
Perform a full outer join of C{self} and C{other}.
Output will have one row for each key from either RDD, with None where
a side is missing, i.e. one of (k, (v, w)), (k, (v, None)), or (k, (None, w)),
depending on the presence of (k, v) and/or (k, w) in C{self} and C{other}.
Hash-partitions the resulting RDD into the given number of partitions.

>>> x = sc.parallelize([("a", 1), ("b", 4)])
>>> y = sc.parallelize([("a", 2), ("c", 3)])
>>> sorted(y.fullOuterJoin(x).collect())
[('a', (2, 1)), ('b', (None, 4)), ('c', (3, None))]
"""
return python_full_outer_join(self, other, numPartitions)

# TODO: add option to control map-side combining
def partitionBy(self, numPartitions, partitionFunc=hash):
"""
@@ -669,6 +669,48 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
joinResult.mapValues{case (v, w) => (JavaUtils.optionToOptional(v), w)}
}

/**
* Return a new DStream by applying 'full outer join' between RDDs of `this` DStream and
* `other` DStream. Hash partitioning is used to generate the RDDs with Spark's default
* number of partitions.
*/
def fullOuterJoin[W](other: JavaPairDStream[K, W]): JavaPairDStream[K, (Optional[V], Optional[W])] = {
implicit val cm: ClassTag[W] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]]
val joinResult = dstream.fullOuterJoin(other.dstream)
joinResult.mapValues{case (v, w) => (JavaUtils.optionToOptional(v), JavaUtils.optionToOptional(w))}
}

/**
* Return a new DStream by applying 'full outer join' between RDDs of `this` DStream and
* `other` DStream. Hash partitioning is used to generate the RDDs with `numPartitions`
* partitions.
*/
def fullOuterJoin[W](
other: JavaPairDStream[K, W],
numPartitions: Int
): JavaPairDStream[K, (Optional[V], Optional[W])] = {
implicit val cm: ClassTag[W] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]]
val joinResult = dstream.fullOuterJoin(other.dstream, numPartitions)
joinResult.mapValues{case (v, w) => (JavaUtils.optionToOptional(v), JavaUtils.optionToOptional(w))}
}

/**
* Return a new DStream by applying 'full outer join' between RDDs of `this` DStream and
* `other` DStream. The supplied [[org.apache.spark.Partitioner]] is used to control
* the partitioning of each RDD.
*/
def fullOuterJoin[W](
other: JavaPairDStream[K, W],
partitioner: Partitioner
): JavaPairDStream[K, (Optional[V], Optional[W])] = {
implicit val cm: ClassTag[W] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]]
val joinResult = dstream.fullOuterJoin(other.dstream, partitioner)
joinResult.mapValues{case (v, w) => (JavaUtils.optionToOptional(v), JavaUtils.optionToOptional(w))}
}

/**
* Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is
* generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix".
@@ -558,6 +558,42 @@ extends Serializable {
)
}

/**
* Return a new DStream by applying 'full outer join' between RDDs of `this` DStream and
* `other` DStream. Hash partitioning is used to generate the RDDs with Spark's default
* number of partitions.
*/
def fullOuterJoin[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (Option[V], Option[W]))] = {
fullOuterJoin[W](other, defaultPartitioner())
}

/**
* Return a new DStream by applying 'full outer join' between RDDs of `this` DStream and
* `other` DStream. Hash partitioning is used to generate the RDDs with `numPartitions`
* partitions.
*/
def fullOuterJoin[W: ClassTag](
other: DStream[(K, W)],
numPartitions: Int
): DStream[(K, (Option[V], Option[W]))] = {
fullOuterJoin[W](other, defaultPartitioner(numPartitions))
}

/**
* Return a new DStream by applying 'full outer join' between RDDs of `this` DStream and
* `other` DStream. The supplied [[org.apache.spark.Partitioner]] is used to control
* the partitioning of each RDD.
*/
def fullOuterJoin[W: ClassTag](
other: DStream[(K, W)],
partitioner: Partitioner
): DStream[(K, (Option[V], Option[W]))] = {
self.transformWith(
other,
(rdd1: RDD[(K, V)], rdd2: RDD[(K, W)]) => rdd1.fullOuterJoin(rdd2, partitioner)
)
}

/**
* Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval
* is generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix"
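The DStream variants above simply delegate to the RDD-level fullOuterJoin via transformWith on each batch. A minimal hypothetical sketch of calling them (the stream sources, host names, and variable names are assumptions for illustration; `ssc` is an existing StreamingContext):

import org.apache.spark.streaming.StreamingContext._  // pair DStream implicits

// Hypothetical keyed streams; any two DStreams of pairs work the same way.
val clicks = ssc.socketTextStream("host1", 9999).map(url => (url, 1))
val impressions = ssc.socketTextStream("host2", 9999).map(url => (url, "seen"))

// Each batch interval produces (url, (Some(count), Some(tag))), (url, (Some(count), None)),
// or (url, (None, Some(tag))), depending on which stream saw the url in that batch.
val joined = clicks.fullOuterJoin(impressions)
joined.print()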
@@ -301,6 +301,21 @@ class BasicOperationsSuite extends TestSuiteBase {
testOperation(inputData1, inputData2, operation, outputData, true)
}

test("fullOuterJoin") {
val inputData1 = Seq( Seq("a", "b"), Seq("a", ""), Seq(""), Seq() )
val inputData2 = Seq( Seq("a", "b"), Seq("b", ""), Seq(), Seq("") )
val outputData = Seq(
Seq( ("a", (Some(1), Some("x"))), ("b", (Some(1), Some("x"))) ),
Seq( ("", (Some(1), Some("x"))), ("b", (None, Some("x"))), ("a", (Some(1), None)) ),
Seq( ("", (Some(1), None)) ),
Seq( ("", (None, Some("x"))) )
)
val operation = (s1: DStream[String], s2: DStream[String]) => {
s1.map(x => (x, 1)).fullOuterJoin(s2.map(x => (x, "x")))
}
testOperation(inputData1, inputData2, operation, outputData, true)
}

test("updateStateByKey") {
val inputData =
Seq(
