Merge pull request twitter#17 from johnynek/master
Bumps version, fixes issue with joinWithTiny along with joinWithSmaller
azymnis committed Feb 26, 2012
2 parents 3f4430e + 311e5a5 commit 4990bae
Showing 4 changed files with 48 additions and 11 deletions.
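
For context, the fix in this commit concerns chaining a joinWithTiny into a joinWithSmaller on the same pipe, which is exactly the pattern exercised by the new TinyThenSmallJoin test in CoreTest.scala below. A minimal sketch of that pattern, not part of the commit itself, with made-up source and field names:

import com.twitter.scalding._

// Sketch only: mirrors the TinyThenSmallJoin test added below, with
// illustrative input names. A tiny join is followed by a smaller join
// on the same pipe.
class TinyThenSmallSketch(args : Args) extends Job(args) {
  val events = Tsv("events", ('userId, 'event)).read
  val lookup = Tsv("lookup", ('lUserId, 'label)).read   // small side, used with joinWithTiny
  val stats  = Tsv("stats",  ('sUserId, 'count)).read   // joined afterwards with joinWithSmaller

  events
    .joinWithTiny('userId -> 'lUserId, lookup)
    .joinWithSmaller('userId -> 'sUserId, stats)
    .write(Tsv("joined"))
}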
10 changes: 5 additions & 5 deletions build.sbt
@@ -2,23 +2,23 @@ import AssemblyKeys._

name := "scalding"

-version := "0.3.2"
+version := "0.3.3"

organization := "com.twitter"

scalaVersion := "2.8.1"

resolvers += "Concurrent Maven Repo" at "http://conjars.org/repo"

-libraryDependencies += "cascading" % "cascading-core" % "2.0.0-wip-227"
+libraryDependencies += "cascading" % "cascading-core" % "2.0.0-wip-236"

-libraryDependencies += "cascading" % "cascading-local" % "2.0.0-wip-227"
+libraryDependencies += "cascading" % "cascading-local" % "2.0.0-wip-236"

-libraryDependencies += "cascading" % "cascading-hadoop" % "2.0.0-wip-227"
+libraryDependencies += "cascading" % "cascading-hadoop" % "2.0.0-wip-236"

libraryDependencies += "cascading.kryo" % "cascading.kryo" % "0.2.1"

-libraryDependencies += "com.twitter" % "meat-locker" % "0.1.5"
+libraryDependencies += "com.twitter" % "meat-locker" % "0.1.6"

libraryDependencies += "commons-lang" % "commons-lang" % "2.4"

2 changes: 1 addition & 1 deletion scripts/scald.rb
@@ -2,7 +2,7 @@
require 'fileutils'
require 'thread'

-SCALDING_VERSION="0.3.2"
+SCALDING_VERSION="0.3.3"

#Usage : scald.rb [--hdfs|--local|--print] job <job args>
# --hdfs: if job ends in ".scala" or ".java" and the file exists, link it against JARFILE (below) and then run it on HOST.
9 changes: 4 additions & 5 deletions src/main/scala/com/twitter/scalding/Source.scala
@@ -151,7 +151,7 @@ abstract class Source extends java.io.Serializable {
}

protected def createHadoopTestReadTap(buffer : Iterable[Tuple]) :
-Tap[HadoopFlowProcess, JobConf, RecordReader[_,_], OutputCollector[_,_]] = {
+Tap[HadoopFlowProcess, JobConf, RecordReader[_,_], _] = {
new MemorySourceTap(buffer.toList.asJava, hdfsScheme.getSourceFields())
}

@@ -205,7 +205,7 @@ abstract class Source extends java.io.Serializable {
}

protected def createHdfsReadTap(hdfsMode : Hdfs) :
-Tap[HadoopFlowProcess, JobConf, RecordReader[_,_], OutputCollector[_,_]] = {
+Tap[HadoopFlowProcess, JobConf, RecordReader[_,_], _] = {
val goodPaths = if (hdfsMode.sourceStrictness) {
//we check later that all the paths are good
hdfsPaths
@@ -223,12 +223,11 @@ abstract class Source extends java.io.Serializable {
new Hfs(hdfsScheme, hdfsPaths.head, SinkMode.KEEP)
}
case 1 => taps.head
-case _ => new MultiSourceTap[HadoopFlowProcess,
-  JobConf, RecordReader[_,_], OutputCollector[_,_]](taps.toSeq : _*)
+case _ => new MultiSourceTap[Hfs, HadoopFlowProcess, JobConf, RecordReader[_,_]]( taps.toSeq : _*)
}
}
protected def createHdfsWriteTap(hdfsMode : Hdfs) :
-Tap[HadoopFlowProcess, JobConf, RecordReader[_,_], OutputCollector[_,_]] = {
+Tap[HadoopFlowProcess, JobConf, _, OutputCollector[_,_]] = {
new Hfs(hdfsScheme, hdfsWritePath, SinkMode.REPLACE)
}

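
The Source.scala changes above widen the tap type parameters: read taps no longer pin a concrete OutputCollector type, the write tap no longer pins a concrete RecordReader type, and MultiSourceTap is now parameterized on the concrete child tap type (Hfs), alongside the cascading 2.0.0-wip-236 bump in build.sbt. As a small self-contained illustration of why a wildcard keeps call sites compatible (the Tap-like trait below is a stand-in, not the Cascading API):

// Illustrative stand-in for Cascading's Tap; not the real API.
trait FakeTap[Process, Config, Input, Output]

// A concrete source tap that fixes all four type parameters.
class FakeHdfsSourceTap extends FakeTap[String, Map[String, String], Iterator[String], Nothing]

object WildcardSketch {
  // Reading never touches the Output type, so declaring it as a wildcard (_)
  // accepts any concrete tap, even if that type changes between library versions.
  def openForRead(tap : FakeTap[String, Map[String, String], Iterator[String], _]) : Unit =
    println("opened a tap for reading: " + tap)

  def main(args : Array[String]) : Unit =
    openForRead(new FakeHdfsSourceTap) // compiles: the Output parameter is left open
}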
38 changes: 38 additions & 0 deletions src/test/scala/com/twitter/scalding/CoreTest.scala
@@ -133,6 +133,7 @@ class TinyJoinJob(args: Args) extends Job(args) {
}

class TinyJoinTest extends Specification with TupleConversions {
+noDetailedDiffs() //Fixes an issue with scala 2.9
"A JoinJob" should {
val input1 = List("a" -> 1, "b" -> 2, "c" -> 3)
val input2 = List("b" -> -1, "c" -> 5, "d" -> 4)
@@ -159,6 +159,43 @@ class TinyJoinTest extends Specification with TupleConversions {
}
}

+class TinyThenSmallJoin(args : Args) extends Job(args) {
+val pipe0 = Tsv("in0",('x0,'y0)).read
+val pipe1 = Tsv("in1",('x1,'y1)).read
+val pipe2 = Tsv("in2",('x2,'y2)).read
+
+pipe0.joinWithTiny('x0 -> 'x1, pipe1)
+.joinWithSmaller('x0 -> 'x2, pipe2)
+.write(Tsv("out"))
+}
+
+class TinyThenSmallJoinTest extends Specification with TupleConversions with FieldConversions {
+noDetailedDiffs() //Fixes an issue with scala 2.9
+"A TinyThenSmallJoin" should {
+val input0 = List((1,2),(2,3),(3,4))
+val input1 = List((1,20),(2,30),(3,40))
+val input2 = List((1,200),(2,300),(3,400))
+val correct = List((1,2,1,20,1,200),
+(2,3,2,30,2,300),(3,4,3,40,3,400))
+
+JobTest("com.twitter.scalding.TinyThenSmallJoin")
+.source(Tsv("in0",('x0,'y0)), input0)
+.source(Tsv("in1",('x1,'y1)), input1)
+.source(Tsv("in2",('x2,'y2)), input2)
+.sink[(Int,Int,Int,Int,Int,Int)](Tsv("out")) { outBuf =>
+val actualOutput = outBuf.toList.sorted
+println(actualOutput)
+"join tuples with the same key" in {
+correct must be_==(actualOutput)
+}
+}
+.run
+.runHadoop
+.finish
+}
+}
+
+
class MergeTestJob(args : Args) extends Job(args) {
val in = TextLine(args("in")).read.mapTo(1->('x,'y)) { line : String =>
val p = line.split(" ").map { _.toDouble }
