Merge branch 'develop' of https://github.com/twitter/scalding into hll-aggregate
commit 30e4eb27c742ed77c34062402af2075693cf9df9 (parents: 350a166, 78edf58)
@asiegel34 authored
8 build.sbt
@@ -26,9 +26,13 @@ libraryDependencies += "cascading" % "cascading-local" % "2.0.2"
libraryDependencies += "cascading" % "cascading-hadoop" % "2.0.2"
-libraryDependencies += "cascading.kryo" % "cascading.kryo" % "0.4.5"
+libraryDependencies += "cascading.kryo" % "cascading.kryo" % "0.4.6"
-libraryDependencies += "com.twitter" % "maple" % "0.2.4"
+libraryDependencies += "com.twitter" % "maple" % "0.2.5"
+
+libraryDependencies += "com.twitter" % "chill_2.9.2" % "0.1.0"
+
+libraryDependencies += "com.twitter" % "algebird_2.9.2" % "0.1.6"
libraryDependencies += "commons-lang" % "commons-lang" % "2.4"
5 project/Build.scala
@@ -2,6 +2,7 @@ import sbt._
object ScaldingBuild extends Build {
lazy val root = Project("root", file("."))
- .dependsOn(RootProject(uri("git://github.com/twitter/algebird.git#master")))
- .dependsOn(RootProject(uri("git://github.com/twitter/chill.git#master")))
+// This caused us some pain:
+// .dependsOn(RootProject(uri("git://github.com/twitter/algebird.git#master")))
+// .dependsOn(RootProject(uri("git://github.com/twitter/chill.git#master")))
}
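
Taken together, the two build changes swap algebird and chill from source dependencies (RootProject over a git URI, tracking master) to fixed published artifacts. A minimal sketch of the resulting build.sbt stanza, using only the coordinates visible in this diff:

// Published binary dependencies replace the source dependsOn lines above.
libraryDependencies ++= Seq(
  "com.twitter" % "chill_2.9.2"    % "0.1.0",
  "com.twitter" % "algebird_2.9.2" % "0.1.6"
)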
26 src/main/scala/com/twitter/scalding/GroupBuilder.scala
@@ -323,6 +323,32 @@ class GroupBuilder(val groupFields : Fields) extends
* groupAll and groupRandomly.
*/
def pass : GroupBuilder = takeWhile(0) { (t: TupleEntry) => true }
+
+ /**
+ * Beginning of a block with access to expensive non-serializable state. The state object should
+ * provide a release() method for resource-management purposes.
+ */
+ def using[C <: { def release() }](bf: => C) = new {
+
+ /**
+ * mapStream with state.
+ */
+ def mapStream[T,X](fieldDef : (Fields,Fields))(mapfn : (C, Iterator[T]) => TraversableOnce[X])
+ (implicit conv : TupleConverter[T], setter : TupleSetter[X]) = {
+ val (inFields, outFields) = fieldDef
+ //Check arity
+ conv.assertArityMatches(inFields)
+ setter.assertArityMatches(outFields)
+
+ val b = new SideEffectBufferOp[Unit,T,C,X](
+ (), bf,
+ (u : Unit, c : C, it: Iterator[T]) => mapfn(c, it),
+ new Function1[C, Unit] with java.io.Serializable { def apply(c: C) { c.release() }},
+ outFields, conv, setter)
+ every(pipe => new Every(pipe, inFields, b, defaultMode(inFields, outFields)))
+ }
+ }
+
}
/**
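
A hedged sketch of how a job might use the new using/mapStream pair inside a groupBy block; the Lookup class, its resolve method, and the field names are hypothetical, chosen only to illustrate the shape of the call:

class LookupJob(args: Args) extends Job(args) {
  // Hypothetical non-serializable resource; release() is required by the structural bound on `using`.
  class Lookup {
    def resolve(id: String): String = id.toUpperCase
    def release() {}
  }

  Tsv("input", ('country, 'id)).pipe
    .groupBy('country) {
      _.using { new Lookup }
       .mapStream[String, String]('id -> 'name) { (lookup, ids: Iterator[String]) =>
         // the Lookup is created in prepare() and released in cleanup(), once per task
         ids.map { id => lookup.resolve(id) }
       }
    }
    .write(Tsv("output"))
}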
5 src/main/scala/com/twitter/scalding/JoinAlgorithms.scala
@@ -28,6 +28,7 @@ import cascading.tuple._
import cascading.cascade._
import scala.util.Random
+import scala.collection.JavaConverters._
/*
* Keeps all the logic related to RichPipe joins.
@@ -348,7 +349,7 @@ trait JoinAlgorithms {
assert(sampleRate > 0 && sampleRate < 1, "Sampling rate for skew joins must lie strictly between 0 and 1")
// This assertion could be avoided, but since this function calls outer joins and left joins,
// we assume it to avoid renaming pain.
- assert(fs._1.iterator.toList.intersect(fs._2.iterator.toList).isEmpty, "Join keys in a skew join must be disjoint")
+ assert(fs._1.iterator.asScala.toList.intersect(fs._2.iterator.asScala.toList).isEmpty, "Join keys in a skew join must be disjoint")
// 1. First, get an approximate count of the left join keys and the right join keys, so that we
// know how much to replicate.
@@ -401,7 +402,7 @@ trait JoinAlgorithms {
numReducers : Int = -1, isPipeOnRight : Boolean = false) = {
// Rename the fields to prepare for the leftJoin below.
- val renamedFields = joinFields.iterator.toList.map { field => "__RENAMED_" + field + "__" }
+ val renamedFields = joinFields.iterator.asScala.toList.map { field => "__RENAMED_" + field + "__" }
val renamedSampledCounts = sampledCounts.rename(joinFields -> renamedFields)
.project(Fields.join(renamedFields, countFields))
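
The asScala calls above use the standard JavaConverters decorators, replacing the implicit conversions that are demoted to plain methods in TupleConversions further down. A minimal, generic illustration of the pattern (the collections here are placeholders):

import scala.collection.JavaConverters._

val javaList: java.util.List[String] = java.util.Arrays.asList("x", "y")
// The explicit .asScala decorator makes the Java-to-Scala conversion visible at the call site.
val scalaList: List[String] = javaList.asScala.toList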
60 src/main/scala/com/twitter/scalding/Operations.scala
@@ -73,6 +73,23 @@ import CascadingUtils.kryoFor
}
/*
+ * BaseOperation with support for a side-effect context object
+ */
+ abstract class SideEffectBaseOperation[C] (
+ bf: => C, // begin function returns a context
+ ef: C => Unit, // end function to clean up context object
+ fields: Fields
+ ) extends BaseOperation[C](fields) {
+ override def prepare(flowProcess: FlowProcess[_], operationCall: OperationCall[C]) {
+ operationCall.setContext(bf)
+ }
+
+ override def cleanup(flowProcess: FlowProcess[_], operationCall: OperationCall[C]) {
+ ef(operationCall.getContext)
+ }
+ }
+
+ /*
* A map function that allows a state object to be set up and torn down.
*/
class SideEffectMapFunction[S, C, T] (
@@ -82,11 +99,7 @@ import CascadingUtils.kryoFor
fields: Fields,
conv: TupleConverter[S],
set: TupleSetter[T]
- ) extends BaseOperation[C](fields) with Function[C] {
-
- override def prepare(flowProcess: FlowProcess[_], operationCall: OperationCall[C]) {
- operationCall.setContext(bf)
- }
+ ) extends SideEffectBaseOperation[C](bf, ef, fields) with Function[C] {
override def operate(flowProcess: FlowProcess[_], functionCall: FunctionCall[C]) {
val context = functionCall.getContext
@@ -94,10 +107,6 @@ import CascadingUtils.kryoFor
val res = fn(context, s)
functionCall.getOutputCollector.add(set(res))
}
-
- override def cleanup(flowProcess: FlowProcess[_], operationCall: OperationCall[C]) {
- ef(operationCall.getContext)
- }
}
/*
@@ -110,21 +119,13 @@ import CascadingUtils.kryoFor
fields: Fields,
conv: TupleConverter[S],
set: TupleSetter[T]
- ) extends BaseOperation[C](fields) with Function[C] {
-
- override def prepare(flowProcess: FlowProcess[_], operationCall: OperationCall[C]) {
- operationCall.setContext(bf)
- }
+ ) extends SideEffectBaseOperation[C](bf, ef, fields) with Function[C] {
override def operate(flowProcess: FlowProcess[_], functionCall: FunctionCall[C]) {
val context = functionCall.getContext
val s = conv(functionCall.getArguments)
fn(context, s) foreach { t => functionCall.getOutputCollector.add(set(t)) }
}
-
- override def cleanup(flowProcess: FlowProcess[_], operationCall: OperationCall[C]) {
- ef(operationCall.getContext)
- }
}
class FilterFunction[T](fn : T => Boolean, conv : TupleConverter[T]) extends BaseOperation[Any] with Filter[Any] {
@@ -299,4 +300,27 @@ import CascadingUtils.kryoFor
iterfn(deepCopyInit, in).foreach { x => oc.add(set(x)) }
}
}
+
+ /*
+ * A buffer that allows a state object to be set up and torn down.
+ */
+ class SideEffectBufferOp[I,T,C,X](
+ init : I,
+ bf: => C, // begin function returns a context
+ iterfn: (I, C, Iterator[T]) => TraversableOnce[X],
+ ef: C => Unit, // end function to clean up context object
+ fields: Fields,
+ conv: TupleConverter[T],
+ set: TupleSetter[X]
+ ) extends SideEffectBaseOperation[C](bf, ef, fields) with Buffer[C] {
+
+ def operate(flowProcess : FlowProcess[_], call : BufferCall[C]) {
+ val deepCopyInit = kryoFor(flowProcess).copy(init)
+ val context = call.getContext
+ val oc = call.getOutputCollector
+ val in = call.getArgumentsIterator.asScala.map { entry => conv(entry) }
+ iterfn(deepCopyInit, context, in).foreach { x => oc.add(set(x)) }
+ }
+ }
+
}
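
The refactor above pulls the shared prepare/cleanup bookkeeping into SideEffectBaseOperation, so the map, flat-map, and buffer variants only implement operate. A framework-free sketch of the lifecycle being factored out (class and method names here are illustrative, not Cascading's API):

// Build the context once, reuse it for every element, always release it at the end.
abstract class SideEffectLifecycle[C](bf: => C, ef: C => Unit) {
  def operate(context: C, element: String): Unit   // supplied by the concrete operation

  def runOver(elements: Seq[String]) {
    val context = bf                                // prepare(): create the expensive state
    try {
      elements.foreach { e => operate(context, e) } // operate(): called per element with the shared context
    } finally {
      ef(context)                                   // cleanup(): release the state even on failure
    }
  }
}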
35 src/main/scala/com/twitter/scalding/RichPipe.scala
@@ -71,12 +71,12 @@ class RichPipe(val pipe : Pipe) extends java.io.Serializable with JoinAlgorithms
* Beginning of a block with access to expensive non-serializable state. The state object should
* provide a release() method for resource-management purposes.
*/
- def using[A, C <: { def release() }](bf: => C) = new {
+ def using[C <: { def release() }](bf: => C) = new {
/**
* For pure side effect.
*/
- def foreach(f: Fields)(fn: (C, A) => Unit)
+ def foreach[A](f: Fields)(fn: (C, A) => Unit)
(implicit conv: TupleConverter[A], set: TupleSetter[Unit], flowDef: FlowDef, mode: Mode) = {
conv.assertArityMatches(f)
val newPipe = new Each(pipe, f, new SideEffectMapFunction(bf, fn,
@@ -210,7 +210,7 @@ class RichPipe(val pipe : Pipe) extends java.io.Serializable with JoinAlgorithms
* insert('a, 1)
* }}}
*/
- def insert[A](fs : Fields, value : A)(implicit conv : TupleSetter[A]) : Pipe =
+ def insert[A](fs : Fields, value : A)(implicit conv : TupleSetter[A]) : Pipe =
map(() -> fs) { _:Unit => value }
@@ -244,6 +244,35 @@ class RichPipe(val pipe : Pipe) extends java.io.Serializable with JoinAlgorithms
}
/**
+ * Given a function, partitions the pipe into several groups based on the
+ * output of the function. Then applies a GroupBuilder function on each of the
+ * groups.
+ *
+ * Example:
+ pipe
+ .mapTo(() -> ('age, 'weight)) { ... }
+ .partition('age -> 'isAdult) { (_: Int) > 18 } { _.average('weight) }
+ * pipe now contains the average weights of adults and minors.
+ */
+ def partition[A,R](fs: (Fields, Fields))(fn: (A) => R)(
+ builder: GroupBuilder => GroupBuilder)(
+ implicit conv: TupleConverter[A],
+ ord: Ordering[R],
+ rset: TupleSetter[R]): Pipe = {
+ val (fromFields, toFields) = fs
+ conv.assertArityMatches(fromFields)
+ rset.assertArityMatches(toFields)
+
+ val tmpFields = new Fields("__temp__")
+ tmpFields.setComparator("__temp__", ord)
+
+ map(fromFields -> tmpFields)(fn)(conv, SingleSetter)
+ .groupBy(tmpFields)(builder)
+ .map[R,R](tmpFields -> toFields){ (r:R) => r }(singleConverter[R], rset)
+ .discard(tmpFields)
+ }
+
+ /**
* If you use a map function that does not accept TupleEntry args,
* which is the common case, an implicit conversion in GeneratedConversions
* will convert your function into a `(TupleEntry => T)`. The result type
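
A self-contained sketch of the new partition call described in the doc comment above, written as a complete job; the source, sink, and field names are hypothetical:

class AdultWeightJob(args: Args) extends Job(args) {
  Tsv("people", ('age, 'weight)).pipe
    // Split rows into two groups keyed by the Boolean result, then average 'weight within each group.
    .partition('age -> 'isAdult) { (age: Int) => age > 18 } { _.average('weight) }
    .project('isAdult, 'weight)
    .write(Tsv("average-weights"))
}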
8 src/main/scala/com/twitter/scalding/Source.scala
@@ -89,6 +89,8 @@ abstract class Source extends java.io.Serializable {
}
def read(implicit flowDef : FlowDef, mode : Mode) = {
+ checkFlowDefNotNull
+
//insane workaround for scala compiler bug
val sources = flowDef.getSources().asInstanceOf[JMap[String,Any]]
val srcName = this.toString
@@ -103,6 +105,8 @@ abstract class Source extends java.io.Serializable {
* the next operation
*/
def writeFrom(pipe : Pipe)(implicit flowDef : FlowDef, mode : Mode) = {
+ checkFlowDefNotNull
+
//insane workaround for scala compiler bug
val sinks = flowDef.getSinks().asInstanceOf[JMap[String,Any]]
val sinkName = this.toString
@@ -113,6 +117,10 @@ abstract class Source extends java.io.Serializable {
pipe
}
+ protected def checkFlowDefNotNull(implicit flowDef : FlowDef, mode : Mode) {
+ assert(flowDef != null, "Trying to access null FlowDef while in mode: %s".format(mode))
+ }
+
protected def transformForWrite(pipe : Pipe) = pipe
protected def transformForRead(pipe : Pipe) = pipe
4 src/main/scala/com/twitter/scalding/TupleConversions.scala
@@ -61,7 +61,7 @@ trait TupleConversions extends GeneratedConversions {
}
- implicit def iterableToIterable [A] (iterable : java.lang.Iterable[A]) : Iterable[A] = {
+ def iterableToIterable [A] (iterable : java.lang.Iterable[A]) : Iterable[A] = {
if(iterable == null) {
None
} else {
@@ -69,7 +69,7 @@ trait TupleConversions extends GeneratedConversions {
}
}
- implicit def iteratorToIterator [A] (iterator : java.util.Iterator[A]) : Iterator[A] = {
+ def iteratorToIterator [A] (iterator : java.util.Iterator[A]) : Iterator[A] = {
if(iterator == null) {
List().iterator
} else {
135 src/main/scala/com/twitter/scalding/examples/WeightedPageRankFromMatrix.scala
@@ -0,0 +1,135 @@
+package com.twitter.scalding.examples
+
+import scala.collection._
+
+import com.twitter.scalding._
+import com.twitter.scalding.mathematics.{Matrix, ColVector}
+import com.twitter.scalding.mathematics.Matrix._
+
+/**
+ * A weighted PageRank implementation using the Scalding Matrix API. This
+ * assumes that all rows and columns are of type {@link Int} and values or edge
+ * weights are {@link Double}. If you want an unweighted PageRank, simply set
+ * the weights on the edges to 1.
+ *
+ * Input arguments:
+ *
+ * d -- damping factor
+ * n -- number of nodes in the graph
+ * currentIteration -- start with 0 probably
+ * maxIterations -- stop after n iterations
+ * convergenceThreshold -- iteration stops once the sum of the absolute
+ *                         differences between successive solutions falls
+ *                         below this threshold
+ * rootDir -- the root directory holding all starting, intermediate and final
+ * data/output
+ *
+ * The expected structure of the rootDir is:
+ *
+ * rootDir
+ * |- iterations
+ * | |- 0 <-- a TSV of (row, value) of size n, value can be 1/n (generate this)
+ * | |- n <-- holds future iterations/solutions
+ * |- edges <-- a TSV of (row, column, value) for edges in the graph
+ * |- onesVector <-- a TSV of (row, 1) of size n (generate this)
+ * |- diff <-- a single line representing the difference between the last two iterations
+ * |- constants <-- built at iteration 0, these are constant for any given matrix/graph
+ * |- M_hat
+ * |- priorVector
+ *
+ * Don't forget to set the number of reducers for this job:
+ * -D mapred.reduce.tasks=n
+ */
+class WeightedPageRankFromMatrix(args: Args) extends Job(args) {
+
+ val d = args("d").toDouble // aka damping factor
+ val n = args("n").toInt // number of nodes in the graph
+
+ val currentIteration = args("currentIteration").toInt
+ val maxIterations = args("maxIterations").toInt
+ val convergenceThreshold = args("convergenceThreshold").toDouble
+
+ val rootDir = args("rootDir")
+ val edgesLoc = rootDir + "/edges"
+ val onesVectorLoc = rootDir + "/onesVector"
+
+ val iterationsDir = rootDir + "/iterations"
+ val previousVectorLoc = iterationsDir + "/" + currentIteration
+ val nextVectorLoc = iterationsDir + "/" + (currentIteration + 1)
+
+ val diffLoc = rootDir + "/diff"
+
+ // load the previous iteration
+ val previousVector = colVectorFromTsv(previousVectorLoc)
+
+ // iterate, write results
+ // R(t + 1) = d * M * R(t) + ((1 - d) / n) * _1_
+ val nextVector = M_hat * previousVector + priorVector
+ nextVector.write(Tsv(nextVectorLoc))
+
+ measureConvergenceAndStore()
+
+ /**
+ * Recurse and iterate again iff we are under the max number of iterations and
+ * the vector has not converged.
+ */
+ override def next = {
+ val diff = Tsv(diffLoc).readAtSubmitter[Double].head
+
+ if (currentIteration + 1 < maxIterations && diff > convergenceThreshold) {
+ val newArgs = args + ("currentIteration", Some((currentIteration + 1).toString))
+ Some(clone(newArgs))
+ } else {
+ None
+ }
+ }
+
+ /**
+ * Measure convergence by calculating the total of the absolute difference
+ * between the previous and next vectors. This stores the result after
+ * calculation.
+ */
+ def measureConvergenceAndStore() {
+ (previousVector - nextVector).
+ mapWithIndex { case (value, index) => math.abs(value) }.
+ sum.
+ write(Tsv(diffLoc))
+ }
+
+ /**
+ * Load, or generate on the first iteration, the matrix M_hat given the adjacency matrix A.
+ */
+ def M_hat: Matrix[Int, Int, Double] = {
+
+ if (currentIteration == 0) {
+ val A = matrixFromTsv(edgesLoc)
+ val M = A.rowL1Normalize.transpose
+ val M_hat = d * M
+
+ M_hat.write(Tsv(rootDir + "/constants/M_hat"))
+ } else {
+ matrixFromTsv(rootDir + "/constants/M_hat")
+ }
+ }
+
+ /**
+ * Load, or generate on the first iteration, the prior vector given d and n.
+ */
+ def priorVector: ColVector[Int, Double] = {
+
+ if (currentIteration == 0) {
+ val onesVector = colVectorFromTsv(onesVectorLoc)
+ val priorVector = ((1 - d) / n) * onesVector.toMatrix(0)
+
+ priorVector.getCol(0).write(Tsv(rootDir + "/constants/priorVector"))
+ } else {
+ colVectorFromTsv(rootDir + "/constants/priorVector")
+ }
+ }
+
+ def matrixFromTsv(input: String): Matrix[Int, Int, Double] =
+ TypedTsv[(Int, Int, Double)](input).toMatrix
+
+ def colVectorFromTsv(input: String): ColVector[Int, Double] =
+ TypedTsv[(Int, Double)](input).toCol
+}
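
For reference, the update the job computes each iteration (the R(t + 1) line in the comments), written out; here M is the row-L1-normalized, transposed adjacency matrix and 1 is the all-ones vector:

R^{(t+1)} = d \, M \, R^{(t)} + \frac{1 - d}{n} \mathbf{1}, \qquad \hat{M} = d \, M

Since d, M, and n do not change between iterations, M_hat and the prior vector ((1 - d) / n) * 1 are written under constants/ at iteration 0 and simply reloaded on later iterations.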
30 src/test/scala/com/twitter/scalding/CoreTest.scala
@@ -115,6 +115,36 @@ class MapToGroupBySizeSumMaxTest extends Specification with TupleConversions {
}
}
+class PartitionJob(args: Args) extends Job(args) {
+ Tsv("input", new Fields("age", "weight"))
+ .partition('age -> 'isAdult) { (_:Int) > 18 } { _.average('weight) }
+ .project('isAdult, 'weight)
+ .write(Tsv("output"))
+}
+
+class PartitionJobTest extends Specification with TupleConversions {
+ noDetailedDiffs()
+ "A PartitionJob" should {
+ val input = List((3, 23),(23,154),(15,123),(53,143),(7,85),(19,195),
+ (42,187),(35,165),(68,121),(13,103),(17,173),(2,13))
+
+ val (adults, minors) = input.partition { case (age, _) => age > 18 }
+ val Seq(adultWeights, minorWeights) = Seq(adults, minors).map { list =>
+ list.map { case (_, weight) => weight }
+ }
+ val expectedOutput = Map(
+ true -> adultWeights.sum / adultWeights.size.toDouble,
+ false -> minorWeights.sum / minorWeights.size.toDouble
+ )
+ JobTest(new com.twitter.scalding.PartitionJob(_))
+ .source(Tsv("input", new Fields("age", "weight")), input)
+ .sink[(Boolean,Double)](Tsv("output")) { outBuf =>
+ outBuf.toMap must be_==(expectedOutput)
+ }
+ .run.finish
+ }
+}
+
class MRMJob(args : Args) extends Job(args) {
val in = Tsv("input").read.mapTo((0,1) -> ('x,'y)) { xy : (Int,Int) => xy }
// XOR reduction (insane, I guess:
2  src/test/scala/com/twitter/scalding/KryoTest.scala
@@ -10,7 +10,7 @@ import java.io.{ByteArrayInputStream=>BIS}
import scala.collection.immutable.ListMap
import scala.collection.immutable.HashMap
-import com.twitter.algebird.{AveragedValue, DecayedValue, HLLInstance,
+import com.twitter.algebird.{AveragedValue, DecayedValue,
HyperLogLog, HyperLogLogMonoid, Moments, Monoid}
/*
52 src/test/scala/com/twitter/scalding/SideEffectTest.scala
@@ -51,3 +51,55 @@ class SideEffectTest extends Specification with TupleConversions with FieldConve
.finish
}
}
+
+/*
+ * ZipBuffer uses an (unnecessary) side effect to construct zipped.
+ */
+class ZipBuffer(args : Args) extends Job(args) {
+
+ //import RichPipe._
+ def createState = new {
+ var lastLine: String = null
+ def release() {}
+ }
+
+ val zipped = Tsv("line",('line)).pipe
+ .map('line -> 'oddOrEven) { line : String => line.substring(line.length-1).toInt % 2 match {
+ case 0 => "even"
+ case 1 => "odd"
+ }}
+ .groupBy('oddOrEven) {
+ _.using { createState }
+ .mapStream('line -> ('l1, 'l2)) { (accu, iter : Iterator[String]) => {
+ accu.lastLine = iter.next()
+ for (line <- iter) yield {
+ val result = (accu.lastLine, line)
+ accu.lastLine = line
+ result
+ }
+ }}
+ }
+ .project('l1, 'l2)
+
+ zipped.write(Tsv("zipped"))
+}
+
+class SideEffectBufferTest extends Specification with TupleConversions with FieldConversions {
+ "ZipBuffer should do create two zipped sequences, one for even lines and one for odd lines. Coded with side effect" should {
+ JobTest("com.twitter.scalding.ZipBuffer")
+ .source(Tsv("line",('line)), List(Tuple1("line1"), Tuple1("line2"), Tuple1("line3"), Tuple1("line4"), Tuple1("line5"), Tuple1("line6")))
+ .sink[(String, String)](Tsv("zipped")) { ob =>
+ "correctly compute zipped sequence" in {
+ val res = ob.toList.sorted
+ val expected = List(("line1", "line3"), ("line3", "line5"), ("line2", "line4"), ("line4", "line6")).sorted
+ res.zip(expected) foreach {
+ case ((a, b), (c, d)) =>
+ a must be_== ( c )
+ b must be_== ( d )
+ }
+ }
+ }
+ .run
+ .finish
+ }
+}
150 src/test/scala/com/twitter/scalding/examples/WeightedPageRankFromMatrixTest.scala
@@ -0,0 +1,150 @@
+package com.twitter.scalding.examples
+
+import scala.collection._
+
+import org.specs._
+
+import com.twitter.scalding._
+import com.twitter.scalding.Dsl._
+
+import WeightedPageRankFromMatrixSpec._
+
+class WeightedPageRankFromMatrixSpec extends Specification with TupleConversions {
+
+ "Weighted PageRank from Matrix job" should {
+
+ // 0.0 0.0 0.0 0.0 1.0
+ // 0.5 0.0 0.0 0.0 0.0
+ // 0.5 0.0 0.0 0.0 0.0
+ // 0.0 1.0 0.5 0.0 0.0
+ // 0.0 0.0 0.5 1.0 0.0
+ val edges = List(
+ (0, 4, 1.0),
+ (1, 0, 0.5),
+ (2, 0, 0.5),
+ (3, 1, 1.0), (3, 2, 0.5),
+ (4, 2, 0.5), (4, 3, 1.0))
+
+ val d = 0.4d // damping factor
+ val n = 5 // number of nodes
+ val onesVector = filledColumnVector(1d, n)
+ val iterationZeroVector = filledColumnVector(1d / n, n)
+
+ val expectedSolution = Array(0.28, 0.173333, 0.173333, 0.173333, 0.2)
+
+ JobTest("com.twitter.scalding.examples.WeightedPageRankFromMatrix").
+ arg("d", d.toString).
+ arg("n", n.toString).
+ arg("convergenceThreshold", "0.0001").
+ arg("maxIterations", "1").
+ arg("currentIteration", "0").
+ arg("rootDir", "root").
+ source(TypedTsv[(Int, Int, Double)]("root/edges"), edges).
+ source(TypedTsv[(Int, Double)]("root/onesVector"), onesVector).
+ source(TypedTsv[(Int, Double)]("root/iterations/0"), iterationZeroVector).
+ sink[(Int, Int, Double)](Tsv("root/constants/M_hat")) { outputBuffer =>
+ outputBuffer.size must be (7)
+ val outputMap = toSparseMap(outputBuffer)
+ outputMap((0 -> 1)) must beCloseTo (0.4, 0)
+ outputMap((0 -> 2)) must beCloseTo (0.4, 0)
+ outputMap((1 -> 3)) must beCloseTo (0.26666, 0.00001)
+ outputMap((2 -> 3)) must beCloseTo (0.13333, 0.00001)
+ outputMap((2 -> 4)) must beCloseTo (0.13333, 0.00001)
+ outputMap((3 -> 4)) must beCloseTo (0.26666, 0.00001)
+ outputMap((4 -> 0)) must beCloseTo (0.4, 0)
+ }.
+ sink[(Int, Double)](Tsv("root/constants/priorVector")) { outputBuffer =>
+ outputBuffer.size must be (5)
+ val expectedValue = ((1 - d) / 2) * d
+ assertVectorsEqual(
+ new Array[Double](5).map { v => expectedValue },
+ outputBuffer.map(_._2).toArray)
+ }.
+ sink[(Int, Double)](Tsv("root/iterations/1")) { outputBuffer =>
+ outputBuffer.size must be (5)
+ assertVectorsEqual(
+ expectedSolution,
+ outputBuffer.map(_._2).toArray,
+ 0.00001)
+ }.
+ sink[Double](Tsv("root/diff")) { outputBuffer =>
+ outputBuffer.size must be (1)
+
+ val expectedDiff =
+ expectedSolution.zip(iterationZeroVector.map(_._2)).
+ map { case (a, b) => math.abs(a - b) }.
+ sum
+ outputBuffer.head must beCloseTo (expectedDiff, 0.00001)
+ }.
+ run.
+ finish
+ }
+
+ private def assertVectorsEqual(expected: Array[Double], actual: Array[Double], variance: Double) {
+ actual.zipWithIndex.foreach { case (value, i) =>
+ value must beCloseTo (expected(i), variance)
+ }
+ }
+
+ private def assertVectorsEqual(expected: Array[Double], actual: Array[Double]) {
+ actual.zipWithIndex.foreach { case (value, i) =>
+ value must beCloseTo (expected(i), 0)
+ }
+ }
+}
+
+object WeightedPageRankFromMatrixSpec {
+
+ def toSparseMap[Row, Col, V](iterable: Iterable[(Row, Col, V)]): Map[(Row, Col), V] =
+ iterable.map { entry => ((entry._1, entry._2), entry._3) }.toMap
+
+ def filledColumnVector(value: Double, size: Int): List[(Int, Double)] = {
+ val vector = mutable.ListBuffer[(Int, Double)]()
+ (0 until size).foreach { row =>
+ vector += new Tuple2(row, value)
+ }
+
+ vector.toList
+ }
+}
+
+/**
+ * Octave/Matlab implementations to provide the expected ranks. This comes from
+ * the Wikipedia page on PageRank:
+ * http://en.wikipedia.org/wiki/PageRank#Computation
+
+function [v] = iterate(A, sv, d)
+
+N = size(A, 2)
+M = (spdiags(1 ./ sum(A, 2), 0, N, N) * A)';
+v = (d * M * sv) + (((1 - d) / N) .* ones(N, 1));
+
+endfunction
+
+iterate([0 0 0 0 1; 0.5 0 0 0 0; 0.5 0 0 0 0; 0 1 0.5 0 0; 0 0 0.5 1 0], [0.2; 0.2; 0.2; 0.2; 0.2], 0.4)
+
+% Parameter M adjacency matrix where M_i,j represents the link from 'j' to 'i', such that for all 'j' sum(i, M_i,j) = 1
+% Parameter d damping factor
+% Parameter v_quadratic_error quadratic error for v
+% Return v, a vector of ranks such that v_i is the i-th rank from [0, 1]
+
+function [v] = rank(M, d, v_quadratic_error)
+
+N = size(M, 2); % N is equal to half the size of M
+v = rand(N, 1);
+v = v ./ norm(v, 2);
+last_v = ones(N, 1) * inf;
+M_hat = (d .* M) + (((1 - d) / N) .* ones(N, N));
+
+while(norm(v - last_v, 2) > v_quadratic_error)
+ last_v = v;
+ v = M_hat * v;
+ v = v ./ norm(v, 2);
+end
+
+endfunction
+
+M = [0 0 0 0 1 ; 0.5 0 0 0 0 ; 0.5 0 0 0 0 ; 0 1 0.5 0 0 ; 0 0 0.5 1 0];
+rank(M, 0.4, 0.001)
+
+*/