## eDSL B: Dataframes for big-data processing
#### Lef Ioannidis (@elefthei) : Investment Engineer, Bridgewater Associates

The PeriodSeries eDSL allows us to do statistics with a high degree of safety as well as provenance.

But what if we want to operate on hundreds of PeriodSeries while maintaining both safety and provenance?

In [None]:
import scala.collection.immutable.HashMap

In [None]:
// Ticker symbols are plain strings
type Symbol = String

// Sample spot prices, keyed by ticker symbol
val stocks: HashMap[Symbol, Double] = HashMap(
  "AAPL" -> 180.26,
  "ABC"  -> 200.00,
  "VIX"  -> 48.0,
  "SPY"  -> 300.0
)

In [None]:
/** Coarse sector classification for the tickers used in this notebook. */
object StockType extends Enumeration {
  type Inner = Value

  // Can be one of the following
  val Index, Technology, Finance = Value

  /** Classify a ticker symbol.
    *
    * @param s ticker symbol, e.g. "AAPL"
    * @return the sector of `s`
    * @throws IllegalArgumentException if `s` is not one of the known tickers
    */
  def fromString(s: String): Value = s match {
    case "AAPL" | "ABC" => Technology
    case "VIX" | "SPY"  => Index
    // Fail with a descriptive error instead of an opaque MatchError
    case other => throw new IllegalArgumentException(s"Unknown stock symbol: $other")
  }
}
type StockType = StockType.Inner

## GroupBy, map, etc.

In [None]:
stocks.groupBy { case (symbol, _) => StockType.fromString(symbol) }

In [None]:
stocks.mapValues(price => price * 2)

## Joins
### We also want to be able to join Maps on their keys, à la SQL join (here both maps must contain the same keys).

In [None]:
/** Adds a strict, key-aligned `join` to immutable HashMaps. */
implicit class JoinableMaps[F, U](left: HashMap[F, U]) {

  /** Pair up the values of `left` and `right` that share a key.
    *
    * Both maps must contain exactly the same keys.
    *
    * @throws IllegalArgumentException if a key appears in only one of the two maps
    */
  def join[V](right: HashMap[F, V]): HashMap[F, (U, V)] = {
    val allKeys = left.keySet ++ right.keySet
    val paired = allKeys.toSeq.map { key =>
      val fromLeft = left.getOrElse(key, throw new IllegalArgumentException(s"Left field $key not found"))
      val fromRight = right.getOrElse(key, throw new IllegalArgumentException(s"Right field $key not found"))
      key -> ((fromLeft, fromRight))
    }
    HashMap.empty[F, (U, V)] ++ paired.toMap
  }
}

In [None]:
// Sample per-ticker cost figures
val costs: HashMap[String, Double] = HashMap("AAPL" -> 500e6, "ABC" -> 200e6)

// Sample per-ticker earnings figures, over the same tickers as `costs`
val earnings: HashMap[String, Double] = HashMap("AAPL" -> 800e6, "ABC" -> 500e6)

In [None]:
val returns = earnings.join(costs).mapValues { case (earned, spent) => earned - spent }

## This is, by and large, the feel of our dataframe language.

### Requirements
1. General enough to do everything.
2. Execution: maybe on a local machine, maybe on a MapReduce cluster.
3. Provenance: we saw automatic provenance; how about manual?
4. Laziness

## Same trick
### Reify higher-order syntax & interpret

```
LMap[F, U] = 
  Single(inner: HashMap[F, U])
   | Provenance(description: String, fa: LMap[F, U])
   | Map[G,B](fa: LMap[F, U], f: (F, U) => (G, B))
   | FlatMap[G,B](fa: LMap[F, U], f: (F, U) => LMap[G, B])
   | Fold[G,B](fa: LMap[F, U], z: LMap[G, B], f: LMap[G, B] => (F, U) => LMap[G, B])
   | Join[B,C](l: LMap[F, B], r: LMap[F, C])
```

In [None]:
  /** A lazily-evaluated, reified map from keys `F` to values `U`.
    *
    * Operations build an expression tree of case-class nodes; nothing is
    * computed until `run` (or one of the value-returning folds) is called.
    */
  sealed trait LMap[F, U] extends Product with Serializable {
    // Primitive operations, implemented by every node of the expression tree

    /** Evaluate the expression tree into a concrete HashMap. */
    def run: HashMap[F, U]

    /** Lazily transform every entry; colliding output keys collapse to one entry when run. */
    def map[H, I](f: (F, U) => (H, I)): LMap[H, I]

    /** Lazily replace every entry by a sub-map and merge the results. */
    def flatMap[H, I](f: (F, U) => LMap[H, I]): LMap[H, I]

    /** Lazily fold every entry into an accumulator LMap, starting from `z`. */
    def foldLeft[G, B](z: LMap[G, B], f: LMap[G, B] => (F, U) => LMap[G, B]): LMap[G, B]

    /** Lazily pair this map's values with `r`'s values under the same key. */
    def join[C](r: LMap[F, C]): LMap[F, (U, C)]

    /** Render the provenance notes recorded along this expression tree. */
    def trace(): String

    // Derived functions, expressed through the primitives above

    /** An LMap with no entries. */
    def empty[H, I](): LMap[H, I] = HashMap[H, I]().reify

    /** Keep the entries on which `f` is defined, transformed by `f`. */
    def collect[H, I](f: (F, U) PartialFunction (H, I)): LMap[H, I] =
      flatMap((k: F, v: U) => f.lift((k, v)).fold(empty[H, I]())(kv => LMap(kv)))

    /** Keep only the entries satisfying `f`. */
    def filter(f: (F, U) => Boolean): LMap[F, U] =
      collect { case (k, v) if f(k, v) => (k, v) }

    /** Attach a human-readable note; it is reported by `trace()`. */
    def provenance(op: String): LMap[F, U] =
      Provenance(op, this)

    /** Partition the entries by `f` applied to their key.
      *
      * Each group key maps to a lazy sub-map holding exactly the entries whose
      * key yields that group. `map` emits one (group, sub-map) pair per entry;
      * duplicate group keys collapse into a single entry when the HashMap is
      * built, and every entry of a given group carries an equivalent sub-map.
      */
    def groupBy[G](f: F => G): LMap[G, LMap[F, U]] =
      map { case (k, _) =>
        val group = f(k)
        (group, filter((other, _) => f(other) == group))
      }

    /** Evaluate, then fold the entries (in HashMap iteration order) into a plain value. */
    def foldLeft[Z](z: Z, f: (Z, (F, U)) => Z): Z =
      run.foldLeft(z)(f)

    /** Evaluate, then right-fold the entries (in HashMap iteration order) into a plain value. */
    def foldRight[Z](z: Z, f: ((F, U), Z) => Z): Z =
      run.foldRight(z)(f)
  }

  /** Companion: build an LMap directly from key/value pairs, e.g. `LMap("a" -> 1)`. */
  object LMap {
    def apply[H, I](args: (H, I)*): LMap[H, I] = {
      // Later pairs overwrite earlier ones with the same key
      val backing = HashMap.empty[H, I] ++ args
      backing.reify
    }
  }

  /** Lazy node: apply `f` to every entry of `fa`. */
  final case class Map[F, G, U, B](fa: LMap[F, U], f: (F, U) => (G, B)) extends LMap[G, B] {

    // Evaluate the source, then transform each of its entries
    override def run: HashMap[G, B] = {
      val source = fa.run
      source.map { case (key, value) => f(key, value) }
    }

    // Optimization: fuse consecutive maps into one node (run `f`, then `g`).
    // Note: fusion skips the intermediate HashMap, so if `f` sends two keys to
    // the same output key, both entries still reach `g` here, whereas running
    // the two maps separately would have kept only one of them.
    override def map[H, I](g: (G, B) => (H, I)): LMap[H, I] =
      Map(fa, (a: F, b: U) => g.tupled(f(a, b)))

    override def flatMap[H, I](g: (G, B) => LMap[H, I]): LMap[H, I] =
      FlatMap(this, g)

    override def foldLeft[H, I](z: LMap[H, I], f: LMap[H, I] => (G, B) => LMap[H, I]): LMap[H, I] =
      Fold(this, z, f)

    override def join[V](r: LMap[G, V]): LMap[G, (B, V)] =
      Join(this, r)

    // A transformation records no notes of its own; report the source's
    override def trace(): String = fa.trace()
  }

  /** Lazy node: replace every entry of `fa` with the sub-map `f` produces, merging the results. */
  final case class FlatMap[F, U, G, B](fa: LMap[F, U], f: (F, U) => LMap[G, B]) extends LMap[G, B] {

    // Evaluate the source, then evaluate and splice in each entry's sub-map
    override def run: HashMap[G, B] = {
      val source = fa.run
      source.flatMap { case (key, value) => f(key, value).run.toIterable }
    }

    override def map[H, I](g: (G, B) => (H, I)): LMap[H, I] =
      Map(this, g)

    // Optimization: re-associate nested flatMaps into a single node over `fa`.
    // Note: this skips the intermediate HashMap, so entries that `f` produces
    // under colliding keys may all reach `g` instead of being de-duplicated first.
    override def flatMap[H, I](g: (G, B) => LMap[H, I]): LMap[H, I] =
      FlatMap(fa, (key: F, value: U) => f(key, value).flatMap(g))

    override def foldLeft[H, I](z: LMap[H, I], f: LMap[H, I] => (G, B) => LMap[H, I]): LMap[H, I] =
      Fold(this, z, f)

    override def join[V](r: LMap[G, V]): LMap[G, (B, V)] =
      Join(this, r)

    // Only the source's provenance is reported
    override def trace(): String = fa.trace()
  }

  /** Lazy node: fold the entries of `fa` into an accumulator LMap seeded with `z`. */
  final case class Fold[F, U, G, B](fa: LMap[F, U], z: LMap[G, B], f: LMap[G, B] => (F, U) => LMap[G, B]) extends LMap[G, B] {

    override def run: HashMap[G, B] = {
      // Thread the accumulator through every source entry, then evaluate it
      val folded = fa.run.foldLeft(z) { (acc, entry) => f(acc)(entry._1, entry._2) }
      folded.run
    }

    override def map[H, I](g: (G, B) => (H, I)): LMap[H, I] =
      Map(this, g)

    override def flatMap[H, I](g: (G, B) => LMap[H, I]): LMap[H, I] =
      FlatMap(this, g)

    override def foldLeft[H, I](s: LMap[H, I], g: LMap[H, I] => (G, B) => LMap[H, I]): LMap[H, I] =
      Fold(this, s, g)

    override def join[V](r: LMap[G, V]): LMap[G, (B, V)] =
      Join(this, r)

    // Only the source's provenance is reported (not that of the seed `z`)
    override def trace(): String = fa.trace()
  }

  /** Lazy node: pair the values of `l` and `r` that share a key.
    *
    * Both sides must evaluate to exactly the same key set; otherwise `run`
    * throws IllegalArgumentException naming a missing key.
    */
  final case class Join[F, B, C](l: LMap[F, B], r: LMap[F, C]) extends LMap[F, (B, C)] {
    override def run: HashMap[F, (B, C)] = {
      // Straightforward rather than efficient: evaluate both sides, then align by key
      val left: HashMap[F, B] = l.run
      val right: HashMap[F, C] = r.run

      val pairs = (left.keySet ++ right.keySet).toSeq.map { key =>
        val lv = left.getOrElse(key, throw new IllegalArgumentException(s"Left field $key not found"))
        val rv = right.getOrElse(key, throw new IllegalArgumentException(s"Right field $key not found"))
        key -> ((lv, rv))
      }
      HashMap.empty[F, (B, C)] ++ pairs.toMap
    }

    override def map[H, I](g: (F, (B, C)) => (H, I)): LMap[H, I] =
      Map(this, g)

    override def flatMap[H, I](g: (F, (B, C)) => LMap[H, I]): LMap[H, I] =
      FlatMap(this, g)

    override def foldLeft[H, I](z: LMap[H, I], f: LMap[H, I] => (F, (B, C)) => LMap[H, I]): LMap[H, I] =
      Fold(this, z, f)

    override def join[D](r: LMap[F, D]): LMap[F, ((B, C), D)] =
      Join(this, r)

    // Report both branches' provenance inside a JOIN { ... } wrapper
    override def trace(): String = s"JOIN {\n${l.trace()},\n${r.trace()}}\n"
  }

  /** Annotation node: attach the note `op` to `fa` without changing its data. */
  final case class Provenance[F, U](op: String, fa: LMap[F, U]) extends LMap[F, U] {
    // Annotations never alter the data
    override def run: HashMap[F, U] = fa.run

    override def map[H, I](f: (F, U) => (H, I)): LMap[H, I] =
      Map(this, f)

    override def flatMap[H, I](f: (F, U) => LMap[H, I]): LMap[H, I] =
      FlatMap(this, f)

    // Unlike map/flatMap, fold and join are pushed *inside* the annotation, so the
    // note ends up wrapping (and its trace snapshot shows) the folded/joined result.
    override def foldLeft[G, B](z: LMap[G, B], f: LMap[G, B] => (F, U) => LMap[G, B]): LMap[G, B] =
      Provenance(op, fa.foldLeft(z, f))

    override def join[C](r: LMap[F, C]): LMap[F, (U, C)] =
      Provenance(op, fa.join(r))

    // Earlier notes, then this note, then a snapshot of the data at this point.
    // NOTE(review): fa.run re-evaluates the whole subtree for every Provenance node traced.
    override def trace(): String = s"${fa.trace()}$op\n${fa.run.toString()}\n\n"
  }

  /** Leaf node: an already-materialized HashMap, the entry point of every expression. */
  final case class Single[F, B](in: HashMap[F, B]) extends LMap[F, B] {
    // Nothing to compute: the data is already here
    override def run: HashMap[F, B] = in

    override def map[H, I](g: (F, B) => (H, I)): LMap[H, I] = Map(this, g)

    override def flatMap[H, I](g: (F, B) => LMap[H, I]): LMap[H, I] = FlatMap(this, g)

    override def foldLeft[H, I](z: LMap[H, I], f: LMap[H, I] => (F, B) => LMap[H, I]): LMap[H, I] =
      Fold(this, z, f)

    // Binary join only; joining more maps means nesting Join nodes
    override def join[C](r: LMap[F, C]): LMap[F, (B, C)] = Join(this, r)

    // Raw data carries no provenance notes
    override def trace(): String = ""
  }

  /** Syntax: `hashMap.reify` lifts a concrete HashMap into the lazy LMap algebra. */
  implicit class LazyOps[F, U](h: HashMap[F, U]) {
    def reify: LMap[F, U] = Single[F, U](h)
  }

### Alternative approach to provenance

In [None]:
  // Double every price, then split each ticker into (ticker, "up") -> -price and
  // (ticker, "down") -> +price, recording provenance before and after.
  val lazystocks =
    stocks.reify
      .provenance("Initial stock valuation")
      .map { case (symbol, price) => (symbol, price * 2) }
      .flatMap { case (symbol, price) =>
        LMap((symbol, "up") -> -price, (symbol, "down") -> price)
      }
      .provenance("Doubled & Showed directionality")
  lazystocks.run

In [None]:
  // Keep only the negative ("up") legs, halve them to undo the doubling, then flip
  // the sign back, so each positively-priced ticker returns to its original price.
  val finalstocks =
    lazystocks
      .collect { case ((symbol, _), price) if price < 0 => (symbol, price / 2) }
      .map { case (symbol, price) => (symbol, -price) }
      .provenance("Return to the initial portfolio")

In [None]:
// Force evaluation of the whole reified pipeline and display the resulting HashMap
finalstocks.run

## Provenance: documentation interleaved with computation

In [None]:
// Render the provenance notes (with data snapshots) recorded along the pipeline.
// `trace` is declared with `()`, so call it that way: auto-application is
// deprecated in Scala 2.13.3+ and rejected by Scala 3.
finalstocks.trace()

## Execution
### The eDSL gives us an object representation of the logic, which we can optimize, generate code from, or even turn into an Apache Spark computation.
## Related: [twitter/scalding](https://github.com/twitter/scalding)

## Questions?