Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Adding documentation and API cleanups.

  • Loading branch information...
commit 1db425b33de28bcf912df596f1957a7f60fb0a60 1 parent 7a2a585
Argyris Zymnis authored
2  src/main/scala/com/twitter/scalding/GroupBuilder.scala
View
@@ -27,8 +27,6 @@ import scala.collection.JavaConverters._
import scala.annotation.tailrec
import scala.math.Ordering
-import java.lang.IllegalArgumentException
-
// This controls the sequence of reductions that happen inside a
// particular grouping operation. Not all elements can be combined,
// for instance, a scanLeft/foldLeft generally requires a sorting
55 src/main/scala/com/twitter/scalding/JoinAlgorithms.scala
View
@@ -19,7 +19,7 @@ import cascading.tap._
import cascading.scheme._
import cascading.pipe._
import cascading.pipe.assembly._
-import cascading.pipe.joiner.{InnerJoin => CInnerJoin, LeftJoin => CLeftJoin}
+import cascading.pipe.joiner._
import cascading.flow._
import cascading.operation._
import cascading.operation.aggregator._
@@ -38,6 +38,16 @@ trait JoinAlgorithms {
def pipe : Pipe
+ /**
+ * This method is used internally to implement all joins.
+ * You can use this directly if you want to implement something like a star join,
+ * e.g., when joining a single pipe to multiple other pipes. Make sure that you call this method
+ * on the larger pipe to make the grouping as efficient as possible.
+ *
+ * If you are only joining two pipes, then you are better off
+ * using joinWithSmaller/joinWithLarger/joinWithTiny/leftJoinWithTiny.
+ *
+ */
def coGroupBy(f : Fields, j : JoinMode = InnerJoinMode)(builder : CoGroupBuilder => GroupBuilder) : Pipe = {
builder(new CoGroupBuilder(f, j)).schedule(pipe.getName, pipe)
}
@@ -85,6 +95,16 @@ trait JoinAlgorithms {
(renamedPipe, newJoinKeys, temp)
}
+ def joinerToJoinModes(j : Joiner) = {
+ j match {
+ case i : InnerJoin => (InnerJoinMode, InnerJoinMode)
+ case l : LeftJoin => (InnerJoinMode, OuterJoinMode)
+ case r : RightJoin => (OuterJoinMode, InnerJoinMode)
+ case o : OuterJoin => (OuterJoinMode, OuterJoinMode)
+ case _ => throw new InvalidJoinModeException("cannot convert joiner to joiner modes")
+ }
+ }
+
/**
* joins the first set of keys in the first pipe to the second set of keys in the second pipe.
* All keys must be unique UNLESS it is an inner join, in which case duplicated join keys are allowed, but
@@ -96,8 +116,9 @@ trait JoinAlgorithms {
* joinWithSmaller(('other1, 'other2)->('this1, 'this2), pipe, new FancyJoin)
* }
*/
- def joinWithSmaller(fs :(Fields,Fields), that : Pipe, joiners : (JoinMode, JoinMode) = (InnerJoinMode, InnerJoinMode), reducers : Int = -1) = {
+ def joinWithSmaller(fs :(Fields,Fields), that : Pipe, joiner : Joiner = new InnerJoin, reducers : Int = -1) = {
// If we are not doing an inner join, the join fields must be disjoint:
+ val joiners = joinerToJoinModes(joiner)
val intersection = asSet(fs._1).intersect(asSet(fs._2))
if (intersection.size == 0) {
// Common case: no intersection in names: just CoGroup, which duplicates the grouping fields:
@@ -124,17 +145,17 @@ trait JoinAlgorithms {
}
}
- def joinWithLarger(fs : (Fields, Fields), that : Pipe, joiners : (JoinMode, JoinMode) = (InnerJoinMode,InnerJoinMode), reducers : Int = -1) = {
- that.joinWithSmaller((fs._2, fs._1), pipe, joiners, reducers)
+ def joinWithLarger(fs : (Fields, Fields), that : Pipe, joiner : Joiner = new InnerJoin, reducers : Int = -1) = {
+ that.joinWithSmaller((fs._2, fs._1), pipe, joiner, reducers)
}
def leftJoinWithSmaller(fs :(Fields,Fields), that : Pipe, reducers : Int = -1) = {
- joinWithSmaller(fs, that, (InnerJoinMode, OuterJoinMode), reducers)
+ joinWithSmaller(fs, that, new LeftJoin, reducers)
}
def leftJoinWithLarger(fs :(Fields,Fields), that : Pipe, reducers : Int = -1) = {
//We swap the order, and turn left into right:
- that.joinWithSmaller((fs._2, fs._1), pipe, (OuterJoinMode, InnerJoinMode), reducers)
+ that.joinWithSmaller((fs._2, fs._1), pipe, new RightJoin, reducers)
}
/**
@@ -151,18 +172,18 @@ trait JoinAlgorithms {
def joinWithTiny(fs :(Fields,Fields), that : Pipe) = {
val intersection = asSet(fs._1).intersect(asSet(fs._2))
if (intersection.size == 0) {
- new Join(assignName(pipe), fs._1, assignName(that), fs._2, new CInnerJoin)
+ new Join(assignName(pipe), fs._1, assignName(that), fs._2, new InnerJoin)
}
else {
val (renamedThat, newJoinFields, temp) = renameCollidingFields(that, fs._2, intersection)
- (new Join(assignName(pipe), fs._1, assignName(renamedThat), newJoinFields, new CInnerJoin))
+ (new Join(assignName(pipe), fs._1, assignName(renamedThat), newJoinFields, new InnerJoin))
.discard(temp)
}
}
def leftJoinWithTiny(fs :(Fields,Fields), that : Pipe) = {
//Rename these pipes to avoid cascading name conflicts
- new Join(assignName(pipe), fs._1, assignName(that), fs._2, new CLeftJoin)
+ new Join(assignName(pipe), fs._1, assignName(that), fs._2, new LeftJoin)
}
/*
@@ -193,11 +214,11 @@ trait JoinAlgorithms {
*/
def blockJoinWithSmaller(fs : (Fields, Fields),
otherPipe : Pipe, rightReplication : Int = 1, leftReplication : Int = 1,
- joiners : (JoinMode, JoinMode) = (InnerJoinMode, InnerJoinMode), reducers : Int = -1) : Pipe = {
+ joiner : Joiner = new InnerJoin, reducers : Int = -1) : Pipe = {
assert(rightReplication > 0, "Must specify a positive number for the right replication in block join")
assert(leftReplication > 0, "Must specify a positive number for the left replication in block join")
- assertValidJoinMode(joiners, leftReplication, rightReplication)
+ assertValidJoinMode(joiner, leftReplication, rightReplication)
// These are the new dummy fields used in the skew join
val leftFields = new Fields("__LEFT_I__", "__LEFT_J__")
@@ -211,7 +232,7 @@ trait JoinAlgorithms {
val rightJoinFields = Fields.join(fs._2, rightFields)
newLeft
- .joinWithSmaller((leftJoinFields, rightJoinFields), newRight, joiners, reducers)
+ .joinWithSmaller((leftJoinFields, rightJoinFields), newRight, joiner, reducers)
.discard(leftFields)
.discard(rightFields)
}
@@ -228,11 +249,11 @@ trait JoinAlgorithms {
}
}
- private def assertValidJoinMode(joiners : (JoinMode, JoinMode), left : Int, right : Int) {
- (joiners, left, right) match {
- case ((InnerJoinMode, InnerJoinMode), _, _) => true
- case ((InnerJoinMode, OuterJoinMode), 1, _) => true
- case ((OuterJoinMode, InnerJoinMode), _, 1) => true
+ private def assertValidJoinMode(joiner : Joiner, left : Int, right : Int) {
+ (joiner, left, right) match {
+ case (i : InnerJoin, _, _) => true
+ case (k : LeftJoin, 1, _) => true
+ case (m : RightJoin, _, 1) => true
case (j, l, r) =>
throw new InvalidJoinModeException(
"you cannot use joiner " + j + " with left replication " + l + " and right replication " + r
10 src/test/scala/com/twitter/scalding/BlockJoinTest.scala
View
@@ -12,16 +12,16 @@ class InnerProductJob(args : Args) extends Job(args) {
val l = args.getOrElse("left", "1").toInt
val r = args.getOrElse("right", "1").toInt
val j = args.getOrElse("joiner", "i") match {
- case "i" => (InnerJoinMode, InnerJoinMode)
- case "l" => (InnerJoinMode, OuterJoinMode)
- case "r" => (OuterJoinMode, InnerJoinMode)
- case "o" => (OuterJoinMode, OuterJoinMode)
+ case "i" => new InnerJoin
+ case "l" => new LeftJoin
+ case "r" => new RightJoin
+ case "o" => new OuterJoin
}
val in0 = Tsv("input0").read.mapTo((0,1,2) -> ('x1, 'y1, 's1)) { input : (Int, Int, Int) => input }
val in1 = Tsv("input1").read.mapTo((0,1,2) -> ('x2, 'y2, 's2)) { input : (Int, Int, Int) => input }
in0
- .blockJoinWithSmaller('y1 -> 'y2, in1, leftReplication = l, rightReplication = r, joiners = j)
+ .blockJoinWithSmaller('y1 -> 'y2, in1, leftReplication = l, rightReplication = r, joiner = j)
.map(('s1, 's2) -> 'score) { v : (Int, Int) =>
v._1 * v._2
}
Please sign in to comment.
Something went wrong with that request. Please try again.