# Introduction to Spark

In this class, we'll look at practical use of the Spark framework using PySpark. We'll see how to distribute data, how to use the MapReduce model, and how to store and manipulate distributed data.

Spark is most often used in large datacenters; for example, GCP provides the [Dataproc](https://cloud.google.com/dataproc) platform, which can run Spark. For today, however, we'll use local installations in Binder or Colab with small examples, just to see how PySpark works. To follow along, you can use the Binder link provided, install Spark locally on a Linux machine, upload this notebook to Colab, or use the CNES HAL cluster, following the instructions below.

## Outline

+ [Use in Binder](#binder)
+ [Local installation on Linux](#linux)
+ [Use in Colab](#colab)
+ [Use on CNES Cluster HAL](#HAL)
+ [Running Spark](#running)
+ [Warm up with RDDs](#RDDs)
+ [Persistent RDDs](#persistent)
+ [Distributed K-Means](#kmeans)
+ [DataFrames and Queries](#DataFrames)

## <a id="binder">Use in MyBinder</a>

Instead of installing locally, you can use MyBinder or any BinderHub to run this notebook:
https://mybinder.org/

https://mybinder.org/v2/gh/CNES/big-data-processing-course/main?urlpath=lab

In [None]:
#from pyspark.sql import SparkSession
#spark = SparkSession.builder.master("local[2]").getOrCreate()
#sc = spark.sparkContext

## <a id="linux">Local installation on Linux</a>

Download Spark prebuilt for Hadoop 3 from the [Spark downloads page](http://spark.apache.org/downloads.html), then install the package to `/opt/`:

```bash
$ tar xvzf spark-3.5.0-bin-hadoop3.tgz
$ mv spark-3.5.0-bin-hadoop3 /opt/spark-3.5.0
$ ln -s /opt/spark-3.5.0 /opt/spark
```

Afterwards, you'll want to configure your `$PATH` variable, normally in your `~/.bashrc`:

```bash
export SPARK_HOME=/opt/spark
export PATH=$SPARK_HOME/bin:$PATH
```

You can now launch PySpark to run this notebook. If PySpark isn't installed in your Python environment, install it (`pyarrow` is used later for DataFrame conversion). Then, as your normal user, set some environment variables so that PySpark automatically starts Jupyter:

```bash
$ pip install pyspark pyarrow
$ export PYSPARK_DRIVER_PYTHON="jupyter"
$ export PYSPARK_DRIVER_PYTHON_OPTS="notebook"
$ pyspark
```

## <a id="colab">Use in Colab</a>

Instead of installing locally, you can also upload this notebook to Colab, which may be a bit slower than a local installation. Uncomment the lines (`Ctrl+/`) and run the following:

In [None]:
# !apt-get install openjdk-8-jdk-headless -qq > /dev/null
# !wget -q https://archive.apache.org/dist/spark/spark-3.3.1/spark-3.3.1-bin-hadoop3.tgz
# !tar xf spark-3.3.1-bin-hadoop3.tgz
# !pip install -q findspark
# !pip install py4j pyarrow

`findspark` will help find the Spark installation, and `py4j` lets us access Java objects. We'll set environment variables as in the local installation:

In [None]:
# import os
# os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
# os.environ["SPARK_HOME"] = "/content/spark-3.3.1-bin-hadoop3"

Finally, we'll create the Spark session and context:

In [None]:
#import findspark
#findspark.init("spark-3.3.1-bin-hadoop3")  # path to SPARK_HOME
#from pyspark.sql import SparkSession
#spark = SparkSession.builder.master("local[*]").getOrCreate()
#sc = spark.sparkContext

## <a id="HAL">Use on CNES Cluster HAL</a>

You can also download this notebook and run it on HAL using an existing environment.

First, create a kernel file at `$HOME/.local/share/jupyter/kernels/spark/kernel.json`:

```json
{
 "argv": [
  "/work/scratch/eynardbg/shared/conda_envs/spark/bin/python",
  "-m",
  "ipykernel_launcher",
  "-f",
  "{connection_file}"
 ],
 "display_name": "spark",
 "language": "python",
 "metadata": {
  "debugger": true
 }
}
```

Then:

- Download this notebook: go to the file on GitHub, click **Raw**, and save it.
- Open JupyterHub: https://jupyterhub.sis.cnes.fr
- Upload the file (button in the upper-left corner).
- Open it, and switch to the spark kernel.
- Uncomment and execute the code below.

In [None]:
#from pyspark.sql import SparkSession
#spark = SparkSession.builder.master("local[4]").getOrCreate()
#sc = spark.sparkContext

## <a id="running">Running Spark</a>

If you can run the following line, you've properly installed and started Spark. This will show us the SparkContext `sc`:

In [None]:
sc

You should see the Spark version (`v3.5.0` is the latest at the time of writing) and a link to the Spark UI, a dashboard that lets us monitor Spark's activity.

The Spark UI won't be available on Colab.

On Binder or HAL, you can go to `https://<jupyterurl>/user/<username>/proxy/4040/jobs/` to see it.

## <a id="RDDs">Warm up with RDDs</a>

We'll start with an example of manipulating Spark's [Resilient Distributed Datasets](https://spark.apache.org/docs/latest/rdd-programming-guide.html). We'll parallelize Python objects, but we could also use these tools to manipulate data held in distributed storage such as HDFS or HBase. Specifically, we'll estimate pi using a [Monte Carlo simulation](https://en.wikipedia.org/wiki/Monte_Carlo_method).

![Monte Carlo estimation of pi](http://www.physics.smu.edu/fattarus/pi.png "Pi simulation")

In [None]:
n_points = 1000

points = sc.parallelize(range(n_points))
points

As we can see, our Python object `range(n_points)` has been turned into an RDD. PySpark manages it through the SparkContext `sc`, which talks to Spark's JVM (Scala) engine.

In [None]:
from random import random

def generate_random_pt(_):
    x = random() * 2 - 1 # -> Rnd number between -1 and 1
    y = random() * 2 - 1
    return x, y

def is_inside_unary_circle(t):
    (x, y) = t
    return 1 if x ** 2 + y ** 2 <= 1 else 0

points = points.map(generate_random_pt)

inside_points = points.filter(is_inside_unary_circle)

Here we `map` the function `generate_random_pt` over our `points` RDD, applying it to each element. We then filter these points, `(x, y)` coordinates between -1 and 1, based on whether they fall inside the unit circle.

In [None]:
print("Example point : {}".format(inside_points.first()))

By counting the number of points inside the circle and dividing by the total number of points, we get an estimate of the ratio between the circle's area and the square's area. Multiplying the square's area by this ratio should give us pi:

$$\text{Area}_{\text{circle}} = \pi r^2 = \pi \quad (r = 1)$$

$$\text{Area}_{\text{circle}} \approx \frac{N_{\text{inside}}}{N_{\text{total}}} \times \text{Area}_{\text{square}} = \frac{N_{\text{inside}}}{N_{\text{total}}} \times 4$$

so $\pi \approx 4 \, N_{\text{inside}} / N_{\text{total}}$.


In [None]:
inside_circle_points = inside_points.count()
overall_area = (1 - -1) * (1 - -1)
print("Pi estimation is {}".format(overall_area * inside_circle_points
                                   / float(n_points)))

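By increasing the number of points, we get a better estimate (the error shrinks roughly like $1/\sqrt{n}$). For large numbers of points, Spark can split the sampling across several cores or machines, which becomes faster than a serial loop once the work outweighs Spark's scheduling overhead. You can watch the Spark UI while this runs to see the computation live.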
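
For comparison, here is a minimal serial version of the same estimator in plain Python, with no Spark involved. The function name `estimate_pi_serial` and the fixed seed are illustrative choices, not part of the original notebook. It imports `Random` rather than the `random` module so it does not shadow the `random()` function used elsewhere in this notebook.

```python
from random import Random

def estimate_pi_serial(n_points, seed=0):
    """Serial Monte Carlo estimate of pi (hypothetical helper, for comparison only)."""
    rng = Random(seed)  # private generator, so results are reproducible
    inside = 0
    for _ in range(n_points):
        x = rng.random() * 2 - 1  # uniform in [-1, 1)
        y = rng.random() * 2 - 1
        if x ** 2 + y ** 2 <= 1:
            inside += 1
    # Square area (2 * 2 = 4) times the fraction of points inside the unit circle
    return 4 * inside / n_points

print(estimate_pi_serial(10000))
```

Timing this cell with `%%time` next to the Spark version gives a rough feel for Spark's overhead on small inputs.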

In [None]:
n_points = 10000
points = sc.parallelize(range(n_points))
points = points.map(generate_random_pt)
inside_points = points.filter(is_inside_unary_circle)

In [None]:
inside_circle_points = inside_points.count()
print( "Pi estimation is {}".format(overall_area*inside_circle_points/float(n_points)) )

<div class="alert alert-info">
    Reflection:
    <ul>
        <li>Monte Carlo analysis like this is considered <i>embarrassingly parallel</i>. What about the function we defined makes it easy to run in parallel?</li>
        <li>Re-run the previous cell, which only calls <code>count</code>. Why would this change the result? (In recent Spark versions, you may find that it does not.)</li>
    </ul>
</div>

## <a id="persistent">Persistent RDDs</a>

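In Spark, there are two classes of operations: *transformations* like `map`, which create a new dataset, and *actions* like `count`, which return a value. Transformations are lazy: Spark only records them, and the computation runs when an action is called. Unless an intermediate result is persisted, each action recomputes the whole chain of transformations.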

In [None]:
n_trials=100000
n_throws=100

pil_data = sc.parallelize(range(n_trials))

def generate_play():
    return "heads" if random()>0.5 else "tails"

def generate_game(_):
    return [generate_play() for _ in range(n_throws)]

pil_data = pil_data.map(generate_game)

game = pil_data.first()
print(game)

In [None]:
def get_first(game):
    return game[0]

In [None]:
%%time
print("Heads ", pil_data.map(get_first).filter(lambda res: res=="heads").count())

Here we use a MapReduce model: we map each game to its first flip, then keep only the games whose first flip is heads. We can also count the games whose first flip is tails:

In [None]:
%%time
print("Tails ", pil_data.map(get_first).filter(lambda res: res=="tails").count())

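These two counts, of heads and of tails, rely on the same `map` operation. To keep the results of this mapping, we use `persist`. Because transformations are lazy, the mapped RDD is computed and stored during the first action, and the second count reuses it: neither `generate_game` nor `get_first` has to be re-run, which speeds up the computation. It also means both counts come from the same simulated games.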

In [None]:
%%time
cached_rdd = pil_data.map(get_first).persist()
print("Heads ", cached_rdd.filter(lambda res: res=="heads").count())
print("Tails ", cached_rdd.filter(lambda res: res=="tails").count())

<div class="alert alert-warning">
    Exercise:
    <br>
    Write functions that determine whether the majority of flips in a single game were heads or tails by defining <code>f_count</code>, <code>heads_filter</code>, and <code>tails_filter</code>. Compare the speed of running these with and without a persistent RDD. You might want to lower <code>n_trials</code> to speed up testing.
</div>

In [None]:
import numpy as np

def f_count(games):
    """Return [number of heads, number of tails] for one game."""
    ## Put your code here
    raise NotImplementedError

In [None]:
f_count(['heads', 'heads', 'tails', 'heads', 'heads', 'tails', 'tails', 'tails', 'tails', 'heads', 'heads', 'heads']) == [7, 5]

In [None]:
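def heads_filter(counts):
    """Return True if heads are the majority, given counts = [heads, tails]."""
    ## Your code here
    raise NotImplementedError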

In [None]:
heads_filter([53,47]) == True

In [None]:
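def tails_filter(counts):
    """Return True if tails are the majority, given counts = [heads, tails]."""
    ## Your code here
    raise NotImplementedError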

In [None]:
tails_filter([53,47]) == False

In [None]:
%%time
pil_data.map(f_count).filter(heads_filter).count()

In [None]:
%%time
pil_data.map(f_count).filter(tails_filter).count()

In [None]:
%%time
cached_rdd = pil_data.map(f_count)
print('Heads: ', cached_rdd.filter(heads_filter).count())
cached_rdd.persist()
print('Tails: ', cached_rdd.filter(tails_filter).count())

<div class="alert alert-info">
    Reflection:
    <ul>
        <li>Is <code>filter</code> a transformation or an action?</li>
        <li>Consider the following:<br>
<code>cached_rdd = pil_data.map(f_count)
print('Heads: ', cached_rdd.filter(heads_filter).count())
cached_rdd.persist()
print('Tails: ', cached_rdd.filter(tails_filter).count())</code><br>
            Is this a good use of <code>persist()</code>?</li>
    </ul>
</div>

## <a id="kmeans">Distributed K-Means</a>

With MapReduce-style operations, we can see how some algorithms can be distributed, for example K-means. To show this, we'll start with randomly generated one-dimensional points, drawn from normal distributions (standard deviation 1) centered at 0, 1, 5 and -2:

In [None]:
from random import choice
import numpy as np

means = [0, 1, 5, -2]

def generate_random_pt(_):    
    mean = choice(means)    
    return np.random.randn() + mean

n_points = 10000
points = sc.parallelize(range(n_points))
points = points.map(generate_random_pt)
points.sample(False,0.001).collect()

Our map function will find the closest centroid for each point, based on the current centroids:

In [None]:
def closest_to(point, centroids):
    distances = [(point - c)**2 for c in centroids]
    return np.argmin(distances)

In [None]:
N = 4 # Number of centroids
centroids = []
for i in range(N):
    centroids.append(20 * np.random.rand() - 10)
centroids

In [None]:
closest_to(points.first(), centroids)

Our K-means algorithm will look like this:
* Assign each point to its closest centroid
* Update each centroid to the mean of the points assigned to it
* Repeat
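
Before distributing it, it can help to see the same assign/update loop serially. Below is a minimal NumPy sketch for 1-D data (Lloyd's algorithm); `kmeans_1d_serial` is a hypothetical helper name, and the two-cluster test data is made up for illustration. It is handy as a reference when checking the Spark version.

```python
import numpy as np

def kmeans_1d_serial(data, centroids, n_iter=10):
    """Serial 1-D K-means (Lloyd's algorithm); hypothetical helper for comparison."""
    data = np.asarray(data, dtype=float)
    centroids = np.array(centroids, dtype=float)
    for _ in range(n_iter):
        # Assignment step: index of the closest centroid for every point
        labels = np.argmin((data[:, None] - centroids[None, :]) ** 2, axis=1)
        # Update step: each centroid becomes the mean of its assigned points
        for k in range(len(centroids)):
            members = data[labels == k]
            if members.size > 0:  # leave empty clusters where they are
                centroids[k] = members.mean()
    return centroids

rng = np.random.default_rng(0)
sample = np.concatenate([rng.normal(-5, 0.5, 200), rng.normal(5, 0.5, 200)])
print(kmeans_1d_serial(sample, [-1.0, 1.0]))
```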

First transformation:
 - Find the closest centroid for each data point
 - data point -> tuple `(closest centroid index, (data point, 1))`

In [None]:
rdd = points.map(lambda p: (closest_to(p, centroids), (p, 1)))
rdd.first()

Reduce by key: aggregate for each centroid
 - `(centroid index, (sum of data points, number of data points))`
 - `(datapoint1, pop1)` and `(datapoint2, pop2)` => `(datapoint1 + datapoint2, pop1 + pop2)`

In [None]:
stats = rdd.reduceByKey(lambda a, b: (a[0]+b[0], a[1]+b[1]))
stats.collect()

So that at the end:
 - each cluster is summarized as `(cluster index, (sum of data points in cluster, number of data points in cluster))`
 - from which we can compute the new centroid as the sum divided by the count

In [None]:
for stat in stats.collect():
    index, (data_sum, data_count) = stat
    new_centroid = data_sum / data_count
    centroids[index] = new_centroid

In [None]:
centroids

We'll repeat this a few times to see if it converges:

In [None]:
for i in range(10):
    # Map: assign each point to its closest centroid
    rdd = points.map(lambda p: (closest_to(p, centroids), (p, 1)))
    # Reduce: per-cluster (sum, count)
    stats = rdd.reduceByKey(lambda a, b: (a[0]+b[0], a[1]+b[1]))
    # Update centroids on the driver; clusters with no points keep their old centroid
    for index, (data_sum, data_count) in stats.collect():
        centroids[index] = data_sum / data_count
    print(centroids)

This seems to work! However, to do K-means with Spark for real, we should use the `KMeans` class from [Spark ML](https://spark.apache.org/docs/latest/api/python/_modules/pyspark/ml/clustering.html#KMeans).

## <a id="DataFrames">DataFrames and Queries</a>

Spark allows for structured data manipulation using SQL and DataFrames. Spark DataFrames are similar to Pandas DataFrames, and we'll look at converting between the two next. DataFrames can be loaded from databases, distributed filesystems, or memory. Apache Arrow (through the `pyarrow` package) speeds up conversion between Pandas DataFrames and Spark DataFrames. First, we configure Spark to use Arrow:

In [None]:
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

In [None]:
import pandas as pd
# Generate a Pandas DataFrame
pdf = pd.DataFrame(np.random.rand(100, 3))
pdf.head()

`pdf` is just a normal Pandas DataFrame, and we would not be able to do parallel manipulation on it. We'll convert it to a Spark DataFrame to allow for Spark SQL queries.

In [None]:
# Create a Spark DataFrame from a Pandas DataFrame using Arrow
df = spark.createDataFrame(pdf)
df

We can also convert Spark DataFrames back to Pandas DataFrames, for example after heavy calculation is finished.

In [None]:
# Convert the Spark DataFrame back to a Pandas DataFrame using Arrow
result_pdf = df.select("*").toPandas()
result_pdf.head()

We can also convert RDDs, which we used in the previous sections, directly to DataFrames. We'll download some example data, the famous [wine](https://archive.ics.uci.edu/ml/datasets/Wine) dataset, load it as an RDD, then convert it to a DataFrame. This dataset contains chemical measurements of wines from three different cultivars: each line starts with the class (1 to 3), followed by 13 measurements.

In [None]:
!wget https://archive.ics.uci.edu/ml/machine-learning-databases/wine/wine.data

We read this into our Spark context and can use a `map` function to process it. These are RDDs:

In [None]:
lines = sc.textFile("wine.data")
raw = lines.map(lambda l: l.split(","))
raw.first()

We start by converting our RDD of split lines into an RDD of `Row` objects. It is still an RDD, but each element is now a `Row`.

In [None]:
from pyspark.sql import Row
# wine.data has 14 columns: the class label (1-3) first, then 13 measurements
wines = raw.map(
    lambda p: Row(label=int(p[0]),
                  alcohol=float(p[1]),
                  malic_acid=float(p[2]),
                  ash=float(p[3]),
                  alcalinity=float(p[4]),
                  magnesium=float(p[5]),
                  phenols=float(p[6]),
                  flavanoids=float(p[7]),
                  nonflavanoids=float(p[8]),
                  proanthocyanins=float(p[9]),
                  color=float(p[10]),
                  hue=float(p[11]),
                  od=float(p[12]),
                  proline=float(p[13])))
wines.take(3)

This can now be converted to a DataFrame:

In [None]:
df = spark.createDataFrame(wines)
df

A Spark DataFrame can act like a database table, allowing operations such as grouping. We can see the distribution of classes in this dataset by grouping on the `label` column:

In [None]:
df.groupBy("label").count().show()

But the DataFrame also allows for transformation and action operations, like RDDs. Since we have multiple columns, we can specify which columns to apply the operations to. For example, we can filter based on certain features:

In [None]:
df.filter((df["ash"] <= 3.0) & (df["label"] >= 1)).count()

We can also query a Spark DataFrame directly with SQL. We first need to register the DataFrame as a temporary view:

In [None]:
df.createOrReplaceTempView("wines")
spark.sql("SELECT label, ash FROM wines WHERE ash <= 3.0 AND label >= 1").count()

Let's consider scaling the `ash` column to the [0, 1] range (min-max scaling). We'll do this by hand, but Spark ML provides [MinMaxScaler](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.feature.MinMaxScaler.html) to do it more easily. First, we aggregate the min and max values:

In [None]:
max_ash = df.agg({"ash": "max"}).collect()[0]["max(ash)"]
min_ash = df.agg({"ash": "min"}).collect()[0]["min(ash)"]
max_ash, min_ash

Next, we'll define a new function and wrap it as a user-defined function (UDF). The `udf` decorator lets us apply arbitrary Python code to our data while still working within the Spark SQL framework.

In [None]:
from pyspark.sql.functions import udf
@udf("double")
def normalize(s):
    # Min-max scaling: maps min_ash to 0.0 and max_ash to 1.0
    return (s - min_ash) / (max_ash - min_ash)

In [None]:
normed = df.select("ash", normalize("ash").alias("ash_normed"))
normed.take(3)

We now have a new DataFrame with the normalized ash values. Here we've been using a small dataset for an example, but the true benefits of Spark show when we're using large datasets distributed over many nodes. Everything we've just shown would be possible with distributed data, and Spark will automatically parallelize certain processing tasks.

<div class="alert alert-info">
Spark is a general computing framework which excels at performance and is useful for a variety of tasks. While we briefly mentioned its `ml` package, Spark was not specifically developed for ML. Next, we'll look at Dask, which is another full Python solution for scaling data processing and ML tasks.
</div>

## Solutions

In [None]:
import numpy as np

def f_count(games):
    # Count each outcome explicitly, so an all-heads (or all-tails) game still yields two counts
    return [games.count("heads"), games.count("tails")] # TODO: remove

def heads_filter(counts):
    return counts[0] > n_throws / 2 # TODO: remove

def tails_filter(counts):
    return counts[1] > n_throws / 2 # TODO: remove