diff --git a/build.sbt b/build.sbt index 9fba287e27..bd1a7422fe 100644 --- a/build.sbt +++ b/build.sbt @@ -2,7 +2,7 @@ import AssemblyKeys._ name := "scalding" -version := "0.3.2" +version := "0.3.3" organization := "com.twitter" @@ -10,15 +10,15 @@ scalaVersion := "2.8.1" resolvers += "Concurrent Maven Repo" at "http://conjars.org/repo" -libraryDependencies += "cascading" % "cascading-core" % "2.0.0-wip-227" +libraryDependencies += "cascading" % "cascading-core" % "2.0.0-wip-236" -libraryDependencies += "cascading" % "cascading-local" % "2.0.0-wip-227" +libraryDependencies += "cascading" % "cascading-local" % "2.0.0-wip-236" -libraryDependencies += "cascading" % "cascading-hadoop" % "2.0.0-wip-227" +libraryDependencies += "cascading" % "cascading-hadoop" % "2.0.0-wip-236" libraryDependencies += "cascading.kryo" % "cascading.kryo" % "0.2.1" -libraryDependencies += "com.twitter" % "meat-locker" % "0.1.5" +libraryDependencies += "com.twitter" % "meat-locker" % "0.1.6" libraryDependencies += "commons-lang" % "commons-lang" % "2.4" diff --git a/scripts/scald.rb b/scripts/scald.rb index b2d2fd6c3e..34dea5ba1e 100755 --- a/scripts/scald.rb +++ b/scripts/scald.rb @@ -2,7 +2,7 @@ require 'fileutils' require 'thread' -SCALDING_VERSION="0.3.2" +SCALDING_VERSION="0.3.3" #Usage : scald.rb [--hdfs|--local|--print] job # --hdfs: if job ends in ".scala" or ".java" and the file exists, link it against JARFILE (below) and then run it on HOST. diff --git a/src/main/scala/com/twitter/scalding/Source.scala b/src/main/scala/com/twitter/scalding/Source.scala index 1ba5e9262f..19722694ca 100644 --- a/src/main/scala/com/twitter/scalding/Source.scala +++ b/src/main/scala/com/twitter/scalding/Source.scala @@ -151,7 +151,7 @@ abstract class Source extends java.io.Serializable { } protected def createHadoopTestReadTap(buffer : Iterable[Tuple]) : - Tap[HadoopFlowProcess, JobConf, RecordReader[_,_], OutputCollector[_,_]] = { + Tap[HadoopFlowProcess, JobConf, RecordReader[_,_], _] = { new MemorySourceTap(buffer.toList.asJava, hdfsScheme.getSourceFields()) } @@ -205,7 +205,7 @@ abstract class Source extends java.io.Serializable { } protected def createHdfsReadTap(hdfsMode : Hdfs) : - Tap[HadoopFlowProcess, JobConf, RecordReader[_,_], OutputCollector[_,_]] = { + Tap[HadoopFlowProcess, JobConf, RecordReader[_,_], _] = { val goodPaths = if (hdfsMode.sourceStrictness) { //we check later that all the paths are good hdfsPaths @@ -223,12 +223,11 @@ abstract class Source extends java.io.Serializable { new Hfs(hdfsScheme, hdfsPaths.head, SinkMode.KEEP) } case 1 => taps.head - case _ => new MultiSourceTap[HadoopFlowProcess, - JobConf, RecordReader[_,_], OutputCollector[_,_]](taps.toSeq : _*) + case _ => new MultiSourceTap[Hfs, HadoopFlowProcess, JobConf, RecordReader[_,_]]( taps.toSeq : _*) } } protected def createHdfsWriteTap(hdfsMode : Hdfs) : - Tap[HadoopFlowProcess, JobConf, RecordReader[_,_], OutputCollector[_,_]] = { + Tap[HadoopFlowProcess, JobConf, _, OutputCollector[_,_]] = { new Hfs(hdfsScheme, hdfsWritePath, SinkMode.REPLACE) } diff --git a/src/test/scala/com/twitter/scalding/CoreTest.scala b/src/test/scala/com/twitter/scalding/CoreTest.scala index 66a8bb208c..6497daf728 100644 --- a/src/test/scala/com/twitter/scalding/CoreTest.scala +++ b/src/test/scala/com/twitter/scalding/CoreTest.scala @@ -133,6 +133,7 @@ class TinyJoinJob(args: Args) extends Job(args) { } class TinyJoinTest extends Specification with TupleConversions { + noDetailedDiffs() //Fixes an issue with scala 2.9 "A JoinJob" should { val input1 = List("a" -> 1, "b" -> 2, "c" -> 3) val input2 = List("b" -> -1, "c" -> 5, "d" -> 4) @@ -159,6 +160,43 @@ class TinyJoinTest extends Specification with TupleConversions { } } +class TinyThenSmallJoin(args : Args) extends Job(args) { + val pipe0 = Tsv("in0",('x0,'y0)).read + val pipe1 = Tsv("in1",('x1,'y1)).read + val pipe2 = Tsv("in2",('x2,'y2)).read + + pipe0.joinWithTiny('x0 -> 'x1, pipe1) + .joinWithSmaller('x0 -> 'x2, pipe2) + .write(Tsv("out")) +} + +class TinyThenSmallJoinTest extends Specification with TupleConversions with FieldConversions { + noDetailedDiffs() //Fixes an issue with scala 2.9 + "A TinyThenSmallJoin" should { + val input0 = List((1,2),(2,3),(3,4)) + val input1 = List((1,20),(2,30),(3,40)) + val input2 = List((1,200),(2,300),(3,400)) + val correct = List((1,2,1,20,1,200), + (2,3,2,30,2,300),(3,4,3,40,3,400)) + + JobTest("com.twitter.scalding.TinyThenSmallJoin") + .source(Tsv("in0",('x0,'y0)), input0) + .source(Tsv("in1",('x1,'y1)), input1) + .source(Tsv("in2",('x2,'y2)), input2) + .sink[(Int,Int,Int,Int,Int,Int)](Tsv("out")) { outBuf => + val actualOutput = outBuf.toList.sorted + println(actualOutput) + "join tuples with the same key" in { + correct must be_==(actualOutput) + } + } + .run + .runHadoop + .finish + } +} + + class MergeTestJob(args : Args) extends Job(args) { val in = TextLine(args("in")).read.mapTo(1->('x,'y)) { line : String => val p = line.split(" ").map { _.toDouble }