## 2.2 Spark Core

In this section, we'll explore the core Spark API.  Later today, we'll look at Spark SQL and MLLib, which present a higher-level abstraction to this core.

As always, we'll scratch the surface here.  To learn more, I recommend two books:
* [Learning Spark](http://shop.oreilly.com/product/0636920028512.do) introduces most concepts in a tutorial style.
* [Advanced Analytics with Spark](http://shop.oreilly.com/product/0636920035091.do) works through about a dozen real-world and complete examples of machine learning and data analysis using Spark.

Conceptually, a Spark program is divided into a **driver**, typically running on your laptop or on a gateway machine in the cluster, and a large set of **executors**, running on the worker nodes of the cluster.  Spark exposes the connection to the executors via an object called a **`SparkContext`**, or `sc` for short.

The driver breaks up any calculation into a series of **tasks** that are sent to the executors.  To accomplish their tasks, executors may send data to each other.  When the tasks complete, any information requested by the driver is sent back by the executors.

<img src="images/SparkComponents.png" width="50%">  
_Image source: [Learning Spark](http://shop.oreilly.com/product/0636920028512.do)_

Spark organizes calculations as a sequence of **transformations** and **actions** on a **Resilient Distributed Dataset** or RDD.  Let's look at each word in turn.

An RDD, like a NumPy `array` or a Pandas `Series`, is conceptually an ordered collection of items.  The items, however, aren't held in the driver machine's memory.  Rather, the item collection is split into **partitions**, and each partition is stored in the local memory of an executor somewhere in the cluster.

Here are various ways to initially create an RDD:

You can turn a local Python list into an RDD using `sc.parallelize`.  The items in the list will be automatically partitioned and each partition will be sent to a different machine in the cluster.

In [1]:
rdd = sc.parallelize([1,2,3,4,5])
rdd

ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:364

In [2]:
rdd.getNumPartitions()

4

In [3]:
# See the partitions
rdd.glom().collect()

[[1], [2], [3], [4, 5]]

In this example, my machine has a 4-core CPU, so Spark has created 4 executors and broken up the list into 4 partitions.

Using `sc.parallelize`, you can turn a Python list, a NumPy array or a Pandas `Series` or `DataFrame` into a Spark RDD.

You can also read a text file into an RDD.  Each line is then a separate item.  A slight complication is that, by default, Spark will assume that paths point into the Hadoop Distributed File System (HDFS), which we'll discuss tomorrow.  To access files on your local machine, you need to give full path names starting with `file://`.

In [4]:
# Record current path for future use
import os
cwd = os.getcwd()
cwd

'/Users/pat/Work/2015/PythonTraining/4DayTrainingOpenSource'

In [5]:
# File from Pandas exercises
rdd = sc.textFile("file://" + cwd + "/names/yob1880.txt")
rdd

file:///Users/pat/Work/2015/PythonTraining/4DayTrainingOpenSource/names/yob1880.txt MappedRDD[3] at textFile at NativeMethodAccessorImpl.java:-2

In [6]:
rdd.first()

u'Mary,F,7065'

You can read a whole folder's worth of files.  In that case, each "item" in the RDD is a tuple with a file name and the whole contents of that file:

**Before running this, move the file NationalReadMe.pdf out of the names folder**

In [7]:
rdd = sc.wholeTextFiles("file://" + cwd + "/names")
rdd

org.apache.spark.api.java.JavaPairRDD@5e9a838d

In [8]:
rdd.first()

(u'file:/Users/pat/Work/2015/PythonTraining/4DayTrainingOpenSource/names/yob1880.txt',
 u'Mary,F,7065\r\nAnna,F,2604\r\nEmma,F,2003\r\nElizabeth,F,1939\r\nMinnie,F,1746\r\nMargaret,F,1578\r\nIda,F,1472\r\nAlice,F,1414\r\nBertha,F,1320\r\nSarah,F,1288\r\nAnnie,F,1258\r\nClara,F,1226\r\nElla,F,1156\r\nFlorence,F,1063\r\nCora,F,1045\r\nMartha,F,1040\r\nLaura,F,1012\r\nNellie,F,995\r\nGrace,F,982\r\nCarrie,F,949\r\nMaude,F,858\r\nMabel,F,808\r\nBessie,F,796\r\nJennie,F,793\r\nGertrude,F,787\r\nJulia,F,783\r\nHattie,F,769\r\nEdith,F,768\r\nMattie,F,704\r\nRose,F,700\r\nCatherine,F,688\r\nLillian,F,672\r\nAda,F,652\r\nLillie,F,647\r\nHelen,F,636\r\nJessie,F,635\r\nLouise,F,635\r\nEthel,F,633\r\nLula,F,621\r\nMyrtle,F,615\r\nEva,F,614\r\nFrances,F,605\r\nLena,F,603\r\nLucy,F,590\r\nEdna,F,588\r\nMaggie,F,582\r\nPearl,F,569\r\nDaisy,F,564\r\nFannie,F,560\r\nJosephine,F,544\r\nDora,F,524\r\nRosa,F,507\r\nKatherine,F,502\r\nAgnes,F,473\r\nMarie,F,471\r\nNora,F,471\r\nMay,F,462\r\nMamie,F,436\r\n

Other sources of data for an initial RDD include:
* Files in HDFS
* Database tables in Hive
* The result of Spark SQL queries

We'll talk about these tomorrow

**Ex 2.2.1 Load the file `names/yob1880.txt` into an RDD called `rdd`.  Then evaluate the expression `rdd.first()` to see the first line of the file and `rdd.count()` to count the number of lines in the file**

RDDs can be **transformed** into new RDDs in much the same way that list comprehensions transform one list into another.  Here are the transformations that apply to all RDDs:
* `map()` applies a function to each item in the RDD
* `flatMap()` applies a function to each item that returns a list, then joins all the lists together into a new RDD.
* `filter()` creates a new RDD including only the items that satisfy some condition.
* `distinct()` removes duplicates from an RDD
* `sample()` samples items from an RDD, with or without replacement
* `sortBy()` sorts the items in an RDD

So you can see the results, we'll also use an **action** called `collect()` which gathers all the items in an RDD into a Python list.  We'll discuss actions in a second.

Here are a few examples:

In [9]:
numbersRDD = sc.parallelize(range(1,10+1))
print(numbersRDD.collect())

squaresRDD = numbersRDD.map(lambda x: x**2)  # Square every number
print(squaresRDD.collect())

filteredRDD = numbersRDD.filter(lambda x: x % 2 == 0)  # Only the evens
print(filteredRDD.collect())

[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
[1, 4, 9, 16, 25, 36, 49, 64, 81, 100]
[2, 4, 6, 8, 10]


Here's an example where `flatMap()` is useful:

In [10]:
sentencesRDD = sc.parallelize(['Hello world', 'My name is Patrick'])
wordsRDD = sentencesRDD.flatMap(lambda sentence: sentence.split(" "))
print(wordsRDD.collect())
print(wordsRDD.count())

['Hello', 'world', 'My', 'name', 'is', 'Patrick']
6


The equivalent operation in Python looks something like this:

In [11]:
l = ['Hello world', 'My name is Patrick']
ll = []
for sentence in l:
    ll = ll + sentence.split(" ")
ll

['Hello', 'world', 'My', 'name', 'is', 'Patrick']

Transformations can be chained one after another.  Here's a more complicated example:

In [12]:
def doubleIfOdd(x):
    if x % 2 == 1:
        return 2 * x
    else:
        return x

resultRDD = (numbersRDD           # In parentheses so we can write each
             .map(doubleIfOdd)    # transformation in one line
             .filter(lambda x: x > 6)
             .distinct())

resultRDD.collect()

[8, 10, 18, 14]

**Ex 2.2.2 Using `sc.textFile()`, load `"names/yob1880.txt"` into an RDD.  Then using `map`, `filter` and `distinct`, create an RDD with all the names that start with the letter `M`**  
_Hint_: You may find it useful to look at the first item in any intermediate RDD using the `first()` action.

If you have two RDDs, the following methods can be used to combine the two RDDs as described:
* `rdd1.union(rdd2)`: all the items in `rdd1` and all the items in `rdd2`.
* `rdd1.intersection(rdd2)`: all the items in both `rdd1` and `rdd2`
* `rdd1.substract(rdd2)`: all the items in `rdd1` but not in `rdd2`
* `rdd1.cartesian(rdd2)`: all pairs of items `(x,y)` where `x` is in `rdd1` and `y` in `rdd2` (like an unconditional join in SQL)

Here are a few examples:

In [13]:
numbersRDD = sc.parallelize([1,2,3])
moreNumbersRDD = sc.parallelize([2,3,4])

In [14]:
numbersRDD.union(moreNumbersRDD).collect()

[1, 2, 3, 2, 3, 4]

In [15]:
numbersRDD.intersection(moreNumbersRDD).collect()

[2, 3]

In [16]:
numbersRDD.subtract(moreNumbersRDD).collect()

[1]

In [17]:
numbersRDD.cartesian(moreNumbersRDD).collect()

[(1, 2), (1, 3), (1, 4), (2, 2), (2, 3), (2, 4), (3, 2), (3, 3), (3, 4)]

**Note:** An essential concept in Spark is that of **lazy evaluation**.  When you transform one RDD into another, *the transformation is not performed immediately*!  Instead, Spark makes a note of the transformation for later execution.  When the time comes, Spark can then rearrange the calculation and avoid storing and communicating intermediate results unnecessarily.

**Actions** trigger an actual execution of all the transformations in an RDD and produce a result.  The most common actions are:
* `collect()`: Calculate all items in the RDD and send them back to the driver program.  `collect()` then returns them as a Python list.
* `first()`: Likewise, but only for the first item in an RDD.
* `take(n)`: Same, but return only first $n$ items.
* `count()`: Calculate the number of items in an RDD.
* `top(n)`: Return the top $n$ items, sorted by their natural ordering.
* `reduce()`: Aggregate items in an RDD according to sum commutative and associative operation.

We've used `collect()`, `first()` and `count()` before already.  Let's see `reduce()` in action.  Here's one way in Spark of adding all numbers from 1 to 10:

In [18]:
rdd = sc.parallelize(range(1,10+1))
rdd.reduce(lambda x, y: x + y)

55

Notice that the reduction can happen partially within each partition, then globally between the partial results of each partition.  In fact, Spark is free to aggregate the RDD items in any way it chooses.

Here are two possible realizations of the above reduction in a single node, to illustrate how the function actually gets applied:

In [19]:
def f(x,y):
    return x + y

l = [1,2,3,4]
f(f(f(l[0],l[1]), l[2]), l[3])

10

In [20]:
f(f(l[0], l[1]),  f(l[2], l[3]))

10

**Ex 2.2.3 Use `reduce()` to calculate 10! in Spark**

**Ex 2.2.4  Use `reduce()` to create a single string of all the names starting with M from Ex 2.2.2, with names separated by commas**

Whenever an action is executed, Spark will arrange for the executors to load the original data in the RDD and perform the entire sequence of transformations that defined the RDD.  But sometimes you need to use an RDD two or more times.  In that case, it pays to ask Spark to hang onto the actual result of the RDD (spread over the entire cluster) so that you don't necessarily repeat the same calculation two or more times.  To do that, you use `cache()`:

In [21]:
# Calculate the average of all the squares from 1 to 10
import numpy as np
numbersRDD = sc.parallelize(np.linspace(1.0, 10.0, 10))
squaresRDD = numbersRDD.map(lambda x: x**2)

squaresRDD.cache()  # Preserve the actual items of this RDD in memory

avg = squaresRDD.reduce(lambda x, y: x + y) / squaresRDD.count()
print(avg)

38.5


Caching RDD's is particularly useful for iterative calculations, like those that drive many machine learning algorithms.

**(!) Ex 2.2.5 Last week, we talked about Newton's algorithm for finding the square root of $n$ to a given precision $eps$.  Briefly:**
* **Make a guess $x = 1.0$.**
* **Either $x$ or $n / x$ are two big.  So improve the guess by using the average of these two numbers, i.e., $(x + n/x)/2$.**
* **Keep improving the guess until $x*x$ and $n$ differ by less than $eps$.**

**Write a Spark version of Newton's algorithm that calculates the square roots of all the numbers in an RDD.**  
_Hint_: If your RDD of $n$'s is called `ns`, it might help to initially transform it to an RDD of ($n$, $x$) tuples, as follows:
```
nsAndGuesses = ns.map(lambda n: (n, 1.0))
```

Note that your implementation of Newton's algorithm now works just as well with 3 numbers as it does with 5 trillion numbers.  Spark will distribute the calculation over as big a cluster as you want.  For large enough arguments, your calculations will complete 100 times faster on a 100-node cluster than they will on a single-core laptop.

---

`Spark` implements the above transformations and actions for all RDDs.  But for RDDs with more structure, `Spark` exposes more useful transformations and actions.  In particular, for so-called **pair RDDs**, where each item is a pair (a tuple of two items), `Spark` implements lots of grouping and reduction transformations and actions.  Here are the most common:
* `reduceByKey()`: apply a reduction to all the items with the same key
* `groupByKey()`: return an RDD of (key, listOfValues) tuples with a list of all the values sharing a given key
* `sortByKey()`: sort by keys :-D
* `countByKey()`: count the number of items containing each key
* `collectAsMap()`: like collect, but return a map instead of a list of (key, value) tuples.

Here's an example of counting words in Spark.  Counting words is a kind of "Hello world" of distributed computation.

In [22]:
rdd = sc.parallelize(["Hello hello", "Hello New York", "York says hello"])
resultRDD = (
    rdd
    .flatMap(lambda sentence: sentence.split(" "))  # split into words
    .map(lambda word: word.lower())                 # lowercase
    .map(lambda word: (word, 1))                    # count each appearance
    .reduceByKey(lambda x, y: x + y)                # add counts for each word
)
resultRDD.collect()

[('says', 1), ('new', 1), ('hello', 4), ('york', 2)]

We could collect the result as a map:

In [23]:
result = resultRDD.collectAsMap()
result

{'hello': 4, 'new': 1, 'says': 1, 'york': 2}

If we just wanted the top 2 words, we could do this:

In [24]:
print(resultRDD
      .sortBy(keyfunc=lambda (word, count): count, ascending=False)
      .take(2))

[('hello', 4), ('york', 2)]


**(!) Ex 2.2.6  Calculate the total number of births in the US with any given name, from 1950 to 2000 (inclusive) and report the 20 most common names**

Given two pair RDDs, we can join them based on keys as with SQL.  To illustrate, we create two simple pair RDDs:

In [25]:
# Home of different people
homesRDD = sc.parallelize([
        ('Brussels', 'John'),
        ('Brussels', 'Jack'),
        ('Leuven', 'Jane'),
        ('Antwerp', 'Jill'),
    ])

# Quality of life index for various cities
lifeQualityRDD = sc.parallelize([
        ('Brussels', 10),
        ('Antwerp', 7),
        ('RestOfFlanders', 5),
    ])

In [26]:
homesRDD.join(lifeQualityRDD).collect()

[('Antwerp', ('Jill', 7)),
 ('Brussels', ('John', 10)),
 ('Brussels', ('Jack', 10))]

In [27]:
homesRDD.leftOuterJoin(lifeQualityRDD).collect()

[('Antwerp', ('Jill', 7)),
 ('Brussels', ('John', 10)),
 ('Brussels', ('Jack', 10)),
 ('Leuven', ('Jane', None))]

In [28]:
homesRDD.rightOuterJoin(lifeQualityRDD).collect()

[('Antwerp', ('Jill', 7)),
 ('Brussels', ('John', 10)),
 ('Brussels', ('Jack', 10)),
 ('RestOfFlanders', (None, 5))]

In [29]:
homesRDD.cogroup(lifeQualityRDD).collect()

[('Antwerp',
  (<pyspark.resultiterable.ResultIterable at 0x104b716d0>,
   <pyspark.resultiterable.ResultIterable at 0x104b71790>)),
 ('Brussels',
  (<pyspark.resultiterable.ResultIterable at 0x104b71650>,
   <pyspark.resultiterable.ResultIterable at 0x104b71750>)),
 ('Leuven',
  (<pyspark.resultiterable.ResultIterable at 0x104b714d0>,
   <pyspark.resultiterable.ResultIterable at 0x104b71550>)),
 ('RestOfFlanders',
  (<pyspark.resultiterable.ResultIterable at 0x104b717d0>,
   <pyspark.resultiterable.ResultIterable at 0x105bfc450>))]

In [30]:
# Oops!  Those <ResultIterable>s are Spark's way of returning a list
# that we can walk over, without materializing the list.
# Let's materialize the lists to make the above more readable:
(homesRDD
 .cogroup(lifeQualityRDD)
 .map(lambda (home, (allPeople, allQualities)):
        (home, (list(allPeople), list(allQualities))))
 .collect())

[('Antwerp', (['Jill'], [7])),
 ('Brussels', (['John', 'Jack'], [10])),
 ('Leuven', (['Jane'], [])),
 ('RestOfFlanders', ([], [5]))]

As you can see, `cogroup` is a kind of "super-join".  For each key, it gives you access to every corresponding value in each of the two RDDs.  You can build all other joins out of the result of `cogroup`.

---

Our lightning tour of Spark's core API should give you a rough idea of how to express distributed computations in Spark.  If you want to learn more, do take a look at the [Learning Spark](http://shop.oreilly.com/product/0636920028512.do) book.

Note that by writing the calculations as pipelines of transformations that can be sent to the data, the computations will distribute across a cluster of any size.  Given enough resources, the code snippets that you've written today will work equally well on datasets that are terabytes in size!

In the following sections, we'll look at two higher-order abstractions to Spark: `Spark SQL` and `MLLib`.

---

### Lunch break