# Big Data processing with Apache Spark

In this workshop, we will use the Apache Spark framework for performing computations on large data sets. 

## What is Spark?

From the website:

> Apache Spark is a fast and general-purpose cluster computing system. It provides high-level APIs in Java, Scala, Python and R, and an optimized engine that supports general execution graphs. It also supports a rich set of higher-level tools including [Spark SQL](https://spark.apache.org/docs/latest/sql-programming-guide.html) for SQL and structured data processing, [MLlib](https://spark.apache.org/docs/latest/mllib-guide.html) for machine learning, [GraphX](https://spark.apache.org/docs/latest/graphx-programming-guide.html) for graph processing, and [Spark Streaming](https://spark.apache.org/docs/latest/streaming-programming-guide.html).

Spark allows you to write reasonably concise, functional style code against large collections of data, which gets executed on a cluster of machines in a scalable way. This means that Spark abstracts the work of taking the code and distributing it over multiple machines and provides a clean API to work with data.

## How does it work?
In Spark, data is represented as a Resilient Distributed Dataset, or RDD. An RDD is very similar to a collection in normal programming; think of a [List in Scala](http://www.scala-lang.org/api/2.11.5/index.html#scala.collection.immutable.List), a [Stream in Java 8](https://docs.oracle.com/javase/8/docs/api/java/util/stream/package-summary.html) or a [list in Python](https://docs.python.org/2/tutorial/datastructures.html). You can work with lists by applying functions to them, filtering them, or grouping and aggregating their items. Spark RDDs support all of these operations as well; the difference is that the operations are executed in a distributed fashion across a cluster. Let's look at some examples.
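
To make the idea of a distributed collection more concrete, here is a plain-Python sketch (no Spark involved) that models a dataset as a list of partitions. The helpers `split_into_partitions` and `map_partitions` are hypothetical names made up for this illustration, not Spark API. On a real cluster, each partition would live on a different machine and be processed in parallel.

```python
def split_into_partitions(data, num_partitions):
    """Split a list into contiguous chunks of roughly equal size (hypothetical helper)."""
    data = list(data)
    size, remainder = divmod(len(data), num_partitions)
    partitions, start = [], 0
    for i in range(num_partitions):
        # The first `remainder` partitions get one extra element.
        end = start + size + (1 if i < remainder else 0)
        partitions.append(data[start:end])
        start = end
    return partitions

def map_partitions(function, partitions):
    """Apply function to every element, one partition at a time."""
    return [[function(x) for x in part] for part in partitions]

partitions = split_into_partitions(range(10), 3)
squared = map_partitions(lambda n: n ** 2, partitions)
print(partitions)  # three chunks that together hold the numbers 0..9
print(squared)     # the same chunks with every element squared
```

Each partition is transformed independently of the others, which is what allows this kind of operation to be spread over many machines.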

## Constructing RDDs
There are two ways to create an RDD:

1. Create it from a list of data that's currently in memory.
2. Read it from a file.

Creating an RDD from some in-memory list is not very common and is normally only used for debugging or showing example use cases.

Example of creating an RDD from memory:

```python
rdd = sc.parallelize(range(10000))  # Creates an RDD of the numbers 0...9999
```

Normally, it makes more sense to read an RDD from a file. Spark has built-in support for several types of files, including text files, JSON and certain types of binary files that are common in the Hadoop ecosystem. When reading data from files, those files will usually be stored on your cluster on a distributed filesystem, such as the [Hadoop Distributed File System (HDFS)](http://hadoop.apache.org). This means that the data is already distributed across your cluster when you load it; Spark makes use of this fact and tries to execute the code on the cluster nodes where the data is located.

```python
# Creates an RDD from the given text file. Each line in the file becomes a string element in the RDD.
# The text file can be on the local filesystem or on some distributed filesystem.
rdd = sc.textFile('/data/huge-file.txt')
```

In [1]:
# Let's create a Spark RDD and a normal Python list.
spark_rdd = sc.parallelize(range(100))
python_list = list(range(100))  # list() keeps this a real list on Python 3 as well

## Working with RDDs
An RDD is an object that abstracts a distributed dataset. Method calls on the RDD object are carried out across the cluster that Spark is running on, yet the code looks like normal collection operations on a local dataset.

### Types of operations
Spark RDDs have two types of operations:
- transformations: these create a new RDD by transforming the existing RDD according to some function (e.g. `map` or `flatMap`).
- actions: these compute a result from the RDD and return it to the client (usually called the driver program).

The most common transformation operations are:
- `filter(function)`: filter the RDD for elements where function returns True.
- `map(function)`: transform each element of the RDD using a function.
- `flatMap(function)`: transform each element of the RDD into zero or more elements using function.
- `keyBy(function)`: transform each element into a `(key, element)` pair, where the key is computed by function (keys need not be unique).
- `groupByKey()`: transform the RDD into an RDD of keys and groups of values with the same key.
- `reduceByKey(function)`: group the RDD by key and perform a reduce with a binary function on each group.
- `sample(...)`: create a new RDD by sampling a subset of the original RDD; useful for quickly getting an overview of datasets, without having to process all of it.
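
As a rough illustration of what `flatMap`, `keyBy` and `groupByKey` compute, the following plain-Python sketch reproduces their results on a small local list. It only mimics the semantics; the `words` data is made up for the example, and none of this is distributed the way the Spark operations are.

```python
from collections import defaultdict

words = ["apple", "avocado", "banana", "blueberry", "cherry"]

# flatMap: every input element may produce zero or more output elements.
# Here each word contributes its first two letters.
first_letters = [ch for w in words for ch in w[:2]]

# keyBy: pair each element with a key computed from it (here: the first letter).
keyed = [(w[0], w) for w in words]

# groupByKey: collect all values that share the same key.
groups = defaultdict(list)
for key, value in keyed:
    groups[key].append(value)

print(first_letters)
print(keyed)
print(dict(groups))
```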

The most common actions are:
- `count()`: count the number of elements in the RDD and return the result to the client.
- `reduce(function)`: use a binary function to reduce all elements in the RDD to a single value, which is returned to the client.
- `take(n)`: take the first n elements from the RDD and return them to the client.
- `collect()`: collect the entire contents of the RDD and return them to the client (use with caution).
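
One consequence of running `reduce` on a distributed dataset is that the binary function should be associative and commutative, because partial results are computed per partition and then combined. The sketch below models this in plain Python; `distributed_reduce` is a hypothetical helper written for this illustration, not how Spark is implemented.

```python
from functools import reduce

def distributed_reduce(function, partitions):
    """Reduce each non-empty partition locally, then reduce the partial results."""
    partials = [reduce(function, part) for part in partitions if part]
    return reduce(function, partials)

numbers = list(range(1, 9))  # 1..8
two_parts = [numbers[:4], numbers[4:]]
four_parts = [numbers[:2], numbers[2:4], numbers[4:6], numbers[6:]]

add = lambda x, y: x + y
subtract = lambda x, y: x - y

# Addition is associative and commutative: the partitioning does not matter.
sum_two = distributed_reduce(add, two_parts)
sum_four = distributed_reduce(add, four_parts)

# Subtraction is neither: the result depends on how the data was split.
diff_two = distributed_reduce(subtract, two_parts)
diff_four = distributed_reduce(subtract, four_parts)

print(sum_two == sum_four)    # True
print(diff_two == diff_four)  # False
```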

#### Filter the list for numbers <= 10

In [2]:
# Python
list(filter(lambda n: n <= 10, python_list))  # list() is needed on Python 3, where filter is lazy

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

In [3]:
# Spark
spark_rdd.filter(lambda n: n <= 10).collect()

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

#### Count the number of elements in the list

In [4]:
# Python
len(python_list)

100

In [5]:
# Spark
spark_rdd.count()

100

#### Count the even numbers in the list (count the number of elements in the list filtered for even numbers)

In [6]:
# Python
len(list(filter(lambda n: n % 2 == 0, python_list)))  # list() is needed on Python 3

50

In [7]:
# Spark
spark_rdd.filter(lambda n: n % 2 == 0).count()

50

#### Take the square of each number after filtering

In [8]:
# Python
list(map(lambda n: n ** 2, filter(lambda n: n <= 10, python_list)))  # list() is needed on Python 3

[0, 1, 4, 9, 16, 25, 36, 49, 64, 81, 100]

In [9]:
# Spark
spark_rdd\
.filter(lambda n: n <= 10)\
.map(lambda n: n ** 2)\
.collect()

[0, 1, 4, 9, 16, 25, 36, 49, 64, 81, 100]

#### Split the numbers into two groups:
- True: n < 50
- False: n >= 50

Then calculate the sum for each group.

In [10]:
# Python
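# Note the lowercase name (groupby); Spark's method names use camelCase.
from itertools import groupby

# groupby only groups consecutive items, which works here because the list is sorted.
# A list comprehension replaces the tuple-unpacking lambda, which Python 3 does not allow.
[(key, sum(values)) for key, values in groupby(python_list, lambda n: n < 50)]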

[(True, 1225), (False, 3725)]

In [11]:
# Spark
# Note that instead of a groupBy and then sum, we just use keyBy and then reduceByKey,
# which makes sure that the summing is a distributed operation,
# thus the list for one group does not have to fit in memory.
# Note the camelCase syntax
spark_rdd\
.keyBy(lambda n: n < 50)\
.reduceByKey(lambda x,y: x + y)\
.collect()

[(False, 3725), (True, 1225)]
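
The reason `reduceByKey` scales better than grouping first is that values are already merged inside each partition before anything is exchanged between machines. The plain-Python sketch below models that idea with four made-up partitions of 25 numbers each; `combine_locally` and `reduce_by_key` are hypothetical helpers for illustration, not Spark internals.

```python
def combine_locally(pairs, function):
    """Merge the values of (key, value) pairs that share a key."""
    combined = {}
    for key, value in pairs:
        combined[key] = function(combined[key], value) if key in combined else value
    return combined

def reduce_by_key(partitions, function):
    """Combine inside each partition first, then merge the small partial results."""
    partials = [combine_locally(part, function) for part in partitions]
    # Only the partial results would have to travel over the network.
    shuffled_records = sum(len(partial) for partial in partials)
    all_partials = [item for partial in partials for item in partial.items()]
    return combine_locally(all_partials, function), shuffled_records

numbers = list(range(100))
# Four partitions of 25 keyed elements each (the key is n < 50).
partitions = [[(n < 50, n) for n in numbers[i:i + 25]] for i in range(0, 100, 25)]

result, shuffled_records = reduce_by_key(partitions, lambda x, y: x + y)
print(result)
print(shuffled_records)  # far fewer than the 100 records a plain grouping would move
```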

#### Get the first 10 elements of the list

In [12]:
# Python
python_list[:10]

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

In [13]:
# Spark
spark_rdd.take(10)

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

#### Calculate the sum of the dataset

In [14]:
# Python
sum(python_list) # Note that this is not parallelizable

4950

In [15]:
# Spark
# Using reduce() to make the distributed computation explicit ...
spark_rdd.reduce(lambda x,y: x + y)
# ... but spark_rdd.sum() will do as well

4950

## Lazy evaluation
Spark RDDs are lazily evaluated. That means that no actual computation is performed until you request a result, and Spark attempts to perform as little computation as possible to produce the required output. Transformations are therefore only executed once an action requests a result.

Consider the example below. In both cases, we apply the same transformation to an RDD, but only in the second case do we actually request the result. Notice that:
1. Applying the transformation returns instantly.
2. Applying a transformation and then taking 5 elements takes more time. Applying `time_consuming_function` to all 100 elements in the RDD would take 50 seconds (100 * 0.5). However, the map is applied only to the first 5 elements, because that is the number of elements we request: Spark skips any computation that is not needed to produce the requested result.

In [16]:
from time import sleep

def time_consuming_function(n):
    sleep(0.5)      # sleep 0.5 seconds
    return n ** 2   # return square of n

In [17]:
%%timeit -n1 -r1

# Transform the RDD using map, but take no action.
result = spark_rdd.map(time_consuming_function)

1 loops, best of 1: 13.8 µs per loop


In [18]:
%%timeit -n1 -r1

# Transform the RDD and take 5 elements. Print is required because the %%timeit otherwise clogs the output.
result = spark_rdd.map(time_consuming_function)
print(result.take(5))

[0, 1, 4, 9, 16]
1 loops, best of 1: 2.54 s per loop
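
The same behaviour can be imitated without Spark by using a Python generator, which is also lazy. This is only an analogy under that assumption, not how Spark works internally: building the pipeline does no work, and taking 5 elements processes exactly 5 inputs. The `calls` list and `tracked_square` function are made up for the example.

```python
from itertools import islice

calls = []

def tracked_square(n):
    calls.append(n)  # record every element that is actually processed
    return n ** 2

# Building the pipeline does no work: a generator expression is lazy,
# much like a Spark transformation.
lazy_result = (tracked_square(n) for n in range(100))
calls_before = len(calls)

# Requesting 5 elements plays the role of the action take(5).
first_five = list(islice(lazy_result, 5))
calls_after = len(calls)

print(calls_before)
print(first_five)
print(calls_after)
```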


## Checkpointing and caching
Spark RDDs are used to construct a DAG (directed acyclic graph) of operations. Machine failures can occur while such a job is running. In that case, Spark recomputes the lost results by re-running the parts of the DAG that were affected. While this effectively takes care of machine failures, it can lead to very expensive recomputation of intermediate results. Because of this, it sometimes makes sense to checkpoint an RDD using the RDD's `checkpoint()` method, which saves the RDD to a checkpoint directory so that it can be read back instead of being recomputed.
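
A minimal sketch of how checkpointing might be wired up is shown below. The helper `checkpoint_rdd` and the directory argument are assumptions made for this example; the method calls inside it (`setCheckpointDir`, `cache`, `checkpoint`, `count`) are the regular PySpark ones. The function takes the SparkContext and the RDD as arguments, so it does not import anything itself.

```python
def checkpoint_rdd(sc, rdd, checkpoint_dir):
    """Mark rdd for checkpointing and materialize it with one action (hypothetical helper)."""
    sc.setCheckpointDir(checkpoint_dir)  # the directory must be set before checkpointing
    rdd.cache()       # recommended, so writing the checkpoint does not recompute the RDD
    rdd.checkpoint()  # only marks the RDD; call this before running any job on it
    rdd.count()       # the first action triggers the actual checkpoint
    return rdd
```

In this notebook it could be called as `checkpoint_rdd(sc, spark_rdd.map(time_consuming_function), '/tmp/spark-checkpoints')`, where the path is just a placeholder; on a cluster the directory would normally be on a distributed filesystem such as HDFS.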

An RDD can be the source for multiple operations, possibly many. In that case, reading the same RDD from disk for each iteration or operation can cause a considerable amount of overhead. Sometimes the RDD fits in the aggregated memory of the cluster that you are working on, and then it makes sense to cache it in memory. Spark lets you do this with the `cache()` method on an RDD.
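
The effect of caching can be imitated with a small toy class in plain Python. `LazyDataset` is a hypothetical stand-in written for this illustration, not part of Spark: without `cache()`, every action recomputes the data from its source; with `cache()`, the data is computed once and reused.

```python
class LazyDataset(object):
    """Toy stand-in for an RDD: recomputes its data on every action unless cached."""

    def __init__(self, compute):
        self._compute = compute
        self._use_cache = False
        self._cached = None
        self.computations = 0  # how often the data was (re)computed

    def cache(self):
        self._use_cache = True
        return self

    def _data(self):
        if self._use_cache and self._cached is not None:
            return self._cached
        self.computations += 1
        data = self._compute()
        if self._use_cache:
            self._cached = data
        return data

    def count(self):
        return len(self._data())

    def sum(self):
        return sum(self._data())

uncached = LazyDataset(lambda: [n ** 2 for n in range(100)])
uncached.count()
uncached.sum()

cached = LazyDataset(lambda: [n ** 2 for n in range(100)]).cache()
cached.count()
cached.sum()

print(uncached.computations)  # two actions, two computations
print(cached.computations)    # two actions, one computation
```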