# Introducing Apache Spark

Apache Spark is an engine for distributed computing. It is written in Scala and offers APIs in Scala, Java, Python (yay!) and R.

# Loading `pyspark`

The first step is to import `pyspark` into Python, along with two classes that are essential for getting Spark running. We'll only use Spark locally in this notebook, but later on we'll see an example of how to use Spark for "real" distributed computing!

In [None]:
import pyspark
from pyspark import SparkConf, SparkContext

We need `SparkConf` to configure the Spark application and `SparkContext` to create the "context". The Spark context is the "main entry point for Spark functionality" [1](http://spark.apache.org/docs/2.1.0/api/python/pyspark.html): it represents the connection to a cluster, which is where the data will be distributed and the computation will run. To define a context, first build a configuration and then pass it to `SparkContext`. The generic syntax looks something like this:

```python
conf = SparkConf().setAppName(appName).setMaster(master)
sc = SparkContext(conf=conf)
```
The `appName` parameter is a name for the application that will show up on the cluster UI. `master` can be a Spark, Mesos or YARN cluster URL. Alternatively, we can leave `master` out of the code and supply it when launching the job on a cluster with `spark-submit`.

For the time being, we will hardcode `master` with a special "local" string that will let us run Spark *in-process*.
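To make the "local" string less mysterious, here is a small sketch of how its common forms are read: `local` means one worker thread, `local[K]` means `K` worker threads, and `local[*]` means one worker thread per logical core. The helper `local_worker_threads` is hypothetical and written only for this illustration; it is not part of `pyspark` and it ignores less common variants of the string.

```python
import os
import re


def local_worker_threads(master):
    """Return the number of worker threads a local master string asks for.

    Hypothetical helper for illustration only; it is not part of pyspark.
    """
    if master == "local":
        return 1  # a single worker thread, so no parallelism
    match = re.fullmatch(r"local\[(\*|\d+)\]", master)
    if match is None:
        raise ValueError("not a local master string: %r" % master)
    if match.group(1) == "*":
        # one worker thread per logical core on this machine
        return os.cpu_count() or 1
    return int(match.group(1))


for master in ("local", "local[2]", "local[*]"):
    print(master, "->", local_worker_threads(master))
```

So `local[*]`, the value used in this notebook, simply asks Spark to use every core of the local machine.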

In [None]:
appName = 'firstApp'
master = 'local[*]'
conf = SparkConf().setAppName(appName).setMaster(master)
sc = SparkContext(conf=conf)

# Resilient distributed datasets

A [Resilient Distributed Dataset](http://spark.apache.org/docs/latest/rdd-programming-guide.html#resilient-distributed-datasets-rdds) (RDD) is the kind of data structure used by Spark for **distributed computing**: it lets content from the same data object reside on multiple nodes of a cluster. From the link above, an RDD is "a fault-tolerant collection of elements that can be operated on in parallel". There are two ways of creating an RDD [2](http://spark.apache.org/docs/latest/rdd-programming-guide.html#resilient-distributed-datasets-rdds):

1. *parallelizing* an existing collection in your driver program
2. *referencing* a dataset in an *external storage system*, such as a shared filesystem, HDFS, HBase, or any data source offering a Hadoop InputFormat
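As a rough mental model of the first option, parallelizing a collection amounts to cutting it into slices (partitions) that can be handed to different workers. The sketch below does this in plain Python. The function `split_into_partitions` is a hypothetical helper for illustration and should not be read as Spark's actual slicing code.

```python
def split_into_partitions(collection, num_partitions):
    """Split a collection into contiguous, near-equal slices.

    Hypothetical helper: a mental model only, not Spark's actual slicing code.
    """
    items = list(collection)
    n = len(items)
    # Slice i covers items[start:end]; integer arithmetic spreads any remainder.
    return [items[i * n // num_partitions:(i + 1) * n // num_partitions]
            for i in range(num_partitions)]


partitions = split_into_partitions(range(10), 4)
print(partitions)  # [[0, 1], [2, 3, 4], [5, 6], [7, 8, 9]]
```

Every element lands in exactly one slice, so operations applied slice by slice still cover the whole collection.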

In [None]:
rdd = sc.parallelize(range(1000))

In [None]:
rdd.takeSample(False, 5, seed=1)

# RDD Operations

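The [RDD Operations](http://spark.apache.org/docs/latest/rdd-programming-guide.html#rdd-operations) section of the Spark programming guide lists the available operations. They fall into two groups: *transformations*, which define a new RDD from an existing one, and *actions*, which return a value to the driver program.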

## Basic operations

To demonstrate some basic operations on RDDs, we first need an RDD to work with. We *could* use the RDD, `rdd`, that we created above, but instead we'll demonstrate how to read in data from a text file.

In [None]:
lines = sc.textFile("./data/data.txt")

**Important note:** We have not actually loaded any data into Python or Spark here! Instead, we have created a `pyspark` object that holds a reference to the data file we want to analyze. The data is only read when we finally ask `pyspark` to return a value.

For example, suppose we want to figure out the length of each line in the data file we imported. To do this, we need a function that computes the length of a single line (Python's built-in `len` will do) and a way to apply it to every line. The tool for this is `map`: it accepts a function as an argument and applies that function to each element of the RDD, giving a new RDD of the results.
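The element-by-element behaviour of `map` is the same as that of Python's built-in `map`, so a plain-Python sketch may help. The list `lines` below is a hypothetical stand-in for the contents of a small text file, and nothing here touches Spark.

```python
# Hypothetical stand-in for the contents of a small text file.
lines = ["spark", "is lazy", ""]

# map applies the function to each element and keeps the order,
# so the result has exactly one length per line.
line_lengths = list(map(len, lines))
print(line_lengths)  # [5, 7, 0]

# Any one-argument function works, for example a word counter.
word_counts = list(map(lambda line: len(line.split()), lines))
print(word_counts)  # [1, 2, 0]
```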

In [None]:
lineLengths = lines.map(len)

<div class="alert alert-block alert-info">**Important note:** `pyspark` *still* hasn't done anything; how *lazy* it must be. If we really want to see the length of each element, we have to ask `pyspark` more explicitly. There are several ways to do this, depending on what you're looking for. See the documentation for each of these functions by putting your cursor between their parentheses and pressing `Shift` + `Tab`.</div>
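The laziness described above can be imitated in a few lines of plain Python. In the sketch below, `LazyDataset` is a hypothetical toy class, not part of `pyspark`: `map` only records what should be done, while `take` and `collect` actually run the recorded steps. Spark's real `take` works partition by partition rather than element by element, so treat this as an analogy only.

```python
import itertools


class LazyDataset:
    """Toy stand-in for an RDD: transformations are recorded, not executed.

    Hypothetical class for illustration; it is not part of pyspark.
    """

    def __init__(self, source, steps=()):
        self._source = source        # zero-argument callable that yields the data
        self._steps = tuple(steps)   # functions to apply, in order

    def map(self, func):
        # Transformation: return a new dataset with one more recorded step.
        return LazyDataset(self._source, self._steps + (func,))

    def _run(self):
        # Only here is the source actually read and the steps applied.
        for item in self._source():
            for func in self._steps:
                item = func(item)
            yield item

    def take(self, n):
        # Action: compute just enough elements to return the first n.
        return list(itertools.islice(self._run(), n))

    def collect(self):
        # Action: compute and return every element.
        return list(self._run())


reads = []  # records every line that is actually pulled from the source


def read_lines():
    # Hypothetical data source standing in for a text file.
    for line in ["alpha", "be", "gamma"]:
        reads.append(line)
        yield line


lengths = LazyDataset(read_lines).map(len)
print(reads)            # [] (nothing has been read yet)
print(lengths.take(2))  # [5, 2]
print(reads)            # ['alpha', 'be'] (only what take(2) needed)
```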

In [None]:
lineLengths.take(2)

In [None]:
lineLengths.takeOrdered(2)

In [None]:
lineLengths.takeSample(False, 1, 1)

In [None]:
lineLengths.collect()

In [None]:
totalLength = lineLengths.reduce(lambda a, b: a + b)

In [None]:
totalLength
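For readers who think in plain Python, the actions used above have rough local analogues. The sketch below uses a hypothetical list of line lengths in place of `lineLengths` and does not involve Spark.

```python
import functools
import heapq

# Hypothetical line lengths standing in for the contents of lineLengths.
line_lengths = [12, 7, 30, 7, 18]

first_two = line_lengths[:2]                     # like take(2)
smallest_two = heapq.nsmallest(2, line_lengths)  # like takeOrdered(2)
total_length = functools.reduce(lambda a, b: a + b, line_lengths)  # like reduce

print(first_two, smallest_two, total_length)  # [12, 7] [7, 7] 74
```

One difference is worth keeping in mind: Spark combines partial results from different partitions, so the function passed to `reduce` should be commutative and associative, as addition is here.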

# A trivial `pyspark` example

In [None]:
import random
from pyspark.sql import SQLContext
from pyspark import Row
import matplotlib.pyplot as plt
%matplotlib inline
import numpy as np

In [None]:
sqlContext = SQLContext(sc)

First, we will create a Spark DataFrame with three numeric columns and one categorical column (`col`, a colour).
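The DataFrame in this section is built from rows, so the separate column lists have to be combined into one tuple per row first. The sketch below shows that step in plain Python with tiny hypothetical columns; the notebook itself uses 100 random values per column.

```python
# Tiny hypothetical columns; the notebook uses 100 random values per column.
A = [0.1, -0.4]
B = [1.3, 2.2]
C = [-1.0, -0.8]
col = ["#e41a1c", "#377eb8"]

# zip pairs up the i-th entry of every column, giving one tuple per row.
rows = list(zip(A, B, C, col))
print(rows[0])  # (0.1, 1.3, -1.0, '#e41a1c')

# Attaching the column names to a row recovers a labelled record.
names = ["A", "B", "C", "col"]
record = dict(zip(names, rows[0]))
print(record)
```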

In [None]:
random.seed(a=1)
A = [random.normalvariate(0,1) for i in range(100)]
B = [random.normalvariate(1,2) for i in range(100)]
C = [random.normalvariate(-1,0.5) for i in range(100)]
col = [random.choice(['#e41a1c', '#377eb8','#4eae4b']) for i in range(100)]

In [None]:
# Reuse the sqlContext created above rather than constructing a second one
df = sqlContext.createDataFrame(sc.parallelize(zip(A, B, C, col)), ["A", "B", "C", "col"])

In [None]:
df.show(5)

In [None]:
# convert to pandas and plot
pdf = df.toPandas()

In [None]:
from pandas.plotting import scatter_matrix  # pandas.tools.plotting was removed from newer pandas
stuff = scatter_matrix(pdf, alpha=0.7, figsize=(6, 6), diagonal='kde', c=pdf.col)

In [None]:
array = [Row(key="a", group="vowels", value=1),
         Row(key="b", group="consonants", value=2),
         Row(key="c", group="consonants", value=3),
         Row(key="d", group="consonants", value=4),
         Row(key="e", group="vowels", value=5)]
dataframe = sqlContext.createDataFrame(sc.parallelize(array))

In [None]:
dataframe.columns

In [None]:
dataframe.collect()

In [None]:
dataframe.show()

# Using `sql` to display stuff

In [None]:
from pyspark.sql import Row

In [None]:
array = map(lambda x: Row(key="k_%04d" % x, value = x), range(1, 5001))
largeDataFrame = sqlContext.createDataFrame(sc.parallelize(array))
largeDataFrame.createOrReplaceTempView("largeTable")  # registerTempTable is deprecated since Spark 2.0
sqlContext.sql("select * from largeTable").show(10)

In [None]:
largePivotSeries = map(lambda x: Row(key="k_%03d" % (x % 200), series_grouping = "group_%d" % (x % 3), value = x), range(1, 5001))
largePivotDataFrame = sqlContext.createDataFrame(sc.parallelize(largePivotSeries))
largePivotDataFrame.createOrReplaceTempView("table_to_be_pivoted")  # registerTempTable is deprecated since Spark 2.0
sqlContext.sql("select * from table_to_be_pivoted").show(5)

# `pyspark` example

This $k$-means example is taken from the `pyspark` documentation.

In [None]:
from pyspark.ml.clustering import KMeans
from pyspark.sql import SparkSession

In [None]:
spark = SparkSession.builder.appName('KMeansExample').getOrCreate()

In [None]:
dataset = spark.read.format('libsvm').load('/usr/local/spark/data/mllib/sample_kmeans_data.txt')

In [None]:
dataset.show(5)

Now train a $k$-means model with $k = 2$ clusters and a fixed seed.
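To give a feel for what training involves, here is a minimal plain-Python sketch of the classic Lloyd iteration: assign every point to its nearest center, then move each center to the mean of its points. This is not Spark's implementation, which by default also uses the `k-means||` initialization and runs distributed. The function `lloyd_kmeans`, the toy points and the starting centers are all hypothetical.

```python
def squared_distance(p, q):
    return sum((a - b) ** 2 for a, b in zip(p, q))


def lloyd_kmeans(points, centers, iterations=10):
    """Plain Lloyd iterations from given starting centers (hypothetical helper)."""
    centers = [tuple(c) for c in centers]
    for _ in range(iterations):
        # Assignment step: attach every point to its nearest center.
        clusters = [[] for _ in centers]
        for p in points:
            nearest = min(range(len(centers)),
                          key=lambda i: squared_distance(p, centers[i]))
            clusters[nearest].append(p)
        # Update step: move each center to the mean of its points.
        new_centers = []
        for center, members in zip(centers, clusters):
            if members:
                dims = zip(*members)
                new_centers.append(tuple(sum(d) / len(members) for d in dims))
            else:
                new_centers.append(center)  # keep a center that attracted no points
        if new_centers == centers:
            break  # converged: no center moved
        centers = new_centers
    return centers


# Hypothetical toy data: two well-separated groups in the plane.
points = [(0.0, 0.0), (0.0, 2.0), (10.0, 10.0), (10.0, 12.0)]
centers = lloyd_kmeans(points, centers=[(0.0, 0.0), (10.0, 12.0)])
print(centers)  # [(0.0, 1.0), (10.0, 11.0)]
```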

In [None]:
kmeans = KMeans().setK(2).setSeed(1)

In [None]:
model = kmeans.fit(dataset)

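Evaluate the clustering by computing the Within Set Sum of Squared Errors (WSSSE): the sum, over all points, of the squared distance from each point to its nearest cluster center. Note that `computeCost` was deprecated in Spark 2.4 and removed in Spark 3.0, so the cell below assumes an older Spark release.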
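The quantity itself is easy to compute by hand on a small example. The sketch below uses hypothetical toy points and centers rather than the sample data file, and a hypothetical helper named `wssse`: for each point it takes the squared distance to the nearest center and adds these up.

```python
def wssse(points, centers):
    """Sum over all points of the squared distance to the nearest center."""
    total = 0.0
    for p in points:
        total += min(sum((a - b) ** 2 for a, b in zip(p, c)) for c in centers)
    return total


# Hypothetical toy data: two tight groups, with one center in each group.
points = [(0.0, 0.0), (0.0, 2.0), (10.0, 10.0), (10.0, 12.0)]
good_centers = [(0.0, 1.0), (10.0, 11.0)]
print(wssse(points, good_centers))  # 4.0

# A single center in the middle fits the same points far worse.
print(wssse(points, [(5.0, 6.0)]))  # 204.0
```

A lower value means the points sit closer to their assigned centers, which is why it is used to compare clusterings of the same data.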

In [None]:
wssse = model.computeCost(dataset)
print("Within Set Sum of Squared Errors = " + str(wssse))

In [None]:
centers = model.clusterCenters()
print("Cluster Centers: ")
for center in centers:
    print(center)