Skip to content

Commit

Permalink
make hyperLogLogMap public with configurable serializer
Browse files Browse the repository at this point in the history
  • Loading branch information
Aaron Siegel committed Dec 13, 2012
1 parent 8a9b957 commit b38466d
Showing 1 changed file with 8 additions and 5 deletions.
13 changes: 8 additions & 5 deletions src/main/scala/com/twitter/scalding/ReduceOperations.scala
Original file line number Diff line number Diff line change
Expand Up @@ -94,20 +94,23 @@ trait ReduceOperations[+Self <: ReduceOperations[Self]] extends java.io.Serializ
* }}}
*/
def approxUniques(f : (Fields, Fields), errPercent : Double = 1.0) = {
derivedHll(f, errPercent) { (monoid,hll) => monoid.estimateSize(hll) }
hyperLogLogMap(f, errPercent) { (monoid,hll) => monoid.estimateSize(hll) }
{ _.toString.getBytes("UTF-8") }
}

def hyperLogLog(f : (Fields, Fields), errPercent : Double = 1.0) = {
derivedHll(f, errPercent) { (_,hll) => hll }
hyperLogLogMap(f, errPercent) { (_,hll) => hll }
{ _.toString.getBytes("UTF-8") }
}

private[this] def derivedHll[T](f : (Fields, Fields), errPercent : Double)
(fn : (HyperLogLogMonoid,HLL) => T) = {
def hyperLogLogMap[T,U](f : (Fields, Fields), errPercent : Double = 1.0)
(fn : (HyperLogLogMonoid,HLL) => U)
(toBytes : (T) => Array[Byte]) = {
//bits = log(m) == 2 *log(104/errPercent) = 2log(104) - 2*log(errPercent)
def log2(x : Double) = scala.math.log(x)/scala.math.log(2.0)
val bits = 2 * scala.math.ceil(log2(104) - log2(errPercent)).toInt
implicit val hmm = new HyperLogLogMonoid(bits)
mapPlusMap(f) { (in : CTuple) => hmm.create(in.toString.getBytes("UTF-8")) }
mapPlusMap(f) { (t : T) => hmm.create(toBytes(t)) }
{ hll => fn(hmm,hll) }
}

Expand Down

0 comments on commit b38466d

Please sign in to comment.