diff --git a/README.md b/README.md
index cf658c884..39a2dd9d7 100644
--- a/README.md
+++ b/README.md
@@ -27,7 +27,7 @@ To use the sbt-scoobi plugin we need to include a `project/project/scoobi.scala`
   }
 ```
 
-And, we can add a pretty standard `build.sbt` that has a dependency on Scoobi:
+And, we can add a `build.sbt` that has a dependency on Scoobi:
 
 ```scala
 name := "MyApp"
@@ -38,7 +38,9 @@ And, we can add a pretty standard `build.sbt` that has a dependency on Scoobi:
 
 libraryDependencies += "com.nicta" %% "scoobi" % "0.4.0-SNAPSHOT" % "provided"
 
-scalacOptions += "-deprecation"
+scalacOptions ++= Seq("-Ydependent-method-types", "-deprecation")
+
+resolvers += "snapshots" at "http://oss.sonatype.org/content/repositories/snapshots"
 ```
 
 The `provided` is added to the `scoobi` dependency to let sbt know that Scoobi
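Taken together, the README snippets above describe a complete `build.sbt`; assembled here for reference (a sketch: the README elides the middle of its snippet, so `version` and `scalaVersion` are taken from the example projects' own `build.sbt` files later in this diff):

```scala
name := "MyApp"

version := "1.0"

scalaVersion := "2.9.2"

libraryDependencies += "com.nicta" %% "scoobi" % "0.4.0-SNAPSHOT" % "provided"

scalacOptions ++= Seq("-Ydependent-method-types", "-deprecation")

resolvers += "snapshots" at "http://oss.sonatype.org/content/repositories/snapshots"
```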
diff --git a/examples/averageAge/build.sbt b/examples/averageAge/build.sbt
deleted file mode 100644
index 5244f7ff4..000000000
--- a/examples/averageAge/build.sbt
+++ /dev/null
@@ -1,7 +0,0 @@
-name := "Average Age Calculator"
-
-version := "0.1"
-
-scalaVersion := "2.9.2"
-
-libraryDependencies += "com.nicta" %% "scoobi" % "0.4.0-SNAPSHOT" % "provided"
diff --git a/examples/averageAge/project/project/build.scala b/examples/averageAge/project/project/build.scala
deleted file mode 100644
index c5aaae043..000000000
--- a/examples/averageAge/project/project/build.scala
+++ /dev/null
@@ -1,7 +0,0 @@
-import sbt._
-
-object Plugins extends Build {
-  lazy val root = Project("root", file(".")) dependsOn(
-    uri("git://github.com/NICTA/sbt-scoobi.git#master")
-  )
-}
diff --git a/examples/averageAge/src/main/scala/AverageAge.scala b/examples/averageAge/src/main/scala/AverageAge.scala
deleted file mode 100644
index b3ca4304d..000000000
--- a/examples/averageAge/src/main/scala/AverageAge.scala
+++ /dev/null
@@ -1,94 +0,0 @@
-/**
- * Copyright 2011 National ICT Australia Limited
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.nicta.scoobi.examples
-
-import com.nicta.scoobi.Scoobi._
-import java.io._
-
-/*
- * This example takes a list of names and ages in the form: <id>, <firstName>, <secondName>, <age>,
- * then gets the average age for each first name.
- */
-
-object AverageAge extends ScoobiApp {
-
-  if (!new File("output-dir").mkdir) {
-    sys.error("Could not make output-dir for results. Perhaps it already exists (and you should delete/rename the old one)")
-  }
-
-  val fileName = "output-dir/names.txt"
-
-  // write some names to a file (so this example has no external requirements)
-  generateNames(fileName)
-
-  case class Person(val id: Long,
-                    val secondName: String,
-                    val firstName: String,
-                    val age: Int)
-
-  // With this implicit, we let Scoobi know the apply and unapply functions, which it uses
-  // to construct and deconstruct Person objects. Now it can very efficiently serialize them (i.e. no overhead)
-  implicit val PersonFmt = mkCaseWireFormat(Person, Person.unapply _)
-
-
-  // Read in lines of the form: 234242, Bob, Smith, 31.
-  val persons : DList[Person] = fromDelimitedTextFile(fileName, ",") {
-    case ALong(id) :: fN :: sN :: AnInt(age) :: _ => Person(id, sN, fN, age)
-  }
-
-  // The only things we're interested in are the firstName and age
-  val nameAndAge: DList[(String, Int)] = persons.map { p => (p.firstName, p.age) }
-
-  // Let's group everyone with the same name together
-  val grouped: DList[(String, Iterable[Int])] = nameAndAge groupByKey
-
-  // And for every name, we will average all the ages
-  val avgAgeForName: DList[(String, Int)] = grouped map { case (n, ages) => (n, average(ages)) }
-
-  // Execute everything, and throw it into a directory
-  DList.persist (toTextFile(avgAgeForName, "output-dir/avg-age"))
-
-  private def average[A](values: Iterable[A])(implicit ev: Numeric[A]) = {
-    import ev._
-
-    var value: Int = 0
-    var count = 0
-
-    for (i <- values) {
-      value = value + toInt(i)
-      count = count + 1
-    }
-
-    value / count
-  }
-
-  private def generateNames(filename: String) {
-    val fstream = new FileWriter(filename)
-
-    fstream write ("""100,Ben,Lever,31
-101,Tom,Smith,45
-102,Michael,Robson,33
-103,Rami,Mukhatar,34
-104,Sean,Seefried,33
-105,Ben,Cool,27
-106,Tom,Selleck,66
-107,Michael,Jordan,48
-108,Rami,Yacoub,36
-109,Sean,Connery,81""")
-
-    fstream close()
-  }
-}
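In passing: the imperative loop in the deleted example's `average` can be expressed as a single fold. A plain-Scala sketch of the same helper (nothing Scoobi-specific, same integer-division behaviour as the original):

```scala
// One pass over the values, accumulating the running sum and count together.
def average[A](values: Iterable[A])(implicit ev: Numeric[A]): Int = {
  val (sum, count) = values.foldLeft((0, 0)) { case ((s, c), v) =>
    (s + ev.toInt(v), c + 1)
  }
  sum / count
}
```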
diff --git a/examples/javaWordCount/build.sbt b/examples/javaWordCount/build.sbt
index 80c741b1c..807c88d8f 100644
--- a/examples/javaWordCount/build.sbt
+++ b/examples/javaWordCount/build.sbt
@@ -1,8 +1,10 @@
 name := "Java Word Count"
 
-version := "0.1"
+version := "1.0"
 
 scalaVersion := "2.9.2"
 
 libraryDependencies += "com.nicta" %% "scoobi" % "0.4.0-SNAPSHOT" % "provided"
 
+resolvers += "snapshots" at "http://oss.sonatype.org/content/repositories/snapshots"
+
diff --git a/examples/javaWordCount/src/main/java/WordCount.java b/examples/javaWordCount/src/main/java/WordCount.java
index aeee071ba..2ae324a39 100644
--- a/examples/javaWordCount/src/main/java/WordCount.java
+++ b/examples/javaWordCount/src/main/java/WordCount.java
@@ -14,26 +14,12 @@
  * limitations under the License.
  */
 package com.nicta.scoobij.examples;
-/**
- * Copyright 2011 National ICT Australia Limited
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
 
 import com.nicta.scoobij.*;
 import java.io.*;
 import com.nicta.scoobij.io.text.*;
 
+
 public class WordCount {
 
 	public static void main(String[] args) throws java.io.IOException {
@@ -88,7 +74,7 @@ public Integer apply(Integer a, Integer b) {
 		}, WireFormats.string(), WireFormats.integer());
 
 		// We can evaluate this, and write it to a text file
-		Scoobi.persist(TextOutput.toTextFile(reduced, outputPath,
+		Scoobi.persist(TextOutput.toTextFile(reduced, outputPath, false,
 			WireFormats.string(), WireFormats.integer()));
 	}
 
diff --git a/examples/joinExamples/build.sbt b/examples/joinExamples/build.sbt
deleted file mode 100644
index adbbaf82b..000000000
--- a/examples/joinExamples/build.sbt
+++ /dev/null
@@ -1,7 +0,0 @@
-name := "Join Examples"
-
-version := "0.1"
-
-scalaVersion := "2.9.2"
-
-libraryDependencies += "com.nicta" %% "scoobi" % "0.4.0-SNAPSHOT" % "provided"
diff --git a/examples/joinExamples/project/project/build.scala b/examples/joinExamples/project/project/build.scala
deleted file mode 100644
index c5aaae043..000000000
--- a/examples/joinExamples/project/project/build.scala
+++ /dev/null
@@ -1,7 +0,0 @@
-import sbt._
-
-object Plugins extends Build {
-  lazy val root = Project("root", file(".")) dependsOn(
-    uri("git://github.com/NICTA/sbt-scoobi.git#master")
-  )
-}
diff --git a/examples/joinExamples/src/main/scala/JoinExamples.scala b/examples/joinExamples/src/main/scala/JoinExamples.scala
deleted file mode 100644
index b11a6df6b..000000000
--- a/examples/joinExamples/src/main/scala/JoinExamples.scala
+++ /dev/null
@@ -1,105 +0,0 @@
-/**
- * Copyright 2011 National ICT Australia Limited
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.nicta.scoobi.examples
-
-import com.nicta.scoobi.Scoobi._
-import java.io._
-
-/*
- * This example will show you how to apply different types of joins using the sample
- * dataset from the Wikipedia page on SQL Joins (http://en.wikipedia.org/wiki/Join_(SQL)),
- * slightly adjusted to be more OO/Scala.
- */
-
-object JoinExamples {
-  def main(args: Array[String]) = withHadoopArgs(args) { _ =>
-
-    if (!new File("output-dir").mkdir) {
-      sys.error("Could not make output-dir for results. Perhaps it already exists (and you should delete/rename the old one)")
-    }
-
-    val employeesFile = "output-dir/employees.txt"
-    val departmentsFile = "output-dir/departments.txt"
-
-    // write some names to a file (so this example has no external requirements)
-    generateDataSet(employeesFile, departmentsFile)
-
-    case class Employee(val name: String, val departmentId: Long)
-    case class Department(val id: Long, val name: String)
-
-    // With this implicit, we let Scoobi know the apply and unapply functions, which it uses
-    // to construct and deconstruct Employee and Department objects.
-    // Now it can very efficiently serialize them (i.e.
no overhead) - implicit val EmployeeFmt = mkCaseWireFormat(Employee, Employee.unapply _) - implicit val DepartmentFmt = mkCaseWireFormat(Department, Department.unapply _) - - // Read in lines of the form: Bob Smith, 31 - val employees : DList[Employee] = fromDelimitedTextFile(employeesFile, ",") { - case name :: ALong(departmentId) :: _ => Employee(name, departmentId) - } - - // Read in lines of the form: 31, Finance - val departments : DList[Department] = fromDelimitedTextFile(departmentsFile, ",") { - case ALong(id) :: name :: _ => Department(id, name) - } - - val employeesByDepartmentId: DList[(Long, Employee)] = employees.by(_.departmentId) - val departmentsById: DList[(Long, Department)] = departments.by(_.id) - - // Perform an inner (equi)join - val inner: DList[(Long, (Employee, Department))] = join(employeesByDepartmentId, departmentsById) - - // Perform a left outer join and specify what to do when the left has an - // entry without a matching entry on the right - val left: DList[(Long, (Employee, Department))] = - joinLeft(employeesByDepartmentId, - departmentsById, - (departmentId, employee) => Department(departmentId, "Unknown")) - - // Perform a right outer join and specify what to do when the right has an - // entry without a matching entry on the left - val right: DList[(Long, (Employee, Department))] = - joinRight(employeesByDepartmentId, departmentsById, (id, department) => Employee("Unknown", id)) - - // Execute everything, and throw it into a directory - DList.persist( - toTextFile(inner, "output-dir/inner"), - toTextFile(left, "output-dir/left"), - toTextFile(right, "output-dir/right") - ) - } - - private def generateDataSet(employeesFile: String, departmentsFile: String) { - val e = new FileWriter(employeesFile) - val d = new FileWriter(departmentsFile) - - e.write("""Rafferty,31 -Jones,33 -Steinberg,33 -Robinson,34 -Smith,34 -John,-1""") - - e.close() - - d.write ("""31,Sales -33,Engineering -34,Clerical -35,Marketing""") - - d.close() - } -} diff --git a/examples/numberPartition/build.sbt b/examples/numberPartition/build.sbt deleted file mode 100644 index a9c8a98f1..000000000 --- a/examples/numberPartition/build.sbt +++ /dev/null @@ -1,7 +0,0 @@ -name := "Scoobi Number Filter" - -version := "0.1" - -scalaVersion := "2.9.2" - -libraryDependencies += "com.nicta" %% "scoobi" % "0.4.0-SNAPSHOT" % "provided" diff --git a/examples/numberPartition/project/project/build.scala b/examples/numberPartition/project/project/build.scala deleted file mode 100644 index aaf35c294..000000000 --- a/examples/numberPartition/project/project/build.scala +++ /dev/null @@ -1,8 +0,0 @@ -import sbt._ - -object Plugins extends Build { - lazy val root = Project("root", file(".")) dependsOn( - uri("git://github.com/NICTA/sbt-scoobi.git#master") - ) -} - diff --git a/examples/numberPartition/src/main/scala/NumberPartitioner.scala b/examples/numberPartition/src/main/scala/NumberPartitioner.scala deleted file mode 100644 index cfd2cf9b3..000000000 --- a/examples/numberPartition/src/main/scala/NumberPartitioner.scala +++ /dev/null @@ -1,48 +0,0 @@ -/** - * Copyright 2011 National ICT Australia Limited - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. 
- * You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.nicta.scoobi.examples
-
-import com.nicta.scoobi.Scoobi._
-import java.io._
-
-
-object NumberPartitioner extends ScoobiApp {
-  val fileName = "output-dir/all-ints.txt"
-
-  // Write 50 (newline-separated) ints to a file. We do this to make the example self-contained
-  generateInts(fileName, 50)
-
-  // fromTextFile creates a list of Strings, where each String is a line
-  val data : DList[String] = fromTextFile(fileName);
-
-  // since they're numbers, we can easily parse them
-  val intData : DList[Int] = data.map(_.toInt)
-
-  // Now we can partition this data into two lists: one where they're even, one where they're odd
-  val (evens, odds) = intData.partition(_ % 2 == 0)
-
-  DList.persist (
-    TextOutput.toTextFile(evens, "output-dir/evens"),
-    TextOutput.toTextFile(odds, "output-dir/odds")
-  )
-
-  private def generateInts(filename: String, count: Int) {
-    val fstream = new FileWriter(filename)
-    val r = new scala.util.Random()
-    (1 to count) foreach { _ => fstream write ( r.nextInt(count * 2).toString ++ "\n" ) }
-    fstream.close()
-  }
-}
diff --git a/examples/pageRank/build.sbt b/examples/pageRank/build.sbt
index 64f930bfd..e064dab44 100644
--- a/examples/pageRank/build.sbt
+++ b/examples/pageRank/build.sbt
@@ -1,7 +1,11 @@
 name := "PageRank"
 
-version := "0.1"
+version := "1.0"
 
 scalaVersion := "2.9.2"
 
+scalacOptions ++= Seq("-Ydependent-method-types", "-deprecation")
+
 libraryDependencies += "com.nicta" %% "scoobi" % "0.4.0-SNAPSHOT" % "provided"
+
+resolvers += "snapshots" at "http://oss.sonatype.org/content/repositories/snapshots"
diff --git a/examples/pageRank/PageRank.scala b/examples/pageRank/src/main/scala/PageRank.scala
similarity index 53%
rename from examples/pageRank/PageRank.scala
rename to examples/pageRank/src/main/scala/PageRank.scala
index 6f4adde92..28e5ac26f 100644
--- a/examples/pageRank/PageRank.scala
+++ b/examples/pageRank/src/main/scala/PageRank.scala
@@ -20,7 +20,7 @@ import com.nicta.scoobi.Scoobi._
 object PageRank extends ScoobiApp {
 
   /* Load raw graph from a file. */
-  def loadGraph(path: String): DList[(Int, List[Int])] = {
+  private def loadGraph(path: String): DList[(Int, List[Int])] = {
     val Node = """^(\d+): (.*)$""".r
     val lines = fromTextFile(path)
     lines collect { case Node(n, rest) => (n.toInt, rest.split(" ").map(_.toInt).toList) }
@@ -28,12 +28,12 @@ object PageRank extends ScoobiApp {
 
   /* Get graph data into correct form. */
-  def initialise[K : Manifest : WireFormat](input: DList[(K, List[K])]) =
+  private def initialise[K : Manifest : WireFormat](input: DList[(K, List[K])]) =
     input map { case (url, links) => (url, (1f, 0f, links)) }
 
   /* Perform a single iteration of page-rank.
*/ - def update[K : Manifest : WireFormat : Grouping](prev: DList[(K, (Float, Float, List[K]))], d: Float) = { + private def update[K : Manifest : WireFormat : Grouping](prev: DList[(K, (Float, Float, List[K]))], d: Float) = { val outbound = prev flatMap { case (url, (pr, _, links)) => links.map((_, pr / links.size)) } @@ -46,32 +46,33 @@ object PageRank extends ScoobiApp { } } - def latestRankings(i: Int): DList[(Int, (Float, Float, List[Int]))] = fromAvroFile(output + i) + def run() { + val names = args(0) + val graph = args(1) + val output = args(2) + "/pr/" - /* Perform a single iteration of PageRank. */ - def iterateOnce(i : Int): Float = { - val curr = if (i == 0) initialise(loadGraph(graph)) else latestRankings(i) - val next = update(curr, 0.5f) - val maxDelta = next.map { case (_, (n, o, _)) => math.abs(n - o) } .max.materialize - persist(toAvroFile(next, output + (i + 1)), maxDelta.use) - val d = maxDelta.get.head - println("Current delta = " + d) - d - } + def latestRankings(i: Int): DList[(Int, (Float, Float, List[Int]))] = fromAvroFile(output + i) - /* Iterate until convergence. */ - val names = args(0) - val graph = args(1) - val output = args(2) + "/pr/" - var i = 0 - var delta = 10.0f - while (delta > 1.0f) { delta = iterateOnce(i); i += 1 } + /* Perform a single iteration of PageRank. */ + def iterateOnce(i : Int): Float = { + val curr = if (i == 0) initialise(loadGraph(graph)) else latestRankings(i) + val next = update(curr, 0.5f) + val maxDelta = next.map { case (_, (n, o, _)) => math.abs(n - o) } .max + val (_, md) = persist(toAvroFile(next, output + (i + 1)), maxDelta) + println("Current delta = " + md) + md + } + /* Iterate until convergence. */ + var i = 0 + var delta = 10.0f + while (delta > 1.0f) { delta = iterateOnce(i); i += 1 } - /* Write out final results to text file */ - val pageranks = latestRankings(i).map { case (id, (pr, _, _)) => (id, pr) } - val urls = fromDelimitedTextFile(names) { case AnInt(id) :: url :: _ => (id, url) } - persist(toDelimitedTextFile(join(urls, pageranks).values, output + "result")) -} + /* Write out final results to text file */ + val pageranks = latestRankings(i).map { case (id, (pr, _, _)) => (id, pr) } + val urls = fromDelimitedTextFile(names) { case AnInt(id) :: url :: _ => (id, url) } + persist(toDelimitedTextFile(join(urls, pageranks).values, output + "result")) + } +} diff --git a/examples/secondarySort/build.sbt b/examples/secondarySort/build.sbt deleted file mode 100644 index 9912edfd9..000000000 --- a/examples/secondarySort/build.sbt +++ /dev/null @@ -1,7 +0,0 @@ -name := "Scoobi Secondary Sort" - -version := "0.1" - -scalaVersion := "2.9.2" - -libraryDependencies += "com.nicta" %% "scoobi" % "0.4.0-SNAPSHOT" % "provided" diff --git a/examples/secondarySort/project/project/build.scala b/examples/secondarySort/project/project/build.scala deleted file mode 100644 index aaf35c294..000000000 --- a/examples/secondarySort/project/project/build.scala +++ /dev/null @@ -1,8 +0,0 @@ -import sbt._ - -object Plugins extends Build { - lazy val root = Project("root", file(".")) dependsOn( - uri("git://github.com/NICTA/sbt-scoobi.git#master") - ) -} - diff --git a/examples/secondarySort/src/main/scala/sort.scala b/examples/secondarySort/src/main/scala/sort.scala deleted file mode 100644 index cbc1a7385..000000000 --- a/examples/secondarySort/src/main/scala/sort.scala +++ /dev/null @@ -1,127 +0,0 @@ -/** - * Copyright 2011 National ICT Australia Limited - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not 
use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.nicta.scoobi.examples
-
-import com.nicta.scoobi.Scoobi._
-
-object SecondarySort extends ScoobiApp {
-  // Problem: let's start with some first and last names
-
-  type FirstName = String
-  type LastName = String
-
-  val names: DList[(FirstName, LastName)] = DList.apply(
-    ("Michael", "Jackson"),
-    ("Leonardo", "Da Vinci"),
-    ("John", "Kennedy"),
-    ("Mark", "Twain"),
-    ("Bat", "Man"),
-    ("Michael", "Jordan"),
-    ("Mark", "Edison"),
-    ("Michael", "Landon"),
-    ("Leonardo", "De Capro"),
-    ("Michael", "J. Fox")
-  )
-
-
-  // If I want to group everyone with the same first name, I could simply do:
-  // names.groupByKey
-
-
-  // The problem is, however, that there's no ordering associated with the Iterable[LastName].
-  // This means, if order is required (e.g. we're processing a time series) or we're outputting
-  // the last names in alphabetical order, we'd have to use a parallelDo to load the entire
-  // reducer's collection into memory, then sort it there. This is both slow and likely to
-  // use too much memory.
-
-  // look at tmp-out/names to see the un-orderedness of it all
-
-
-
-  // So our solution is to do a "Secondary Sort", which is exposed with Scoobi's Grouping[K].
-  // In hadoop (and thus scoobi) a secondary sort can only happen on the Key, so what we need
-  // to do is make our Key contain enough information to sort the last names for any given
-  // first name.
-
-  val bigKey: DList[((FirstName, LastName), LastName)] = names.map(a => ((a._1, a._2), a._2))
-
-  // So if we started with ("Jonny", "Cash"), the new key is (("Jonny", "Cash"), "Cash")
-
-  // So we have duplicated enough into the key to do our sort, while making sure the value is
-  // still useful. (Your first instinct might be to make the DList of type
-  // DList[((FirstName, LastName), Unit)], but this will *not* work: while you'd get your
-  // values in order, you wouldn't know what each actual value is!)
-
-  // So here is where the magic happens:
-  // create a new grouping method for anything with the key (FirstName, LastName) // Warning: this is just a Tuple2[String, String]
-  implicit val grouping = new Grouping[(FirstName, LastName)] {
-
-    override def partition(key: (FirstName, LastName), howManyReducers: Int): Int = {
-      // This function says what reducer this particular 'key' should go to. We must override the
-      // default impl, because it looks at the entire key, and makes sure all the same
-      // keys go to the same reducer. But we want to only 'look' at the 'FirstName' part,
-      // so that everything with the same FirstName goes to the same reducer (even if it has a different LastName)
-
-      // So we'll just use the default (string) partition, and only on the first name
-      implicitly[Grouping[FirstName]].partition(key._1, howManyReducers)
-    }
-
-
-    override def sortCompare(a: (FirstName, LastName), b: (FirstName, LastName)): Int = {
-      // Ok, here's where the fun is! Everything that is sent to the same reducer now needs
-      // an ordering. So this function is called. Here we return -1 if 'a' should be before 'b',
-      // 0 if they're the same, and 1 if 'a' should be after 'b'.
-
-      // So the first thing we want to do is look at the first names
-
-      val firstNameOrdering = a._1.compareTo(b._1)
-
-      firstNameOrdering match {
-        case 0 => {
-          // Interesting! Here the firstNames are the same. So what we want to do is order by
-          // the lastNames
-
-          a._2.compareTo(b._2)
-        }
-        case x => x // otherwise, just return the result for which of the FirstNames comes first
-      }
-    }
-
-    override def groupCompare(a: (FirstName, LastName), b: (FirstName, LastName)): Int = {
-      // So now everything going to the reducer has a proper ordering (thanks to our 'sortCompare' function).
-      // Now hadoop allows us to "collapse" everything that is logically the same. So two keys are logically
-      // the same if the FirstNames are equal
-      a._1.compareTo(b._1)
-    }
-
-  }
-
-  val data: DList[((FirstName, LastName), Iterable[LastName])] = bigKey.groupByKey // scala's implicit magic picks up
-  // the 'Grouping[(FirstName, LastName)]'
-
-  // and our data dlist's Iterable[LastName] has ordering! Normally at this point, you'd go through
-  // it with a function that relies on its order.
-
-  // the key thing to keep in mind is that the LastName in the *Key* is almost totally useless,
-  // because of the way the grouping has "collapsed" things. So even though the type of 'data' is
-  // a 'DList[((FirstName, LastName), Iterable[LastName])]' you should treat it as if its type was:
-  // DList[((FirstName, Unit), Iterable[LastName])]
-
-
-  // writing this to disk, so you can make sure it meets your expectations!
-
-  persist(toTextFile(data, "tmp-out/secondary-sort", true)) // write to
-
-}
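The partition/sortCompare/groupCompare contract in the deleted file is easier to see on a local collection. Here is a plain-Scala sketch of the same "big key" trick over a few of the names above; purely illustrative, with no Scoobi or Hadoop involved:

```scala
object SecondarySortSketch extends App {
  val names = List(("Michael", "Jordan"), ("Mark", "Twain"),
                   ("Michael", "Jackson"), ("Mark", "Edison"))

  // the "big key" trick: copy the last name into the key so it can take part in the sort
  val bigKey = names.map { case (f, l) => ((f, l), l) }

  // sortCompare: order by first name, breaking ties on last name
  val sorted = bigKey.sortBy { case ((f, l), _) => (f, l) }

  // groupCompare: collapse adjacent entries that agree on the first name only
  val grouped = sorted.foldLeft(List.empty[(String, List[String])]) {
    case ((first, lasts) :: rest, ((f, _), l)) if first == f => (first, l :: lasts) :: rest
    case (acc, ((f, _), l)) => (f, List(l)) :: acc
  }.map { case (f, lasts) => (f, lasts.reverse) }.reverse

  // prints: Mark -> Edison, Twain / Michael -> Jackson, Jordan
  grouped foreach { case (f, lasts) => println(f + " -> " + lasts.mkString(", ")) }
}
```

On the cluster these three steps straddle the shuffle (partitioning happens map-side, sorting and grouping reduce-side), which is why they live on Grouping rather than on the data itself.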
diff --git a/examples/shortestPath/build.sbt b/examples/shortestPath/build.sbt
deleted file mode 100644
index 2adc73575..000000000
--- a/examples/shortestPath/build.sbt
+++ /dev/null
@@ -1,9 +0,0 @@
-name := "Shortest Path"
-
-version := "0.1"
-
-scalaVersion := "2.9.2"
-
-libraryDependencies += "com.nicta" %% "scoobi" % "0.4.0-SNAPSHOT" % "provided"
-
-scalacOptions += "-deprecation"
diff --git a/examples/shortestPath/project/project/build.scala b/examples/shortestPath/project/project/build.scala
deleted file mode 100644
index aaf35c294..000000000
--- a/examples/shortestPath/project/project/build.scala
+++ /dev/null
@@ -1,8 +0,0 @@
-import sbt._
-
-object Plugins extends Build {
-  lazy val root = Project("root", file(".")) dependsOn(
-    uri("git://github.com/NICTA/sbt-scoobi.git#master")
-  )
-}
-
diff --git a/examples/shortestPath/src/main/scala/Graph.scala b/examples/shortestPath/src/main/scala/Graph.scala
deleted file mode 100644
index 3faf31f32..000000000
--- a/examples/shortestPath/src/main/scala/Graph.scala
+++ /dev/null
@@ -1,215 +0,0 @@
-/**
- * Copyright 2011 National ICT Australia Limited
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.nicta.scoobi.examples
-
-import com.nicta.scoobi.Scoobi._
-import java.io._
-
-object ShortestPath extends ScoobiApp
-{
-  implicit val NodeOrderImp = NodeComparator
-
-  // implicits for efficient serialization
-
-  implicit val unprocessedFormat = mkCaseWireFormat(Unprocessed, Unprocessed.unapply _) //mkObjectWireFormat(Unprocessed)
-  implicit val frontierFormat = mkCaseWireFormat(Frontier, Frontier.unapply _)
-  implicit val doneFormat = mkCaseWireFormat(Done, Done.unapply _)
-
-  implicit val progressFormat = mkAbstractWireFormat[Progress, Unprocessed, Frontier, Done]
-
-  implicit val nodeFormat = mkCaseWireFormat(Node, Node.unapply _)
-  implicit val nodeInfoFormat = mkCaseWireFormat(NodeInfo, NodeInfo.unapply _)
-
-
-  // To do a parallel distributed shortest-path search we want to represent the
-  // graph with a list of:
-  // Node as the Key, NodeInfo as the value
-
-  // A Node is pretty simple, we're just using a String
-
-  case class Node(data: String) // and later we will define an implicit
-                                // NodeComparator, to provide ordering and comparison
-
-  // NodeInfo contains a list of all the nodes that connect to this node, as well
-  // as the current state.
-  // The possible states are:
-  //   Unprocessed. This means we haven't yet got to the node, so we don't know
-  //                the shortest way.
-  //   Frontier. This means that the next iteration of the breadth first search
-  //             should work from here. At this point, we know the shortest path
-  //   Done. We have fully discovered this node, and know the shortest path to it
-
-  // This can easily be represented in Scala
-
-  sealed abstract class Progress()
-
-  case class Unprocessed() extends Progress
-  case class Frontier(best: Int) extends Progress
-  case class Done(best: Int) extends Progress
-
-  case class NodeInfo (edges: Iterable[Node], state: Progress)
-
-  if (!new File("output-dir").mkdir) {
-    sys.error("Could not make output-dir for results. Perhaps it already exists (and you should delete/rename the old one)")
-  }
-
-  generateGraph("output-dir/graph.txt")
-
-  // The generated graph is in the form of a list of edges e.g. (A, B), (C, D)
-
-  val edges: DList[(Node, Node)] = fromDelimitedTextFile("output-dir/graph.txt", " ") {
-    case a :: b :: _ => (Node(a), Node(b))
-  }
-
-  // This is quite easy to convert to the format we wanted. Given A connects
-  // to B, we know B connects to A, so let's go through and add the reverse:
-
-  val adjacencies: DList[(Node, Node)] = edges.flatMap {
-    case (first, second) => List((first, second), (second, first))
-  }
-
-  // And now we can group by the first node, and have a list of all its edges
-  val grouped: DList[(Node, Iterable[Node])] = adjacencies.groupByKey
-
-  // Since we are calculating the shortest path, we also need a starting point:
-  val startingPoint = Node("A")
-
-  // And now finally, we want it in the form "NodeInfo", not just a list of edges:
-  val formatted: DList[(Node, NodeInfo)] = grouped.map {
-    case (node, edges) if (node == startingPoint) =>
-      // it takes 0 steps to get to our starting point
-      (node, NodeInfo(edges, Frontier(0)))
-    case (node, edges) =>
-      // We don't yet know how to get to this node
-      (node, NodeInfo(edges, Unprocessed()))
-  }
-
-  // Our data is now in a form ready for doing a shortest path search. We will
-  // use a breadth first search, with 5 iterations
-
-  val iterations = 5
-
-  // We pass our DList into a function which adds 'iterations' of a breadth first search
-  val breadthResult: DList[(Node, NodeInfo)] = breadthFirst(formatted, iterations)
-
-  // And finally, instead of just having the raw information,
-  // we can convert it to a nice pretty string
-
-  def getV(p: Progress): Option[Int] = p match {
-    case Frontier(v) => Some(v)
-    case Done(v) => Some(v)
-    case Unprocessed() => None
-  }
-
-  val prettyResult: DList[String] = breadthResult map {
-    case (n: Node, ni: NodeInfo) => getV(ni.state) match {
-      case None => "Couldn't get to " + n + " in " + iterations + " steps"
-      case Some(v) => "Shortest path from " + startingPoint + " to " + n + " is " + v + " steps"
-    }
-  }
-
-  // and finally write to an output file
-
-  DList.persist (
-    toTextFile(prettyResult, "output-dir/result")
-  )
-
-
-// Here is where all the fun happens.
-private def breadthFirst(dlist: DList[(Node, NodeInfo)], depth: Int): DList[(Node, NodeInfo)] = {
-  require (depth >= 1)
-
-  // What we want to do is look at the NodeInfo: if the state is "Frontier",
-  // we know that we are up to this stage of the breadth first search, and we
-  // also know the shortest path to it.
-
-  // If it took n steps to get to this node, we know we can get to all its
-  // connecting nodes in n+1 steps
-
-  // However, all we really know is that these connecting nodes might be the
-  // next "Frontier" (but maybe not, perhaps we already know the best path to it)
-  // and we know a possible cost of getting there (n+1).
-
-  // What we do is spit out all the information we do know -- and use the
-  // combine function to pick the best. We also don't know each connecting node's
-  // edges, so we'll leave them as an empty list -- and again, let the combine
-  // function clean up after us.
-
-  val firstMap: DList[(Node, NodeInfo)] = dlist.flatMap {
-    case (n: Node, ni: NodeInfo) => ni.state match {
-      case Frontier(distance) =>
-        // If it was a frontier, it will now be in the Done stage for the next iteration
-        List((n, NodeInfo(ni.edges, Done(distance)))) ++
-        // along with all the information we know about its connecting nodes
-        ni.edges.map { edge => (edge, NodeInfo(List[Node](), Frontier(distance+1))) }
-
-      // This isn't a frontier, so return it intact
-      case o => List((n, NodeInfo(ni.edges, o)))
-    }
-  }
-
-  // The previous flatMap made a big mess! Now we're going to have to clean it up.
-  // We group by the Node again, to get a list of all the emitted NodeInfos
-
-  val firstGrouped: DList[(Node, Iterable[NodeInfo])] = firstMap.groupByKey
-
-  // And here's the clean-up stage. Some of the NodeInfos aren't going to have
-  // the edge-list, so we insert that back in. Along with always picking the best
-  // progress (the minimum steps, and the furthest-along state)
-
-  // A helper function, to tell us which of two Progress values is furthest along
-  def furthest(p1: Progress, p2: Progress) = p1 match {
-    case Unprocessed() => p2
-    case Frontier(v1) => p2 match {
-      case Unprocessed() => Frontier(v1)
-      case Frontier(v2) => Frontier(math.min(v1, v2))
-      case Done(v2) => Done(v2)
-    }
-    case Done(v1) => Done(v1)
-  }
-
-  val firstCombiner: DList[(Node, NodeInfo)] = firstGrouped.combine {
-    (n1: NodeInfo, n2: NodeInfo) =>
-      NodeInfo (
-        (if (n1.edges.isEmpty)
-          n2.edges
-        else
-          n1.edges),
-        furthest(n1.state, n2.state)
-      )
-  }
-
-  // And this iteration is done.
- - if (depth > 1) - breadthFirst(firstCombiner, depth-1) - else - firstCombiner - } - - object NodeComparator extends Ordering[Node] { - def compare(a: Node, b: Node) = a.data compareTo b.data - } - - private def generateGraph(filename: String) { - // TODO: generate something more interesting.. - val fstream = new FileWriter(filename) - fstream write ( "A B\nA C\nC D\nC E\nD E\nF G\nE F\nG E" ) - fstream.close() - } - -} - diff --git a/examples/wordCount/build.sbt b/examples/wordCount/build.sbt index 8d9bb37da..82d8b4a4d 100644 --- a/examples/wordCount/build.sbt +++ b/examples/wordCount/build.sbt @@ -1,7 +1,11 @@ name := "Scoobi Word Count" -version := "0.1" +version := "1.0" scalaVersion := "2.9.2" +scalacOptions ++= Seq("-Ydependent-method-types", "-deprecation") + libraryDependencies += "com.nicta" %% "scoobi" % "0.4.0-SNAPSHOT" % "provided" + +resolvers += "snapshots" at "http://oss.sonatype.org/content/repositories/snapshots" diff --git a/examples/wordCount/src/main/scala/WordCount.scala b/examples/wordCount/src/main/scala/WordCount.scala index 271460bdf..8b29c8aec 100644 --- a/examples/wordCount/src/main/scala/WordCount.scala +++ b/examples/wordCount/src/main/scala/WordCount.scala @@ -19,41 +19,43 @@ import com.nicta.scoobi.Scoobi._ import java.io._ object WordCount extends ScoobiApp { - val (inputPath, outputPath) = - if (args.length == 0) { - if (!new File("output-dir").mkdir) { - sys.error("Could not make output-dir for results. Perhaps it already exists (and you should delete/rename the old one)") - } + def run() { + val (inputPath, outputPath) = + if (args.length == 0) { + if (!new File("output-dir").mkdir) { + sys.error("Could not make output-dir for results. Perhaps it already exists (and you should delete/rename the old one)") + } - val fileName = "output-dir/all-words.txt" + val fileName = "output-dir/all-words.txt" - // generate 5000 random words (with high collisions) and save at fileName - generateWords(fileName, 5000) + // generate 5000 random words (with high collisions) and save at fileName + generateWords(fileName, 5000) - (fileName, "output-dir") + (fileName, "output-dir") - } else if (args.length == 2) { - (args(0), args(1)) - } else { - sys.error("Expecting input and output path, or no arguments at all.") - } + } else if (args.length == 2) { + (args(0), args(1)) + } else { + sys.error("Expecting input and output path, or no arguments at all.") + } - // Firstly we load up all the (new-line-separated) words into a DList - val lines: DList[String] = fromTextFile(inputPath) + // Firstly we load up all the (new-line-separated) words into a DList + val lines: DList[String] = fromTextFile(inputPath) - // What we want to do, is record the frequency of words. So we'll convert it to a key-value - // pairs where the key is the word, and the value the frequency (which to start with is 1) - val keyValuePair: DList[(String, Int)] = lines flatMap { _.split(" ") } map { w => (w, 1) } + // What we want to do, is record the frequency of words. 
So we'll convert it to key-value
+    // pairs where the key is the word, and the value is the frequency (which to start with is 1)
+    val keyValuePair: DList[(String, Int)] = lines flatMap { _.split(" ") } map { w => (w, 1) }
 
-  // Now let's group all words that compare the same
-  val grouped: DList[(String, Iterable[Int])] = keyValuePair.groupByKey
-  // Now we have it in the form (Word, ['1', '1', '1', 1' etc.])
+    // Now let's group all words that compare the same
+    val grouped: DList[(String, Iterable[Int])] = keyValuePair.groupByKey
+    // Now we have it in the form (Word, ['1', '1', '1', '1' etc.])
 
-  // So what we want to do, is combine all the numbers into a single value (the frequency)
-  val combined: DList[(String, Int)] = grouped.combine((_+_))
+    // So what we want to do is combine all the numbers into a single value (the frequency)
+    val combined: DList[(String, Int)] = grouped.combine((_+_))
 
-  // We can evaluate this, and write it to a text file
-  DList.persist(toTextFile(combined, outputPath + "/word-results"));
+    // We can evaluate this, and write it to a text file
+    persist(toTextFile(combined, outputPath + "/word-results"))
+  }
 
 /* Write 'count' random words to the file 'filename', with a high amount of collisions */
 private def generateWords(filename: String, count: Int) {