### Build Docker image, if needed

The easiest way to deploy Spark (and optionally Mesos) onto a laptop is
to run the Docker image ``spark_mesos``, which we build and run by following
the instructions below.

The following steps automate the entire procedure:

```bash
$ sudo apt-get install git

$ mkdir -p $HOME/workspace && cd $HOME/workspace
$ git clone http://github.com/frgomes/debian-scripts

$ cd debian-scripts
$ ./install-docker-spark+mesos.sh
```

These steps create a Docker image named ``spark_mesos`` and a shell script,
``/opt/bin/spark_mesos.sh``, which automates starting a container from that image.
You can check that both exist:

```bash
$ docker images spark_mesos
REPOSITORY          TAG                 IMAGE ID            CREATED             SIZE
spark_mesos         latest              acc97c716500        14 hours ago        1.09GB

$ ls -al /opt/bin/spark_mesos.sh 
-rwxr-xr-x 1 rgomes rgomes 226 Jun  4 01:50 /opt/bin/spark_mesos.sh
```

----

### Notes on Mesos - Part 1

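In order to access the Mesos controller running in the Docker container, we would need
to install Mesos on our laptop. However, installing Mesos on the laptop is outside
the scope of this exercise.

Instead, you will see that we obtain a ``SparkSession`` using port 7077, which is exposed by the
running Docker container. Note that ``local[4]`` runs Spark inside the notebook process with four
worker threads, and ``spark.ui.port`` sets the port of the Spark web UI. The code looks like this: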

```scala
val ss: SparkSession = 
  JupyterSparkSession
    .builder() 
    .jupyter()
    .master("local[4]").config("spark.ui.port","7077")
    .appName("fraud-detection")
    .getOrCreate()
```

### Notes on Mesos - Part 2

If you are adventurous enough to try Mesos, you could probably replace...
```scala
.master("local[4]").config("spark.ui.port","7077")
```
with...
```scala
.master("mesos://localhost:5050")
.config("java.library.path",
            "/usr/local/lib/libmesos.so")
.config("spark.executor.uri",
            "http://www.apache.org/dyn/closer.lua/spark/spark-2.2.1/spark-2.2.1-bin-hadoop2.7.tgz")
```
Well... probably. This code was never tested.
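
To make the swap above easier to try, the two alternatives can be kept side by side behind a flag. The following is only a sketch: ``sparkSettings`` and ``useMesos`` are hypothetical names, not part of Spark or jupyter-scala, and the Mesos values are copied verbatim from the untested snippet above.

```scala
// Hypothetical helper: returns the master URL plus the extra settings for each mode.
// The Mesos branch reuses the untested values from the snippet above.
def sparkSettings(useMesos: Boolean): (String, Map[String, String]) =
  if (useMesos)
    ("mesos://localhost:5050", Map(
      "java.library.path" -> "/usr/local/lib/libmesos.so",
      "spark.executor.uri" ->
        "http://www.apache.org/dyn/closer.lua/spark/spark-2.2.1/spark-2.2.1-bin-hadoop2.7.tgz"))
  else
    ("local[4]", Map("spark.ui.port" -> "7077"))

val (master, settings) = sparkSettings(useMesos = false)

// The pair can then be applied to any session builder, for example:
//   settings.foldLeft(builder.master(master)) { case (b, (k, v)) => b.config(k, v) }
println(s"master = $master, settings = $settings")
```

With ``useMesos = false`` this yields the same local configuration used throughout this notebook.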

### Notes on Mesos - Part 3

Chances are that, in the future, we will be able to manage Mesos via a RESTful API instead of having
to go through the painful process of downloading the sources, building the binaries and installing
Mesos on our laptop.

More info: http://github.com/mesosphere/mesos-http-adapter

----

### Run Docker image

Assuming you have built the Docker image as explained above, all you have to do is start it, which makes Spark and Mesos available. Just run the command below:
```bash
$ /opt/bin/spark_mesos.sh
```

----

### Import magic for Spark 2.2.1
See available packages here: http://central.maven.org/maven2/org/apache/spark/

In [None]:
import $exclude.`org.slf4j:slf4j-log4j12`, $ivy.`org.slf4j:slf4j-nop:1.7.21`

import $profile.`hadoop-2.7`
import $ivy.`org.apache.spark::spark-core:2.2.1`
import $ivy.`org.apache.spark::spark-sql:2.2.1`
import $ivy.`org.apache.spark::spark-mllib:2.2.1`
//import $ivy.`org.apache.spark::spark-mesos:2.2.1`
import $ivy.`org.jupyter-scala::spark:0.4.2`

import jupyter.spark.session._
import org.apache.spark._
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.expressions.aggregate._
import org.apache.spark.sql.expressions._
import org.apache.spark.sql.functions._

### Obtain SparkSession and SparkContext

In [None]:
val spark: SparkSession = 
  JupyterSparkSession
    .builder() 
    .jupyter()
    .master("local[4]").config("spark.ui.port","7077")
    //.master("mesos://localhost:5050").config("spark.mesos.http.adapter", true)
    .appName("fraud-detection")
//  .config("spark.executor.instances", "10")
//  .config("spark.executor.memory", "3g")
    .getOrCreate()

val sc: SparkContext = spark.sparkContext
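
To check which master and UI address the session actually ended up with, something like the sketch below may help. It assumes Spark 2.x is on the classpath; ``session`` is a name chosen for illustration, and ``getOrCreate()`` returns the session created above when one is already active.

```scala
import org.apache.spark.sql.SparkSession

// Reuses the active session if there is one, otherwise starts a local one.
val session: SparkSession =
  SparkSession
    .builder()
    .master("local[4]")
    .appName("fraud-detection")
    .getOrCreate()

// The master URL the driver is connected to.
println(s"master = ${session.sparkContext.master}")

// uiWebUrl is an Option: it is empty when the web UI is disabled.
println(s"web UI = ${session.sparkContext.uiWebUrl.getOrElse("disabled")}")
```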

In [None]:
val circumventReplScope = true

In [None]:
import spark.implicits._

In [None]:
case class Ad(id: Int, creation: String, price: Long)
case class Fraud(id: Int, detection: String)

if (circumventReplScope) org.apache.spark.sql.catalyst.encoders.OuterScopes.addOuterScope(this)

val dsAd =
  spark
    .read
    .option("header", "true")
    .option("sep", ",")
    .schema(Encoders.product[Ad].schema)
    .csv("data/fraud-detection/ad.csv.gz")
    .as[Ad]
    .repartition(16)

val dsFraud =
  spark
    .read
    .option("header", "true")
    .option("sep", ",")
    .schema(Encoders.product[Fraud].schema)
    .csv("data/fraud-detection/fraud.csv.gz")
    .as[Fraud]
    .repartition(16)
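
Passing ``Encoders.product[...].schema`` to the reader means Spark does not infer column types from the CSV but parses each column with the type declared in the case class. A small sketch of what that schema looks like, assuming ``spark-sql`` is on the classpath; ``AdSketch`` is a hypothetical stand-in with the same fields as ``Ad``:

```scala
import org.apache.spark.sql.Encoders

// Hypothetical stand-in mirroring the fields of Ad.
case class AdSketch(id: Int, creation: String, price: Long)

// Derive the StructType from the case class fields.
val schema = Encoders.product[AdSketch].schema

// Print the schema as a tree, one line per field.
schema.printTreeString()

// Field names follow the declaration order of the case class.
val fieldNames = schema.fieldNames.toSeq
```

Because the file is read with ``header`` set to ``true``, the first line of the CSV is treated as a header rather than as data.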

In [None]:
val frauds = dsFraud.join(dsAd, "id")
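
Joining on the column name ``"id"`` performs an inner join, so only ads that have a matching fraud record survive, and the ``id`` column appears once in the result. The sketch below illustrates this on made-up in-memory rows; the data and the names ``adsDemo``, ``fraudDemo`` and ``joined`` are invented for illustration only.

```scala
import org.apache.spark.sql.SparkSession

// Reuses the active session if there is one.
val session = SparkSession.builder().master("local[4]").getOrCreate()
import session.implicits._

// Made-up rows: two ads, only one of which was flagged as fraud.
val adsDemo = Seq((1, "2018-01-01", 100L), (2, "2018-01-02", 250L))
  .toDF("id", "creation", "price")
val fraudDemo = Seq((2, "2018-01-03")).toDF("id", "detection")

// Inner join on "id": the join column comes first, then the remaining
// columns of the left side, then those of the right side.
val joined = fraudDemo.join(adsDemo, "id")
joined.show()
```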

In [None]:
val fraudPrices = frauds.select(frauds("price"))

In [None]:
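import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.stat.{MultivariateStatisticalSummary, Statistics}

// colStats expects an RDD[Vector], so convert the single "price" column (a Long) first
val priceVectors = fraudPrices.rdd.map(row => Vectors.dense(row.getLong(0).toDouble))
val summary: MultivariateStatisticalSummary = Statistics.colStats(priceVectors)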

In [None]:
val mean = summary.mean
//val sigma = summary.variance

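// Collect both datasets to the driver, keyed by id (assumes they fit in memory)
val ads: Map[Int, Ad] = dsAd.collect().map(ad => ad.id -> ad).toMap
val fraudIds: Set[Int] = dsFraud.collect().map(_.id).toSet

// Ids of the ads that were flagged as fraud
val x = ads.keySet intersect fraudIds

val question_1 = for (
    id <- x;
    ad <- ads.get(id))
yield {
    (ad.id -> ad.creation)
}

// Iterate over a Seq so that equal prices are not collapsed by Set semantics
val question_2a =
    for (
        id <- x.toSeq;
        ad <- ads.get(id))
    yield {
        ad.price
    }

// Average price of fraudulent ads, as a Double to avoid integer truncation
val question_2 = question_2a.sum.toDouble / question_2a.size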
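
An average like this can also be computed by Spark itself, without bringing rows back to the driver. The following is a minimal sketch on made-up prices, assuming Spark 2.x on the classpath; ``pricesDemo``, ``meanPrice`` and ``sdPrice`` are names invented for illustration.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{avg, stddev_samp}

// Reuses the active session if there is one.
val session = SparkSession.builder().master("local[4]").getOrCreate()
import session.implicits._

// Made-up prices, stored as Long like the price column of Ad.
val pricesDemo = Seq(100L, 200L, 300L).toDF("price")

// Both aggregates come back as Double columns in a single row.
val stats = pricesDemo.agg(avg($"price"), stddev_samp($"price")).first()
val meanPrice = stats.getDouble(0)
val sdPrice = stats.getDouble(1)

println(s"mean = $meanPrice, sample standard deviation = $sdPrice")
```

Applied to the real data, the same ``agg`` call would go on the joined ``frauds`` DataFrame instead of ``pricesDemo``.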