Permalink
Browse files

Merge branch 'develop' of https://github.com/twitter/scalding into hll-aggregate
  • Loading branch information...
aaron-siegel committed Dec 18, 2012
2 parents ede346d + 495efd3 commit 37d8e46607f39ecd0df01f35846e4f743591aa4a
@@ -201,6 +201,19 @@ class RichPipe(val pipe : Pipe) extends java.io.Serializable with JoinAlgorithms
.discard('__shard__)
}
+
+  /**
+   * Appends the field(s) `fs` to the pipe, filling them with the same
+   * constant `value` on every tuple.
+   *
+   * Implemented as a `map` whose source side is the unit selection `()`,
+   * so the mapping function takes a `Unit` argument and just returns `value`.
+   *
+   * == Usage ==
+   * {{{
+   * insert('a, 1)
+   * }}}
+   */
+  def insert[A](fs : Fields, value : A)(implicit conv : TupleSetter[A]) : Pipe =
+    map(() -> fs) { (_ : Unit) => value }
+
+
/**
* Rename some set of N fields as another set of N fields
*
@@ -651,6 +651,12 @@ class RowVector[ColT,ValT] (val colS:Symbol, val valS:Symbol, inPipe: Pipe, val
/**
 * Multiplies this row vector by `that`. The actual computation and the
 * result type `Res` are supplied by the implicit MatrixProduct instance.
 */
def *[That,Res](that : That)(implicit prod : MatrixProduct[RowVector[ColT,ValT],That,Res]) : Res =
  prod(this, that)
+  /**
+   * Sum of two row vectors.
+   *
+   * Both operands are transposed to column vectors, added there, and the
+   * result is transposed back. The result type is declared explicitly so the
+   * public signature matches `hProd` and does not depend on inference.
+   */
+  def +(that : RowVector[ColT,ValT])(implicit mon : Monoid[ValT]) : RowVector[ColT,ValT] =
+    (this.transpose + that.transpose).transpose
+
+  /**
+   * Difference of two row vectors (this minus `that`).
+   *
+   * Both operands are transposed to column vectors, subtracted there, and the
+   * result is transposed back. The result type is declared explicitly so the
+   * public signature matches `hProd` and does not depend on inference.
+   */
+  def -(that : RowVector[ColT,ValT])(implicit group : Group[ValT]) : RowVector[ColT,ValT] =
+    (this.transpose - that.transpose).transpose
+
+  /**
+   * `hProd` of two row vectors, computed on their column-vector transposes
+   * and transposed back into a row vector.
+   */
+  def hProd(that: RowVector[ColT,ValT])(implicit ring: Ring[ValT]) : RowVector[ColT,ValT] = {
+    val asColumn = this.transpose hProd that.transpose
+    asColumn.transpose
+  }
+
/**
 * Builds a ColVector over the same symbols and input pipe, with the size
 * hint transposed.
 */
def transpose : ColVector[ColT,ValT] =
  new ColVector[ColT,ValT](colS, valS, inPipe, sizeH.transpose)
@@ -741,6 +747,12 @@ class ColVector[RowT,ValT] (val rowS:Symbol, val valS:Symbol, inPipe : Pipe, val
/**
 * Multiplies this column vector by `that`. The actual computation and the
 * result type `Res` are supplied by the implicit MatrixProduct instance.
 */
def *[That,Res](that : That)(implicit prod : MatrixProduct[ColVector[RowT,ValT],That,Res]) : Res =
  prod(this, that)
+  /**
+   * Sum of two column vectors.
+   *
+   * Each operand is lifted to a matrix with the constant column id `true`,
+   * the matrices are added, and column `true` is read back out. The result
+   * type is declared explicitly so the public signature matches `hProd`.
+   */
+  def +(that : ColVector[RowT,ValT])(implicit mon : Monoid[ValT]) : ColVector[RowT,ValT] =
+    (this.toMatrix(true) + that.toMatrix(true)).getCol(true)
+
+  /**
+   * Difference of two column vectors (this minus `that`).
+   *
+   * Each operand is lifted to a matrix with the constant column id `true`,
+   * the matrices are subtracted, and column `true` is read back out. The
+   * result type is declared explicitly so the public signature matches `hProd`.
+   */
+  def -(that : ColVector[RowT,ValT])(implicit group : Group[ValT]) : ColVector[RowT,ValT] =
+    (this.toMatrix(true) - that.toMatrix(true)).getCol(true)
+
+  /**
+   * `hProd` of two column vectors, computed by lifting both to matrices with
+   * the constant column id `true` and reading column `true` back out.
+   */
+  def hProd(that: ColVector[RowT,ValT])(implicit ring: Ring[ValT]) : ColVector[RowT,ValT] = {
+    val asMatrix = this.toMatrix(true) hProd that.toMatrix(true)
+    asMatrix.getCol(true)
+  }
+
/**
 * Builds a RowVector over the same symbols and input pipe, with the size
 * hint transposed.
 */
def transpose : RowVector[RowT,ValT] =
  new RowVector[RowT,ValT](rowS, valS, inPipe, sizeH.transpose)
@@ -59,6 +59,22 @@ case object SmallToBig extends MatrixJoiner {
}
}
+/**
+ * Strategy for combining two pipes with a cross operation.
+ * Implementations pick which RichPipe cross method is used.
+ */
+abstract class MatrixCrosser extends java.io.Serializable {
+  /** Crosses `left` with `right` and returns the resulting pipe. */
+  def apply(left: Pipe, right: Pipe) : Pipe
+}
+
+/** Crosser that delegates to `RichPipe.crossWithTiny`, with `right` as the argument side. */
+case object AnyCrossTiny extends MatrixCrosser {
+  override def apply(left: Pipe, right: Pipe) : Pipe =
+    RichPipe(left).crossWithTiny(right)
+}
+
+/** Crosser that delegates to `RichPipe.crossWithSmaller`, with `right` as the argument side. */
+case object AnyCrossSmall extends MatrixCrosser {
+  override def apply(left: Pipe, right: Pipe) : Pipe =
+    RichPipe(left).crossWithSmaller(right)
+}
+
/**
 * Type class describing how to multiply a `Left` by a `Right`, yielding a `Result`.
 */
trait MatrixProduct[Left, Right, Result] extends java.io.Serializable {
  /** Computes the product of `left` and `right`. */
  def apply(left : Left, right : Right) : Result
}
@@ -87,6 +103,11 @@ object MatrixProduct extends java.io.Serializable {
}
}
+  /**
+   * Chooses the cross strategy from the size hint of the right-hand side:
+   * AnyCrossTiny when a total is known and is below `maxTinyJoin`,
+   * AnyCrossSmall otherwise (including when no total is available).
+   */
+  def getCrosser(rightSize: SizeHint) : MatrixCrosser =
+    rightSize.total match {
+      case Some(t) if t < maxTinyJoin => AnyCrossTiny
+      case _ => AnyCrossSmall
+    }
+
+
implicit def literalScalarRightProduct[Row,Col,ValT](implicit ring : Ring[ValT]) :
MatrixProduct[Matrix[Row,Col,ValT],LiteralScalar[ValT],Matrix[Row,Col,ValT]] =
new MatrixProduct[Matrix[Row,Col,ValT],LiteralScalar[ValT],Matrix[Row,Col,ValT]] {
@@ -143,7 +164,7 @@ object MatrixProduct extends java.io.Serializable {
}
}
- implicit def rowColProduct[IdxT,ValT](implicit ring : Ring[ValT]) :
+ implicit def vectorInnerProduct[IdxT,ValT](implicit ring : Ring[ValT]) :
MatrixProduct[RowVector[IdxT,ValT],ColVector[IdxT,ValT],Scalar[ValT]] =
new MatrixProduct[RowVector[IdxT,ValT],ColVector[IdxT,ValT],Scalar[ValT]] {
def apply(left : RowVector[IdxT,ValT], right : ColVector[IdxT,ValT]) : Scalar[ValT] = {
@@ -153,6 +174,29 @@ object MatrixProduct extends java.io.Serializable {
}
}
+  /**
+   * Outer product: a column vector times a row vector yields a matrix.
+   *
+   * The two pipes are crossed (strategy chosen by getCrosser from the right
+   * size hint), the two value fields are multiplied with the ring, zeros are
+   * filtered out, and the right index field is renamed to "<colS>_newCol".
+   */
+  implicit def vectorOuterProduct[RowT, ColT, ValT](implicit ring: Ring[ValT]) :
+    MatrixProduct[ColVector[RowT, ValT], RowVector[ColT, ValT], Matrix[RowT, ColT, ValT]] =
+    new MatrixProduct[ColVector[RowT, ValT], RowVector[ColT, ValT], Matrix[RowT, ColT, ValT]] {
+      def apply(left: ColVector[RowT, ValT], right: RowVector[ColT, ValT]) : Matrix[RowT, ColT, ValT] = {
+        // Make sure the right-hand field names do not clash with the left-hand ones.
+        val (rightFields, rightPipe) =
+          ensureUniqueFields((left.rowS,left.valS), (right.colS, right.valS), right.pipe)
+        val rightIdxField = getField(rightFields,0)
+        val rightValField = getField(rightFields,1)
+        val outColS = Symbol(right.colS.name + "_newCol")
+        val outHint = left.sizeH * right.sizeH
+
+        // Every (left, right) pair contributes one cell: the product of the two values.
+        val crossed = getCrosser(right.sizeH).apply(left.pipe, rightPipe)
+        val multiplied = crossed.map(left.valS.append(rightValField) -> left.valS) { pair: (ValT, ValT) =>
+          ring.times(pair._1, pair._2)
+        }
+        val outPipe = Matrix.filterOutZeros(left.valS, ring)(multiplied)
+          .rename(rightIdxField->outColS)
+
+        new Matrix[RowT,ColT,ValT](left.rowS, outColS, left.valS, outPipe, outHint)
+      }
+    }
+
implicit def standardMatrixProduct[RowL,Common,ColR,ValT](implicit ring : Ring[ValT]) :
MatrixProduct[Matrix[RowL,Common,ValT],Matrix[Common,ColR,ValT],Matrix[RowL,ColR,ValT]] =
new MatrixProduct[Matrix[RowL,Common,ValT],Matrix[Common,ColR,ValT],Matrix[RowL,ColR,ValT]] {
@@ -267,4 +311,24 @@ object MatrixProduct extends java.io.Serializable {
((left.diag) * right).toRow
}
}
+
+
+  /**
+   * Row vector times matrix. The row vector is lifted to a matrix with the
+   * constant row id `true`, multiplied, and row `true` is read back out.
+   */
+  implicit def rowMatrixProduct[Common, ColR, ValT](implicit ring: Ring[ValT]) :
+    MatrixProduct[RowVector[Common, ValT], Matrix[Common, ColR, ValT], RowVector[ColR, ValT]] =
+    new MatrixProduct[RowVector[Common, ValT], Matrix[Common, ColR, ValT], RowVector[ColR, ValT]] {
+      def apply(left: RowVector[Common, ValT], right: Matrix[Common, ColR, ValT]) = {
+        val product = left.toMatrix(true) * right
+        product.getRow(true)
+      }
+    }
+
+  /**
+   * Matrix times column vector. The column vector is lifted to a matrix with
+   * the constant column id `true`, multiplied, and column `true` is read back out.
+   */
+  implicit def matrixColProduct[RowR, Common, ValT](implicit ring: Ring[ValT]) :
+    MatrixProduct[Matrix[RowR, Common, ValT], ColVector[Common, ValT], ColVector[RowR, ValT]] =
+    new MatrixProduct[Matrix[RowR, Common, ValT], ColVector[Common, ValT], ColVector[RowR, ValT]] {
+      def apply(left: Matrix[RowR, Common, ValT], right: ColVector[Common, ValT]) = {
+        val product = left * right.toMatrix(true)
+        product.getCol(true)
+      }
+    }
+
+
+
}
@@ -1076,6 +1076,29 @@ class MkStringToListTest extends Specification with TupleConversions with FieldC
}
}
+/** Test job: reads ('x, 'y) from "input", inserts the constant 1 as 'z, writes to "output". */
+class InsertJob(args : Args) extends Job(args) {
+  Tsv("input", ('x, 'y))
+    .insert('z, 1)
+    .write(Tsv("output"))
+}
+
+/** Checks that InsertJob appends the constant 1 to every input tuple. */
+class InsertJobTest extends Specification {
+  import Dsl._
+  noDetailedDiffs()
+
+  val input = List((2,2), (3,3))
+
+  "An InsertJob" should {
+    JobTest("com.twitter.scalding.InsertJob")
+      .source(Tsv("input", ('x, 'y)), input)
+      .sink[(Int, Int, Int)](Tsv("output")) { outBuf =>
+        "Correctly insert a constant" in {
+          // Each input pair gains a third component equal to 1.
+          val expected = Set((2,2,1), (3,3,1))
+          outBuf.toSet must be_==(expected)
+        }
+      }
+      .run
+      .finish
+  }
+}
+
class FoldJob(args : Args) extends Job(args) {
import scala.collection.mutable.{Set => MSet}
Tsv("input", ('x,'y)).groupBy('x) {
@@ -155,6 +155,79 @@ class MatrixMapWithVal(args: Args) extends Job(args) {
row.mapWithIndex { (v,c) => if (c == 0) v else 0.0 }.write(Tsv("first"))
}
+/** Test job: multiplies row 1 of the matrix read from "mat1" by the matrix itself and writes "rowMatPrd". */
+class RowMatProd(args : Args) extends Job(args) {
+
+  import Matrix._
+
+  val p1 = Tsv("mat1", ('x1, 'y1, 'v1)).read
+  val mat1 = new Matrix[Int, Int, Double]('x1, 'y1, 'v1, p1)
+
+  val row = mat1.getRow(1)
+  val rowProd = row * mat1
+  rowProd.pipe.write(Tsv("rowMatPrd"))
+}
+
+/** Test job: multiplies the matrix read from "mat1" by its own column 1 and writes "matColPrd". */
+class MatColProd(args : Args) extends Job(args) {
+
+  import Matrix._
+
+  val p1 = Tsv("mat1", ('x1, 'y1, 'v1)).read
+  val mat1 = new Matrix[Int, Int, Double]('x1, 'y1, 'v1, p1)
+
+  val col = mat1.getCol(1)
+  val colProd = mat1 * col
+  colProd.pipe.write(Tsv("matColPrd"))
+}
+
+/** Test job: adds row 1 of the matrix read from "mat1" to itself and writes "rowRowSum". */
+class RowRowSum(args : Args) extends Job(args) {
+
+  import Matrix._
+
+  val p1 = Tsv("mat1", ('x1, 'y1, 'v1)).read
+  val mat1 = new Matrix[Int, Int, Double]('x1, 'y1, 'v1, p1)
+
+  val row1 = mat1.getRow(1)
+  val rowSum = row1 + row1
+  rowSum.pipe.write(Tsv("rowRowSum"))
+}
+
+/** Test job: subtracts row 2 from row 1 of the matrix read from "mat1" and writes "rowRowDiff". */
+class RowRowDiff(args : Args) extends Job(args) {
+
+  import Matrix._
+
+  val p1 = Tsv("mat1", ('x1, 'y1, 'v1)).read
+  val mat1 = new Matrix[Int, Int, Double]('x1, 'y1, 'v1, p1)
+
+  val row1 = mat1.getRow(1)
+  val row2 = mat1.getRow(2)
+  // Note: despite its name, rowSum holds the difference row1 - row2.
+  val rowSum = row1 - row2
+  rowSum.pipe.write(Tsv("rowRowDiff"))
+}
+
+/** Test job: takes the hProd of row 1 of the matrix read from "mat1" with itself and writes "rowRowHad". */
+class RowRowHad(args : Args) extends Job(args) {
+
+  import Matrix._
+
+  val p1 = Tsv("mat1", ('x1, 'y1, 'v1)).read
+  val mat1 = new Matrix[Int, Int, Double]('x1, 'y1, 'v1, p1)
+
+  val row1 = mat1.getRow(1)
+  // Note: despite its name, rowSum holds the hProd of row1 with itself.
+  val rowSum = row1 hProd row1
+  rowSum.pipe.write(Tsv("rowRowHad"))
+}
+
+/** Test job: multiplies the transpose of row 1 by row 1 (column times row) and writes "outerProd". */
+class VctOuterProd(args : Args) extends Job(args) {
+
+  import Matrix._
+
+  val p1 = Tsv("mat1", ('x1, 'y1, 'v1)).read
+  val mat1 = new Matrix[Int, Int, Double]('x1, 'y1, 'v1, p1)
+
+  val row1 = mat1.getRow(1)
+  val outerProd = row1.transpose * row1
+  outerProd.pipe.write(Tsv("outerProd"))
+}
+
class MatrixTest extends Specification {
noDetailedDiffs() // For scala 2.9
import Dsl._
@@ -418,4 +491,81 @@ class MatrixTest extends Specification {
.run
.finish
}
+
+  // Runs RowMatProd on a three-entry matrix and checks the contents of the "rowMatPrd" sink.
+  "A Matrix RowMatProd job" should {
+    TUtil.printStack {
+      JobTest("com.twitter.scalding.mathematics.RowMatProd")
+        .source(Tsv("mat1",('x1,'y1,'v1)), List((1,1,1.0),(2,2,3.0),(1,2,4.0)))
+        .sink[(Int,Double)](Tsv("rowMatPrd")) { ob =>
+          "correctly compute a new row vector" in {
+            val expected = Map((1,1)->1.0, (2,2)->16.0)
+            oneDtoSparseMat(ob) must be_==( expected )
+          }
+        }
+        .run
+        .finish
+    }
+  }
+
+  // Runs MatColProd on a three-entry matrix and checks the contents of the "matColPrd" sink.
+  "A Matrix MatColProd job" should {
+    TUtil.printStack {
+      JobTest("com.twitter.scalding.mathematics.MatColProd")
+        .source(Tsv("mat1",('x1,'y1,'v1)), List((1,1,1.0),(2,2,3.0),(1,2,4.0)))
+        .sink[(Int,Double)](Tsv("matColPrd")) { ob =>
+          "correctly compute a new column vector" in {
+            val expected = Map((1,1)->1.0)
+            oneDtoSparseMat(ob) must be_==( expected )
+          }
+        }
+        .run
+        .finish
+    }
+  }
+
+  // Runs RowRowSum on a three-entry matrix and checks the contents of the "rowRowSum" sink.
+  "A Matrix RowRowSum job" should {
+    TUtil.printStack {
+      JobTest("com.twitter.scalding.mathematics.RowRowSum")
+        .source(Tsv("mat1",('x1,'y1,'v1)), List((1,1,1.0),(2,2,3.0),(1,2,4.0)))
+        .sink[(Int,Double)](Tsv("rowRowSum")) { ob =>
+          "correctly add row vectors" in {
+            val expected = Map((1,1)->2.0, (2,2)->8.0)
+            oneDtoSparseMat(ob) must be_==( expected )
+          }
+        }
+        .run
+        .finish
+    }
+  }
+
+  // Runs RowRowDiff on a three-entry matrix and checks the contents of the "rowRowDiff" sink.
+  "A Matrix RowRowDiff job" should {
+    TUtil.printStack {
+      JobTest("com.twitter.scalding.mathematics.RowRowDiff")
+        .source(Tsv("mat1",('x1,'y1,'v1)), List((1,1,1.0),(2,2,3.0),(1,2,4.0)))
+        .sink[(Int,Double)](Tsv("rowRowDiff")) { ob =>
+          "correctly subtract row vectors" in {
+            val expected = Map((1,1)->1.0, (2,2)->1.0)
+            oneDtoSparseMat(ob) must be_==( expected )
+          }
+        }
+        .run
+        .finish
+    }
+  }
+
+  // Runs VctOuterProd on a three-entry matrix and checks the contents of the "outerProd" sink.
+  "A Matrix VctOuterProd job" should {
+    TUtil.printStack {
+      JobTest("com.twitter.scalding.mathematics.VctOuterProd")
+        .source(Tsv("mat1",('x1,'y1,'v1)), List((1,1,1.0),(2,2,3.0),(1,2,4.0)))
+        .sink[(Int,Int,Double)](Tsv("outerProd")) { ob =>
+          "correctly compute the outer product of a column and row vector" in {
+            val expected = Map((1,1)->1.0, (1,2)->4.0, (2,1) -> 4.0, (2,2)->16.0)
+            toSparseMat(ob) must be_==( expected )
+          }
+        }
+        .run
+        .finish
+    }
+  }
+
+
}

0 comments on commit 37d8e46

Please sign in to comment.