# Introducing Apache Spark

Apache Spark is an engine for distributed computing. Spark itself is written in Scala, and it offers APIs for Scala, Java, Python (yay!) and R.

# Loading `pyspark`

The first step is to import `pyspark` into Python, along with two very important classes that get Spark working. We'll only be using Spark locally in this notebook, but later on we'll see an example of how to use Spark to do "real" distributed computing!

In [None]:
import pyspark
from pyspark import SparkConf, SparkContext

We will need `SparkConf` and `SparkContext` to set the configuration of the Spark instance and the "context". The Spark context is the "main entry point for Spark functionality" [1](http://spark.apache.org/docs/2.1.0/api/python/pyspark.html). This is because `SparkContext` tells Spark how to access a cluster, which is where the data will be stored. To define a context, define a configuration for the Spark instance and then initialize the context with `SparkContext`. The generic syntax looks something like this:

```python
conf = SparkConf().setAppName(appName).setMaster(master)
sc = SparkContext(conf=conf)
```
The `appName` parameter is a name for the application that will show up on the cluster UI. `master` can be a Spark, Mesos or YARN cluster URL; we can instantiate the context without hardcoding `master`, and then launch jobs to a cluster using `spark-submit`. 

For the time being, we will hardcode `master` with a special "local" string that will let us run Spark *in-process*.

In [None]:
appName = 'firstApp'
master = 'local[*]'
conf = SparkConf().setAppName(appName).setMaster(master)
sc = SparkContext(conf=conf)

# Resilient distributed datasets

A [Resilient Distributed Dataset](http://spark.apache.org/docs/latest/rdd-programming-guide.html#resilient-distributed-datasets-rdds) (RDD) is the kind of data structure used by Spark for **distributed computing**: it lets content from the same data object reside on multiple nodes of a cluster. From the link above, an RDD is "a fault-tolerant collection of elements that can be operated on in parallel". There are two ways of creating an RDD [2](http://spark.apache.org/docs/latest/rdd-programming-guide.html#resilient-distributed-datasets-rdds):

1. *parallelizing* an existing collection in your driver program
2. *referencing* a dataset in an *external storage system*, such as a shared filesystem, HDFS, HBase, or any data source offering a Hadoop InputFormat

In [None]:
rdd = sc.parallelize(range(1000))

In [None]:
rdd.takeSample(False, 5, seed=1)

# RDD Operations

See the [RDD Operations](http://spark.apache.org/docs/latest/rdd-programming-guide.html#rdd-operations) section of the RDD Programming Guide for the full reference.

## Basic operations

To demonstrate some basic operations on RDDs, we'll first need an RDD to work with. We *could* use the RDD, `rdd`, that we created above, but instead will demonstrate how to read in data from a text file. 

In [None]:
lines = sc.textFile("./data/data.txt")

<div class="alert alert-block alert-info">We have not actually loaded any data into Python or Spark here! Instead, we have created a `pyspark` object that has a reference to a data file that we want to analyze. We will only have to load the data when we finally tell `pyspark` to return a value for us. </div>

### map-reduce
For example, suppose we want to figure out the length of each line in the data file we imported. To do this, we need a function that computes the length of a single line (Python's built-in `len` will do) and a way to apply it to every line. The tool we use for this is called `map`. `map` accepts a function as an argument and applies that function to each element of the RDD, returning a new RDD of the results.

In [None]:
lineLengths = lines.map(len)

<div class="alert alert-block alert-info">`pyspark` *still* hasn't done anything; how *lazy* it must be. If we really want to see the length of each element, then we have to ask `pyspark` more explicitly. There are several ways to do this, depending on what you're looking for. You can see the documentation for each of these functions by putting your cursor between their parentheses and pressing `Shift` + `Tab`.</div>
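
Python's built-in `map` is lazy in a similar way, which makes for a rough analogy. The sketch below is plain Python rather than Spark, and `sample_lines` and `traced_len` are hypothetical names introduced only for illustration. It shows that the function passed to `map` does not run until elements are actually requested.

```python
# Plain-Python analogy for lazy evaluation (this is not Spark itself).
calls = []

def traced_len(s):
    """Return len(s), recording every time the function really runs."""
    calls.append(s)
    return len(s)

sample_lines = ["spark", "is", "lazy"]         # hypothetical stand-in for the file's lines
lazy_lengths = map(traced_len, sample_lines)   # like lines.map(len): nothing runs yet
calls_before = len(calls)                      # no calls have happened so far

# Asking for two elements (loosely like take(2)) triggers exactly two calls.
first_two = [next(lazy_lengths), next(lazy_lengths)]
calls_after = len(calls)

print(calls_before, first_two, calls_after)
```

Only the elements that are requested get computed, which loosely mirrors how a Spark action such as `take` is what finally triggers the work.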

We can `take` the first two elements of the vector `lineLengths` like so:

In [None]:
lineLengths.take(2)

We can take the first two elements of `lineLengths` after sorting like so:

In [None]:
lineLengths.takeOrdered(2)

We can even *sample* two elements from the vector:

In [None]:
lineLengths.takeSample(False, 2, seed=1)

Lastly, we can `collect` all the values, which returns a list containing all of the elements in our RDD.

In [None]:
lineLengths.collect()

<div class="alert alert-block alert-warning">
When we're working in a distributed setting (*i.e.,* with data stored on the nodes of a cluster), using `collect` can cause our driver (*i.e.,* our Jupyter instance) to run out of memory. This is because `collect` fetches the **entire** RDD to a single machine. If you only need to print a few elements of the RDD, a safer approach is to stick with `take` (*e.g.,* `rdd.take(100)`).
</div>

Next, suppose we wanted to find the total length of all the lines in the RDD. To do this, we will call `reduce` on the result of our `map`. `reduce` takes a function and a vector, and iteratively applies the function to the current result and the next element of the vector:
$$
\mathrm{reduce}(+, [1,2,3,4,5])
= \mathrm{reduce}(+, [3,3,4,5])
= \mathrm{reduce}(+, [6,4,5])
= \mathrm{reduce}(+, [10,5])
= 15
$$
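
The same fold can be reproduced in plain Python, as a small sketch that uses the standard library rather than Spark. `itertools.accumulate` exposes the intermediate results from the chain above, and `functools.reduce` returns only the final one.

```python
from functools import reduce
from itertools import accumulate
from operator import add

values = [1, 2, 3, 4, 5]

# Running results of applying + from left to right: 1, 1+2, 3+3, 6+4, 10+5
running = list(accumulate(values, add))

# reduce keeps only the final result of that chain
total = reduce(add, values)

print(running, total)
```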

In [None]:
totalLength = lineLengths.reduce(lambda a, b: a + b)

In [None]:
totalLength

<div class="alert alert-block alert-info">This time, Spark *does* return a value. Why!? It's because `reduce` is an *action*: at this point Spark breaks the computation into tasks to run on separate machines, and each machine runs both its part of the map and a local reduction, returning only its answer to the driver program.</div>
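
To make the "local reduction" idea concrete, here is a plain-Python sketch (not Spark's actual implementation). It assumes a hypothetical list `sample_lines` split into three made-up partitions: each partition is mapped and reduced on its own, and only the partial results are combined at the end. This gives the same answer as a single pass because addition is associative and commutative, which is what Spark expects of the function passed to `reduce`.

```python
from functools import reduce
from operator import add

# Hypothetical data standing in for the lines of a file.
sample_lines = ["a", "bb", "ccc", "dddd", "eeeee", "ffffff"]

# Pretend the data lives on three machines (round-robin split, for illustration only).
num_partitions = 3
partitions = [sample_lines[i::num_partitions] for i in range(num_partitions)]

# Each "machine" runs its part of the map and a local reduction.
partials = [reduce(add, map(len, part)) for part in partitions]

# The "driver" only has to combine one number per partition.
total = reduce(add, partials)

print(partials, total)
```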

### Functions that work with key-value pairs

Some RDD operations work on RDDs whose elements are key-value pairs (tuples). The cells below count how often each line occurs in a file: `map` turns every line `s` into the pair `(s, 1)`, and `reduceByKey` adds up the values that share a key.

In [None]:
repeated_lines = sc.textFile('./data/repeated-data.txt')

In [None]:
pairs = repeated_lines.map(lambda s: (s, 1))
counts = pairs.reduceByKey(lambda a, b: a + b)
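
As a rough picture of what `reduceByKey` computes, here is a plain-Python sketch. The helper `reduce_by_key` and the list `sample` are hypothetical names made up for this illustration, and the sketch ignores how Spark distributes the work; it only shows the result one would expect from merging the values of each key with the given function.

```python
def reduce_by_key(pairs, func):
    """Merge the values of each key with func (illustrative helper, not a Spark API)."""
    merged = {}
    for key, value in pairs:
        # First time we see a key we keep its value; afterwards we combine with func.
        merged[key] = func(merged[key], value) if key in merged else value
    return merged

sample = ["x", "y", "x", "z", "x", "y"]      # hypothetical repeated lines
sample_pairs = [(s, 1) for s in sample]      # same shape as lines.map(lambda s: (s, 1))
sample_counts = reduce_by_key(sample_pairs, lambda a, b: a + b)

print(sorted(sample_counts.items()))
```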

For a list of other transformations for RDDs, visit [this section of the Spark documentation](http://spark.apache.org/docs/latest/rdd-programming-guide.html#transformations).

# Example: working with dataframes in `pyspark.sql`

Typically, real-life data is stored in objects known as dataframes: a bunch of columns, each containing observations of some type (*e.g.,* `int`, `str`, `list`, *etc.*). To work with distributed dataframes using `pyspark`, we will use the `SparkSession` class from `pyspark.sql`.

<div class='alert alert-block alert-warning'>Note that `SparkSession` is the entry point of choice for working with structured data (rows and columns) in Spark 2.x.x (specifically, it is used for working with the Dataset and DataFrame API). It replaces the deprecated-ish `SQLContext` used in Spark 1.x.x. To be sure, double-check that Spark 2.x.x is running. Verifying that the right version is running is always **very** important, but especially so when you have multiple people running the same code on multiple (possibly different) systems and set-ups.</div>

In [None]:
pyspark.version.__version__

## Making a dataframe 1

### Import packages

We'll wind up plotting the data in a dataframe, and maybe doing some numerical computation on it, so let's load in the standard Python setup.

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
%matplotlib inline

Now, for this example we will need the following specific classes from `pyspark` (and the `random` module).

In [None]:
from pyspark.sql import SparkSession
from pyspark import Row
import random

### Create a `SparkSession`

Initialize the Spark Session instance using the context we defined earlier. 

In [None]:
ss = SparkSession(sc)

### Create a dataframe!

As before, there are different ways of creating distributed objects. Here, we'll just define some data in memory, and then create a spark dataframe from this data. Note that the following approach is very parallelizable, because we can define each column separately. We'll be using 3 numeric columns and one categorical (colour).

In [None]:
random.seed(a=1)
A = [random.normalvariate(0,1) for i in range(100)]
B = [random.normalvariate(1,2) for i in range(100)]
C = [random.normalvariate(-1,0.5) for i in range(100)]
col = [random.choice(['#e41a1c', '#377eb8','#4eae4b']) for i in range(100)]

To create a dataframe, we pass an RDD and a list of column names to `createDataFrame`. Since we have the data in memory, we can build the RDD by calling the `parallelize` method of our Spark context. Note that `createDataFrame` would also accept a `pandas` data frame as an argument.

In [None]:
df = ss.createDataFrame(sc.parallelize(zip(A,B,C,col)), ['A', 'B', 'C', 'col'])
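
The `zip` call is what turns the separate column lists into row tuples. A tiny plain-Python sketch of that step, using hypothetical three-element columns (`xs`, `ys` and `colours` are made-up names) instead of the 100-element ones above:

```python
# Hypothetical miniature columns, one list per column.
xs = [0.1, 0.2, 0.3]
ys = [1.0, 2.0, 3.0]
colours = ["red", "green", "blue"]

# zip pairs up the i-th entry of every column, giving one tuple per row.
rows = list(zip(xs, ys, colours))

# Unzipping the rows recovers the original columns.
columns_again = [list(c) for c in zip(*rows)]

print(rows)
```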

In [None]:
df.show(5)

### Visualizing the dataframe

In [None]:
# convert to pandas and plot
pdf = df.toPandas()

In [None]:
from pandas.plotting import scatter_matrix  # pandas.tools.plotting was removed in newer pandas
stuff = scatter_matrix(pdf, alpha=0.7, figsize=(6, 6), diagonal='kde', c=pdf.col)

## Make a dataframe 2

The way we made `df` above is not the only way to make a dataframe in Spark.

In [None]:
array = [Row(key="a", group="vowels", value=1),
         Row(key="b", group="consonants", value=2),
         Row(key="c", group="consonants", value=3),
         Row(key="d", group="consonants", value=4),
         Row(key="e", group="vowels", value=5)]
dataframe = ss.createDataFrame(sc.parallelize(array))

We can reference the columns of the dataframe object as follows.

In [None]:
dataframe.columns

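Notice the difference between `collect` and `show`: `collect` returns the rows to the driver as a Python list of `Row` objects, while `show` prints a formatted table (the first 20 rows by default) and returns nothing.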

In [None]:
dataframe.collect()

In [None]:
dataframe.show()

## Using SQL syntax to display a subset of a dataframe

We can also use SQL syntax to query data from the dataframe we create (note: this requires knowing SQL syntax).

In [None]:
array = map(lambda x: Row(key="k_%04d" % x, value = x), range(1, 5001))
largeDataFrame = ss.createDataFrame(sc.parallelize(array))
largeDataFrame.createOrReplaceTempView("largeTable")  # registerTempTable is deprecated since Spark 2.0
ss.sql("select * from largeTable").show(10)

In [None]:
largePivotSeries = map(lambda x: Row(key="k_%03d" % (x % 200), series_grouping = "group_%d" % (x % 3), value = x), range(1, 5001))
largePivotDataFrame = ss.createDataFrame(sc.parallelize(largePivotSeries))
largePivotDataFrame.createOrReplaceTempView("table_to_be_pivoted")  # registerTempTable is deprecated since Spark 2.0
ss.sql("select * from table_to_be_pivoted").show(5)

### Pivoting the largePivotDataFrame 1

Alternatively, we can aggregate the table with the dataframe API, whose `groupBy` will look familiar from `pandas`. Here we average `value` within each `series_grouping`:

In [None]:
largePivotDataFrame.groupBy('series_grouping').avg('value').show()
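
As a sanity check on what this aggregation should return, the group averages can be recomputed in plain Python. This is a sketch that rebuilds the same `series_grouping` labels and values as the cell that defines `largePivotSeries`, without involving Spark; the dictionaries `sums`, `counts_per_group` and `averages` are names introduced only for this illustration.

```python
from collections import defaultdict

sums = defaultdict(int)
counts_per_group = defaultdict(int)

# Same values and group labels as in largePivotSeries.
for x in range(1, 5001):
    group = "group_%d" % (x % 3)
    sums[group] += x
    counts_per_group[group] += 1

# Average of `value` within each group.
averages = {g: sums[g] / counts_per_group[g] for g in sorted(sums)}

print(averages)
```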

### Pivoting the largePivotDataFrame 2

In [None]:
ss.sql("select key, series_grouping, sum(value) from table_to_be_pivoted group by key, series_grouping order by key, series_grouping").show()
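
If you want to experiment with this query without a Spark session, one option is to run it against an in-memory SQLite database from Python's standard library. This is only an analogy: SQLite's dialect is not Spark SQL, and the table below is rebuilt by hand with the same column names and values as `largePivotSeries`.

```python
import sqlite3

# In-memory database standing in for the registered Spark table.
conn = sqlite3.connect(":memory:")
conn.execute(
    "create table table_to_be_pivoted (key text, series_grouping text, value integer)"
)

# Same rows as largePivotSeries, as plain tuples.
rows = [("k_%03d" % (x % 200), "group_%d" % (x % 3), x) for x in range(1, 5001)]
conn.executemany("insert into table_to_be_pivoted values (?, ?, ?)", rows)

# The query from the cell above, unchanged.
query = (
    "select key, series_grouping, sum(value) from table_to_be_pivoted "
    "group by key, series_grouping order by key, series_grouping"
)
result = conn.execute(query).fetchall()
conn.close()

print(len(result), result[0])
```

There is one output row per distinct (`key`, `series_grouping`) combination, and the per-group sums add up to the sum of all 5000 values.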

# Example: Machine learning with `pyspark`

This is a $k$-means example using `pyspark`, which is taken from the `pyspark` documentation.

In [None]:
from pyspark.ml.clustering import KMeans
from pyspark.sql import SparkSession

In [None]:
spark = SparkSession.builder.appName('KMeansExample').getOrCreate()

In [None]:
dataset = spark.read.format('libsvm').load('/usr/local/spark/data/mllib/sample_kmeans_data.txt')

In [None]:
dataset.show(5)

Now train a $k$-means model.

In [None]:
kmeans = KMeans().setK(2).setSeed(1)

In [None]:
model = kmeans.fit(dataset)

Evaluate clustering by computing Within Set Sum of Squared Errors.

In [None]:
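# computeCost is available in Spark 2.x but was removed in Spark 3.0;
# there, model.summary.trainingCost gives the cost on the training data.
wssse = model.computeCost(dataset)
print("Within Set Sum of Squared Errors = " + str(wssse))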
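
For intuition, the Within Set Sum of Squared Errors is the sum, over all points, of the squared distance from each point to its nearest cluster center. The sketch below computes it by hand in plain Python for a few hypothetical 2-D points and centers (made up for illustration, not read from the sample file); it is not how Spark computes the value internally.

```python
# Hypothetical points and cluster centers, chosen only for illustration.
points = [(0.0, 0.0), (0.2, 0.2), (9.0, 9.0), (9.2, 9.2)]
centers = [(0.1, 0.1), (9.1, 9.1)]

def squared_distance(p, q):
    """Squared Euclidean distance between two points of equal dimension."""
    return sum((a - b) ** 2 for a, b in zip(p, q))

# Each point contributes the squared distance to its closest center.
wssse_by_hand = sum(
    min(squared_distance(p, c) for c in centers) for p in points
)

print(wssse_by_hand)
```

Smaller values mean the points sit closer to their assigned centers.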

In [None]:
centers = model.clusterCenters()
print("Cluster Centers: ")
for center in centers:
    print(center)