diff --git a/src/main/scala/com/twitter/scalding/mathematics/Combinatorics.scala b/src/main/scala/com/twitter/scalding/mathematics/Combinatorics.scala new file mode 100644 index 0000000000..b233e5e5d1 --- /dev/null +++ b/src/main/scala/com/twitter/scalding/mathematics/Combinatorics.scala @@ -0,0 +1,227 @@ +package com.twitter.scalding.mathematics +import com.twitter.scalding._ +import com.twitter.scalding.Dsl._ +import cascading.flow.FlowDef +import cascading.tuple.{Fields, TupleEntry} +import cascading.pipe.Pipe + +/** +Serve as a repo for self-contained combinatorial functions with no dependencies +such as +combinations, aka n choose k, nCk +permutations , aka nPk +subset sum : numbers that add up to a finite sum +weightedSum: For weights (a,b,c, ...), want integers (x,y,z,...) to satisfy constraint |ax + by + cz + ... - result | < error +... + +@author : Krishnan Raman, kraman@twitter.com +*/ + +object Combinatorics { + +/** + Given an int k, and an input of size n, + return a pipe with nCk combinations, with k columns per row + + + Computes nCk = n choose k, for large values of nCk + + Use-case: Say you have 100 hashtags sitting in an array + You want a table with 5 hashtags per row, all possible combinations + If the hashtags are sitting in a string array, then + combinations[String]( hashtags, 5) + will create the 100 chose 5 combinations. + + Algorithm: Use k pipes, cross pipes two at a time, filter out non-monotonic entries + + eg. 10C2 = 10 choose 2 + Use 2 pipes. + Pipe1 = (1,2,3,...10) + Pipe2 = (2,3,4....10) + Cross Pipe1 with Pipe2 for 10*9 = 90 tuples + Filter out tuples that are non-monotonic + For (t1,t2) we want t1 Symbol("n"+x)) // all column names + + val pipes = allc.zipWithIndex.map( x=> { + val num = x._2 + 1 + val pipe = IterableSource( (num to n), x._1 ).read + (pipe, num) + }) + + val res = pipes.reduceLeft( (a,b) => { + val num = b._2 + val prevname = Symbol("n" + (num - 1)) + val myname = Symbol( "n" + num) + val mypipe = a._1 + .crossWithSmaller(b._1) + .filter( prevname, myname ){ + foo:(Int, Int) => + val( nn1, nn2) = foo + nn1 < nn2 + } + (mypipe, -1) + })._1 + + (1 to k).foldLeft(res)((a,b)=>{ + val myname = Symbol( "n" + b) + val newname = Symbol("k" + b) + a.map(myname->newname){ + inpc:Int => input(inpc-1) + }.discard(myname) + }) + + } + + /** + Return a pipe with all nCk combinations, with k columns per row + */ + def combinations(n:Int, k:Int)(implicit flowDef:FlowDef) = combinations[Int]((1 to n).toArray, k) + + /** + Return a pipe with all nPk permutations, with k columns per row + For details, see combinations(...) above + */ + + + + def permutations[T](input:IndexedSeq[T], k:Int)(implicit flowDef:FlowDef):Pipe = { + + val n = input.size + val allc = (1 to k).toList.map( x=> Symbol("n"+x)) // all column names + + val pipes = allc.map( x=> IterableSource(1 to n, x).read) + + // on a given row, we cannot have duplicate columns in a permutation + val res = pipes + .reduceLeft( (a,b) => { a.crossWithSmaller(b) }) + .filter( allc ) { + x: TupleEntry => Boolean + val values = (0 until allc.size).map( i=> x.getInteger( i.asInstanceOf[java.lang.Integer])) + values.size == values.distinct.size + } + + // map numerals to actual data + (1 to k).foldLeft(res)((a,b)=>{ + val myname = Symbol( "n" + b) + val newname = Symbol("k" + b) + a.map(myname->newname){ + inpc:Int => input(inpc-1) + }.discard(myname) + }) + + } + + /** + Return a pipe with all nPk permutations, with k columns per row + */ + def permutations(n:Int, k:Int)(implicit flowDef:FlowDef) = permutations[Int]((1 to n).toArray, k) + + + /** + Goal: Given weights (a,b,c, ...), we seek integers (x,y,z,...) to satisft + the constraint |ax + by + cz + ... - result | < error + + Parameters: The weights (a,b,c,...) must be non-negative doubles. + Our search space is 0 to result/min(weights) + The returned pipe will contain integer tuples (x,y,z,...) that satisfy ax+by+cz +... = result + + Note: This is NOT Simplex + WE use a slughtly-improved brute-force algorithm that performs well on account of parallelization. + Algorithm: + Create as many pipes as the number of weights + Each pipe copntains integral multiples of the weight w ie. (0,1w,2w,3w,4w,....) + Iterate as below - + Cross two pipes + Create a temp column that stores intermediate results + Apply progressive filtering on the temp column + Discard the temp column + Once all pipes are crossed, test for temp column within error bounds of result + Discard duplicates at end of process + + Usecase: We'd like to generate all integer tuples for typical usecases like + + 0. How many ways can you invest $1000 in facebook, microsoft, hp ? + val cash = 1000.0 + val error = 5.0 // max error $5, so its ok if we cannot invest the last $5 or less + val (FB, MSFT, HP) = (23.3,27.4,51.2) // share prices + val stocks = IndexedSeq( FB,MSFT,HP ) + weightedSum( stocks, cash, error).write( Tsv("invest.txt")) + + 1. find all (x,y,z) such that 2x+3y+5z = 23, with max error 1 + weightedSum( IndexedSeq(2.0,3.0,5.0), 23.0, 1.0) + + 2. find all (a,b,c,d) such that 2a+12b+12.5c+34.7d = 3490 with max error 3 + weightedSum( IndexedSeq(2.0,12.0,2.5,34.7),3490.0,3.0) + + This is at the heart of portfolio mgmt( Markowitz optimization), subset-sum, operations-research LP problems. + + */ + + def weightedSum( weights:IndexedSeq[Double], result:Double, error:Double)(implicit flowDef:FlowDef):Pipe = { + val numWeights = weights.size + val allColumns = (1 to numWeights).map( x=> Symbol("k"+x)) + + // create as many single-column pipes as the number of weights + val pipes = allColumns.zip(weights).map( x=> { + val (name,wt) = x + IterableSource( (0.0 to result by wt), name).read + }).zip( allColumns ) + + val first = pipes.head + val accum = (first._1, List[Symbol](first._2)) + val rest = pipes.tail + + val res = rest.foldLeft(accum)((a,b)=>{ + + val (apipe, aname) = a + val (bpipe, bname) = b + val allc = (List(aname)).flatten ++ List[Symbol](bname) + + // Algorithm: + // Cross two pipes + // Create a temp column that stores intermediate results + // Apply progressive filtering on the temp column + // Discard the temp column + // Once all pipes are crossed, test for temp column within error bounds of result + // Discard duplicates at end of process + + ( apipe.crossWithSmaller(bpipe) + .map(allc->'temp){ + x:TupleEntry => + val values = (0 until allc.size).map( i=> x.getDouble( i.asInstanceOf[java.lang.Integer])) + values.sum + }.filter('temp){ + x:Double => if( allc.size == numWeights) (math.abs(x-result)<= error) else (x <= result) + }.discard('temp), allc ) + })._1.unique(allColumns) + + (1 to numWeights).zip(weights).foldLeft( res) ((a,b) => { + val (num,wt) = b + val myname = Symbol("k"+num) + a.map( myname->myname){ x:Int => (x/wt).toInt } + }) + } + + /** + Does the exact same thing as weightedSum, but filters out tuples with a weight of 0 + The returned pipe contain only positive non-zero weights. + */ + def positiveWeightedSum( weights:IndexedSeq[Double], result:Double, error:Double)(implicit flowDef:FlowDef):Pipe = { + val allColumns = (1 to weights.size).map( x=> Symbol("k"+x)) + weightedSum( weights, result, error).filter( allColumns ){ + x:TupleEntry => (0 until allColumns.size).map( i=> x.getDouble(i.asInstanceOf[java.lang.Integer])!=0.0).reduceLeft(_&&_) + } + } + + +} diff --git a/src/test/scala/com/twitter/scalding/mathematics/CombinatoricsTest.scala b/src/test/scala/com/twitter/scalding/mathematics/CombinatoricsTest.scala new file mode 100644 index 0000000000..7aaa8f6873 --- /dev/null +++ b/src/test/scala/com/twitter/scalding/mathematics/CombinatoricsTest.scala @@ -0,0 +1,57 @@ +package com.twitter.scalding.mathematics + +import org.specs._ +import com.twitter.scalding._ + +class CombinatoricsJob(args : Args) extends Job(args) { + val C = Combinatorics + C.permutations( 10,3 ).write(Tsv("perms.txt")) + + C.combinations( 5,2 ).write(Tsv("combs.txt")) + + // how many ways can you invest $10000 in KR,ABT,DLTR,MNST ? + val cash = 1000.0 + val error = 1.0 // max error $1, so its ok if we cannot invest the last dollar + val (kr,abt,dltr,mnst) = (27.0,64.0,41.0,52.0) // share prices + val stocks = IndexedSeq( kr,abt,dltr,mnst) + + + C.weightedSum( stocks, cash,error).write( Tsv("invest.txt")) + C.positiveWeightedSum( stocks, cash,error).write( Tsv("investpos.txt")) + +} + +class CombinatoricsJobTest extends Specification { + noDetailedDiffs() + import Dsl._ + + "A Combinatorics Job" should { + JobTest( new CombinatoricsJob(_)) + .sink[(Int,Int)](Tsv("perms.txt")) { pbuf => + val psize = pbuf.toList.size + "correctly compute 10 permute 3 equals 720" in { + psize must be_==(720) + } + } + .sink[(Int,Int)](Tsv("combs.txt")) { buf => + val csize = buf.toList.size + "correctly compute 5 choose 2 equals 10" in { + csize must be_==(10) + } + } + .sink[(Int,Int,Int,Int)](Tsv("invest.txt")) { buf => + val isize = buf.toList.size + "correctly compute 169 tuples that allow you to invest $1000 among the 4 given stocks" in { + isize must be_==(169) + } + } + .sink[(Int,Int,Int,Int)](Tsv("investpos.txt")) { buf => + val ipsize = buf.toList.size + "correctly compute 101 non-zero tuples that allow you to invest $1000 among the 4 given stocks" in { + ipsize must be_==(101) + } + } + .run + .finish + } +}