# Emma Programming Guide

## Preamble

Make sure you have built the *emma-tutorial* project with

```
mvn compile
```

before running the commands below.

### REPL Setup

Uncomment the following code if you want to include a manually deployed version of Emma.

In [None]:
// interp.resolvers() = interp.resolvers() :+ ammonite.runtime.tools.Resolver.File(
//   "m2",
//   "/.m2/repository",
//   "/[organisation]/[module]/[revision]/[artifact]-[revision].[ext]",
//   m2 = true
// )

In [1]:
import ammonite.ops._
import java.nio.file.Paths

import $ivy.`org.emmalanguage:emma-language:0.2-rc2`
// `provided` dependencies expected by `emma-language`
import $ivy.`com.chuusai::shapeless:2.3.2`
import $ivy.`org.apache.hadoop:hadoop-common:2.8.0`
import $ivy.`org.apache.hadoop:hadoop-hdfs:2.8.0`
// compiler plugin required for `@emma.lib` annotation
import $plugin.$ivy.`org.scalamacros:paradise_2.11.8:2.0.1`

// project classpath
interp.load.cp(pwd / up / "emma-tutorial-library" / "target" / "classes")

### Example Data Setup

The following cell downloads the [OpenFlights](https://openflights.org) data and imports the data model (`data.openflights`) and some auxiliary functions (`lib.openflights`) used in the examples below.

In [2]:
// project imports
import org.example.emma.tutorial.data.openflights._
import org.example.emma.tutorial.lib.openflights._

// download https://openflights.org data
downloadOpenFlightsData()

## Basic API

Emma programs require the following import.

In [3]:
import org.emmalanguage.api._

### DataBag Basics

Parallel computation in Emma is represented by expressions over a generic type `DataBag[A]`. 
The type represents a distributed collection of elements of type `A` that has no particular order and may contain duplicates.

`DataBag` instances are created directly from a Scala `Seq`

In [None]:
val squaresSeq = 1 to 42 map { x => (x, x * x) }
val squaresBag = DataBag(squaresSeq)

or by reading from a supported source, e.g. `CSV` or `Parquet`.

In [None]:
val csv = CSV(delimiter = ',')
val airports = DataBag.readCSV[Airport](file("airports.dat").toString, csv)
val airlines = DataBag.readCSV[Airline](file("airlines.dat").toString, csv)
val routes = DataBag.readCSV[Route](file("routes.dat").toString, csv)

Conversely, a `DataBag` can be converted back to a `Seq`

In [None]:
val squaresSeq = squaresBag.collect()

or written to a supported file system.

In [None]:
airports.writeCSV(file("airports.copy.dat").toString, csv)
airlines.writeCSV(file("airlines.copy.dat").toString, csv)
routes.writeCSV(file("routes.copy.dat").toString, csv)

### Declarative Dataflows

In contrast to other distributed collection types such as Spark's `RDD` and Flink's `DataSet`, Emma's `DataBag` type is a proper monad. 
This means that joins and cross products in Emma can be declared using `for`-comprehension syntax in a manner akin to *Select-From-Where* expressions known from SQL.

In [None]:
val flightsFromBerlin = for {
  al <- airlines
  if al.name == "Air Berlin"
  ap <- airports 
  if ap.city == Some("Berlin")
  rt <- routes
  if rt.airlineID == Some(al.id)
  if rt.srcID == Some(ap.id)
} yield (rt.src, rt.dst, rt.isShared)

flightsFromBerlin.collect().take(10)
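
To see why comprehension syntax can express a join, it helps to look at how Scala desugars it into `withFilter`, `flatMap`, and `map` calls. The sketch below uses plain Scala `Seq` values and reduced, hypothetical `Airline` and `Route` case classes instead of `DataBag` and the tutorial's data model, so it only illustrates the desugaring, not how Emma evaluates the expression.

```scala
object ComprehensionSketch {
  // Hypothetical, reduced stand-ins for the tutorial's data model.
  case class Airline(id: Int, name: String)
  case class Route(airlineID: Option[Int], src: String, dst: String)

  val airlines = Seq(Airline(1, "Air Berlin"), Airline(2, "Lufthansa"))
  val routes = Seq(
    Route(Some(1), "TXL", "MUC"),
    Route(Some(2), "FRA", "SFO"),
    Route(None, "TXL", "CDG")
  )

  // Comprehension syntax: a join between airlines and routes.
  val viaFor = for {
    al <- airlines
    if al.name == "Air Berlin"
    rt <- routes
    if rt.airlineID == Some(al.id)
  } yield (rt.src, rt.dst)

  // Roughly the combinator chain that the comprehension desugars into.
  val viaCombinators = airlines
    .withFilter(al => al.name == "Air Berlin")
    .flatMap(al => routes
      .withFilter(rt => rt.airlineID == Some(al.id))
      .map(rt => (rt.src, rt.dst)))
}
```

Both values contain the same single pair, `("TXL", "MUC")`.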

In addition to comprehension syntax, the `DataBag` interface offers some combinators.
You can combine two `DataBag[A]` instances by taking their duplicate-preserving `union`, which corresponds to the `UNION ALL` clause in SQL.

In [None]:
val srcs = routes.map(_.src) 
val dsts = routes.map(_.dst)
val locs = srcs union dsts

You can eliminate duplicates with `distinct` (corresponds to the `DISTINCT` clause in SQL).

In [None]:
val dupls = locs.collect().size
val dists = locs.distinct.collect().size

You can sample `N` elements using [reservoir sampling](https://en.wikipedia.org/wiki/Reservoir_sampling).

In [None]:
val sample = routes.map(_.src).sample(5)
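
For readers unfamiliar with the technique, here is a minimal sketch of the textbook reservoir sampling scheme (Algorithm R) over a local `Iterator`. The `ReservoirSketch.sample` helper is hypothetical and is not Emma's implementation; it only illustrates how a fixed-size uniform sample can be drawn in a single pass.

```scala
import scala.collection.mutable.ArrayBuffer
import scala.util.Random

object ReservoirSketch {
  // Algorithm R: keeps a uniform random sample of up to `n` elements
  // while visiting every input element exactly once.
  def sample[A](xs: Iterator[A], n: Int, rng: Random = new Random()): Vector[A] = {
    val reservoir = ArrayBuffer.empty[A]
    var seen = 0 // number of elements visited so far (an Int is enough for a sketch)
    for (x <- xs) {
      seen += 1
      if (reservoir.size < n) {
        reservoir += x // fill the reservoir first
      } else {
        // pick a slot in [0, seen); the element is kept with probability n / seen
        val j = rng.nextInt(seen)
        if (j < n) reservoir(j) = x
      }
    }
    reservoir.toVector
  }
}
```

For example, `ReservoirSketch.sample(Iterator.range(0, 100), 5)` returns five distinct numbers from the range, and an input shorter than `n` is returned unchanged.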

You can extend all elements in a `DataBag` with a unique index.

In [None]:
val indexed = routes.map(_.src).zipWithIndex.sample(5)

### Folds

The core parallel processing abstraction provided by `DataBag[A]` is a generic pattern for parallel
collection processing called *structural recursion*.   

To understand how structural recursion works, assume for a moment that `DataBag[A]` instances can be (conceptually) constructed in one of three ways: the `Empty` bag, a singleton bag `Singleton(x)`, or the union of two existing bags `Union(xs, ys)`. *Structural recursion* works on bags by

1. systematically deconstructing the input `DataBag[A]` instance, 
2. replacing the constructors with corresponding user-defined functions, and 
3. evaluating the resulting expression.

Formally, the above procedure can be specified as a second-order function called `fold` as follows.

```scala
def fold[B](zero: B)(init: A => B, plus: (B, B) => B): B = this match {
  case Empty         => zero
  case Singleton(x)  => init(x)
  case Union(xs, ys) => plus(xs.fold(zero)(init, plus), ys.fold(zero)(init, plus))
}
```

Note how `Empty` constructors are substituted by `zero`, `Singleton(x)` constructors by `init(x)`, and `Union(xs, ys)` constructors by `plus` applied to the results of folding `xs` and `ys`.
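
The conceptual definition can be made runnable with a small algebraic data type. The `Bag`, `Empty`, `Singleton`, and `Union` types below are a hypothetical model for illustration only; they are not how Emma represents a `DataBag` internally.

```scala
object FoldSketch {
  // Conceptual constructors of a bag, modelled as a sealed hierarchy.
  sealed trait Bag[+A] {
    // Structural recursion: replace each constructor by a user-defined function.
    def fold[B](zero: B)(init: A => B, plus: (B, B) => B): B = this match {
      case Empty         => zero
      case Singleton(x)  => init(x)
      case Union(xs, ys) => plus(xs.fold(zero)(init, plus), ys.fold(zero)(init, plus))
    }
  }
  case object Empty extends Bag[Nothing]
  case class Singleton[+A](x: A) extends Bag[A]
  case class Union[+A](xs: Bag[A], ys: Bag[A]) extends Bag[A]

  // A bag with the elements 1, 2, 3, built from nested unions.
  val bag: Bag[Int] =
    Union(Union(Singleton(1), Singleton(2)), Union(Empty, Singleton(3)))

  val size = bag.fold(0L)(_ => 1L, _ + _) // counts the elements
  val sum  = bag.fold(0)(x => x, _ + _)   // adds the elements up
}
```

Here `size` evaluates to `3` and `sum` to `6`, independently of how the unions are nested.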

A particular combination of `zero`, `init`, and `plus` functions therefore defines a specific function. For example,

In [None]:
val dupls = locs.fold(0L)(_ => 1L, _ + _)

is another way to compute the number of elements of `locs`. Note that this expression will be evaluated **in parallel**, while the version we used above

In [None]:
val dupls = locs.collect().size

first converts the `DataBag` **locs** into a `Seq[String]` and then counts the number of elements locally.
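
The reason a fold can be evaluated in parallel is that each partition of the data can be folded on its own and the partial results merged afterwards. The sketch below simulates this sequentially on local `Seq` partitions; `foldPartitioned` is a hypothetical helper, and the result is independent of the partitioning only if `plus` is associative and commutative with `zero` as its neutral element.

```scala
object PartitionedFoldSketch {
  // Folds every partition independently, then merges the partial results.
  def foldPartitioned[A, B](partitions: Seq[Seq[A]])(zero: B)(init: A => B, plus: (B, B) => B): B =
    partitions
      .map(p => p.map(init).foldLeft(zero)(plus)) // per-partition work, could run on separate workers
      .foldLeft(zero)(plus)                       // merge of the partial results

  val locs = Seq("TXL", "FRA", "TXL", "SFO", "MUC")

  // The same count, computed over one partition and over three partitions.
  val onePartition    = foldPartitioned(Seq(locs))(0L)(_ => 1L, _ + _)
  val threePartitions = foldPartitioned(locs.grouped(2).toList)(0L)(_ => 1L, _ + _)
}
```

Both values are `5`, the number of elements in `locs`.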

A convenient way to bundle a specific combination of functions passed to a `fold` is through a dedicated trait.

```scala
// defined in `org.emmalanguage.api.alg`
trait Alg[-A, B] extends Serializable {
  val zero: B
  val init: A => B
  val plus: (B, B) => B
}
```

and overload `fold` as follows:

```scala
def fold[B](alg: Alg[A, B]): B =
  this.fold(alg.zero)(alg.init, alg.plus)
```

Now, we can name commonly used triples as specific `Alg` instances.

```scala
object Size extends Alg[Any, Long] {
  val zero: Long = 0
  val init: Any => Long = const(1)
  val plus: (Long, Long) => Long = _ + _
}
```

and define aliases for the corresponding `fold(alg)` calls.

```scala
def size: Long = this.fold(Size)
```
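
To experiment with this pattern outside of Emma, the sketch below re-declares a simplified `Alg` trait and folds over a local `Seq` instead of a `DataBag`. `MinInt` and the free-standing `fold` helper are hypothetical names introduced for illustration; only `Alg` and `Size` mirror the definitions shown above.

```scala
object AlgSketch {
  // Simplified stand-in for `org.emmalanguage.api.alg.Alg`.
  trait Alg[-A, B] extends Serializable {
    val zero: B
    val init: A => B
    val plus: (B, B) => B
  }

  object Size extends Alg[Any, Long] {
    val zero: Long = 0
    val init: Any => Long = _ => 1L
    val plus: (Long, Long) => Long = _ + _
  }

  // Hypothetical algebra: the minimum of a bag of ints, if there is one.
  object MinInt extends Alg[Int, Option[Int]] {
    val zero: Option[Int] = None
    val init: Int => Option[Int] = Some(_)
    val plus: (Option[Int], Option[Int]) => Option[Int] = {
      case (Some(x), Some(y)) => Some(math.min(x, y))
      case (x, y)             => x orElse y
    }
  }

  // A local `Seq` plays the role of the bag in this sketch.
  def fold[A, B](xs: Seq[A])(alg: Alg[A, B]): B =
    xs.map(alg.init).foldLeft(alg.zero)(alg.plus)
}
```

With these definitions, `AlgSketch.fold(Seq(3, 1, 2))(AlgSketch.MinInt)` yields `Some(1)`, and folding an empty sequence with `MinInt` yields `None`. Because `Alg` is contravariant in `A`, the same `Size` algebra can be applied to a sequence of any element type.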

The following `DataBag` methods are actually defined as specific folds.

In [None]:
// cardinality tests
locs.size      // counts the number of elements
locs.nonEmpty  // checks if not empty
locs.isEmpty   // checks if empty

In [None]:
// based on an implicit `Ordering`
locs.min       // minimum
locs.max       // maximum
locs.top(3)    // top-K
locs.bottom(3) // bottom-K

In [None]:
// arithmetic operations
routes.map(_.stops).sum
DataBag(1 to 5).product

In [None]:
// predicate testing
routes.count(_.stops < 3)
routes.forall(_.stops < 3)
routes.exists(r => r.src == "FRA" && r.dst == "SFO")
routes.find(r => r.src == "FRA" && r.dst == "SFO")

In [None]:
// reducers
DataBag(1 to 5).reduce(1)(_ * _)
DataBag(1 to 5).reduceOption(_ * _)
DataBag(Seq.empty[Int]).reduceOption(_ * _)

## Code Parallelisation

To parallelise Emma code, you need to do two things:

1. Set up a parallel dataflow framework (Flink or Spark).
2. Set the `emmaQuote` value to the respective Emma macro, `emma.onSpark` or `emma.onFlink`. You can also skip the `emmaQuote` alias and use the macros directly.

### Parallel Dataflow Framework Setup

Run only one of the two options below.

#### Option 1: Spark

In [4]:
// `emma.onSpark` macro
import $ivy.`org.emmalanguage:emma-spark:0.2-rc2`
// `provided` dependencies expected by `emma-spark`
import $ivy.`org.apache.spark::spark-sql:2.1.0`

In [5]:
// required spark imports
import org.apache.spark.sql.SparkSession

In [6]:
// implicit Spark environment
implicit val sparkSession = SparkSession.builder()
    .appName("Emma Programming Guide")
    .master("local[*]")
    .config("spark.sql.warehouse.dir", Paths.get(sys.props("java.io.tmpdir"), "spark-warehouse").toUri.toString)
    .getOrCreate()

sparkSession: SparkSession = org.apache.spark.sql.SparkSession@56525de

In [7]:
val emmaQuote = emma.onSpark

emmaQuote: emma.onSpark.type = org.emmalanguage.api.emma.onSpark$@5d6f7106

#### Option 2: Flink

In [None]:
import $ivy.`org.emmalanguage:emma-flink:0.2-rc2`
// import $ivy.`org.apache.spark::spark-sql:2.1.0`
import $ivy.`org.apache.flink::flink-scala:1.2.1`
import $ivy.`org.apache.flink::flink-clients:1.2.1`

In [None]:
import org.apache.flink.api.scala.ExecutionEnvironment
implicit val env = ExecutionEnvironment.getExecutionEnvironment
env.getConfig.disableSysoutLogging()

In [None]:
// Include flink target classes in the classpath
interp.load.cp(ammonite.ops.Path(org.emmalanguage.compiler.RuntimeCompiler.default.codeGenDir))

In [None]:
val emmaQuote = emma.onFlink

### Quotations

Code enclosed in an `emmaQuote { ... }` block is optimized holistically by the Emma compiler, and `DataBag` expressions are offloaded to the corresponding parallel execution engine.

In [8]:
// override the `airports`, `routes`, and `airlines` members
// with the `def`-based versions from `data.openflights` module
import org.example.emma.tutorial.data.openflights._

In [9]:
// evaluates as Spark RDD map and filter
val berlinAirports = emmaQuote {
  for {
    a <- airports
    if a.latitude > 52.3
    if a.latitude < 52.6
    if a.longitude > 13.2
    if a.longitude < 13.7
  } yield Location(
    a.name,
    a.latitude,
    a.longitude)
}

berlinAirports.collect().foreach(println)

Location(Berlin-Schönefeld International Airport,52.380001068115,13.522500038147)
Location(Berlin-Tempelhof International Airport,52.472999572753906,13.403900146484375)
Location(Berlin-Tegel International Airport,52.5597000122,13.2876996994)


berlinAirports: DataBag[Location[String]] = org.emmalanguage.api.SparkRDD@9d2a8543

In [10]:
// evaluates as Spark RDD cascade of joins
val rs = emmaQuote {
  for {
    ap <- airports
    rt <- routes
    al <- airlines
    if rt.srcID == Some(ap.id)
    if rt.airlineID == Some(al.id)
  } yield (al.name, ap.country)
}

rs.sample(10).foreach(println)

(Cathay Pacific,China)
(All Nippon Airways,Japan)
(China Airlines,Taiwan)
(British Airways,Dominican Republic)
(China Eastern Airlines,Taiwan)
(XL Airways France,Dominican Republic)
(Ryanair,Denmark)
(Luxair,Luxembourg)
(Turkish Airlines,Turkey)
(Air Canada,Canada)


rs: DataBag[(String, String)] = org.emmalanguage.api.SparkRDD@b19ffcde

In [11]:
// evaluates as Spark RDD reduceByKey
val aggs = emmaQuote {
  for {
    Group(k, g) <- routes.groupBy(_.src)
  } yield {
    val x = g.count(_.airline == "AB")
    val y = g.count(_.airline == "LH")
    k -> (x, y)
  }
}

aggs.withFilter({ case (k, (x, y)) => x > 0 && y > 0 }).sample(10).foreach(println)

(KRK,(1,2))
(MLA,(1,4))
(NUE,(21,3))
(DTM,(1,1))
(BIO,(1,2))
(SZG,(11,1))
(BUD,(1,3))
(RIX,(5,1))
(AMM,(4,1))
(KLX,(1,1))


aggs: DataBag[(String, (Long, Long))] = org.emmalanguage.api.SparkRDD@7b68b1a2

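The `reduceByKey` remark in the last cell suggests that the groups are never materialised: because both `count` calls are folds, they can be computed as per-key partial aggregates. The sketch below contrasts the two evaluation shapes on a local `Seq`, using a reduced, hypothetical `Route` model; it illustrates the idea only and is not the code that the Emma compiler generates.

```scala
object GroupFoldSketch {
  case class Route(src: String, airline: String) // hypothetical, reduced model

  val routes = Seq(
    Route("TXL", "AB"), Route("TXL", "LH"), Route("TXL", "AB"),
    Route("FRA", "LH"), Route("FRA", "LH")
  )

  // Naive shape: materialise every group, then count inside each group.
  val grouped: Map[String, (Long, Long)] =
    routes.groupBy(_.src).map { case (k, g) =>
      (k, (g.count(_.airline == "AB").toLong, g.count(_.airline == "LH").toLong))
    }

  // Fused shape: a single pass that updates one partial aggregate per key,
  // which is the shape of a `reduceByKey`.
  val fused: Map[String, (Long, Long)] =
    routes.foldLeft(Map.empty[String, (Long, Long)]) { (acc, r) =>
      val (x, y) = acc.getOrElse(r.src, (0L, 0L))
      val dx = if (r.airline == "AB") 1L else 0L
      val dy = if (r.airline == "LH") 1L else 0L
      acc.updated(r.src, (x + dx, y + dy))
    }
}
```

Both maps associate `"TXL"` with `(2, 1)` and `"FRA"` with `(0, 2)`.
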
## Code Modularity

To build domain-specific libraries based on Emma, enclose your function definitions in a top-level object and annotate this object with the `@emma.lib` annotation.

In [None]:
@emma.lib
object hubs {
  def apply(M: Int) = {
    val rs = for {
      Group(k, g) <- ({
        routes.map(_.src)
      } union {
        routes.map(_.dst)
      }).groupBy(x => x)
      if g.size < M
    } yield k

    rs.collect().toSet
  }
}

@emma.lib
object reachable {
  def apply(N: Int)(hubs: Set[String]) = {
     val hubroutes = routes
       .withFilter(r => hubs(r.src) && hubs(r.dst))

     var paths = hubroutes
       .map(r => Path(r.src, r.dst))
     for (_ <- 0 until N) {
       val delta = for {
         r <- hubroutes; p <- paths if r.dst == p.src
       } yield Path(r.src, p.dst)
       paths = (paths union delta).distinct
     }

     paths
  }
}
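
The loop in `reachable` is a naive fixpoint iteration: each round prepends one more route to every known path and removes duplicates. The sketch below reproduces that loop on a local `Set`, leaving out the hub filter; the `Path` case class and the three-edge input are hypothetical stand-ins for the tutorial's types and data.

```scala
object ReachableSketch {
  case class Path(src: String, dst: String) // hypothetical stand-in

  def reachable(n: Int)(edges: Set[Path]): Set[Path] = {
    var paths = edges
    for (_ <- 0 until n) {
      // extend every known path by one more edge at the front
      val delta = for {
        r <- edges; p <- paths if r.dst == p.src
      } yield Path(r.src, p.dst)
      paths = paths ++ delta // a `Set` removes duplicates, like `union` followed by `distinct`
    }
    paths
  }

  // A simple chain A -> B -> C -> D.
  val edges = Set(Path("A", "B"), Path("B", "C"), Path("C", "D"))
}
```

After `n` rounds the result contains every pair connected by at most `n + 1` edges. For the chain above, `ReachableSketch.reachable(2)(ReachableSketch.edges)` holds six paths, including `Path("A", "D")`.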

In [None]:
// override the `hubs` and `reachable` definitions from above
// with the identical implementations in the `lib.openflights`
// module in order to fix a bug in the type handling
import org.example.emma.tutorial.lib.openflights._

In [None]:
// evaluates as Spark RDD reduceByKey
val rs = emmaQuote {
  reachable(2)(hubs(50))
}

rs.sample(10).foreach(println)