Introduction to Spark 01
====

This lecture introduces the Spark framework for distributed computing, its basic data and control flow abstractions, and the functional programming style needed to write a Spark application.

- What problem does Spark solve?
- SparkContext and the master configuration
- RDDs
- Actions
- Transforms
- Key-value RDDs
- Example - word count
- Persistence
- Merging key-value RDDs

Resources
----

- [Spark Programming Guide](http://spark.apache.org/docs/latest/programming-guide.html)


Architecture of a Spark Application
----

![Spark components](http://spark.apache.org/docs/latest/img/cluster-overview.png)

SparkContext
----

A SparkContext represents the connection to a Spark cluster, and can be used to create RDDs, accumulators and broadcast variables on that cluster. Here we set it up to run locally - the master argument `local[*]` means to use the local machine as the cluster, with as many worker threads as there are cores. You can also set the number of worker threads explicitly with `local[k]`, where `k` is an integer.

In [1]:
from pyspark import SparkContext
sc = SparkContext(master = 'local[*]')

In [2]:
sc.defaultParallelism

4

In [3]:
sc.defaultMinPartitions

2

### Creating an RDD

The RDD (Resilient Distributed Dataset) is a data storage abstraction - you can work with it as though it were a single unit, while it may actually be distributed over many nodes in the computing cluster.

#### A first example

Distribute the data set to the workers

In [4]:
xs = sc.parallelize(range(10))
xs

PythonRDD[1] at RDD at PythonRDD.scala:43

In [5]:
xs.glom().collect()

[[0, 1], [2, 3, 4], [5, 6], [7, 8, 9]]
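
To make the `glom` output above less mysterious, here is a minimal plain-Python sketch of how a sequence of 10 elements could be cut into 4 contiguous partitions. The helper `split_into_partitions` is hypothetical (it is not part of the Spark API), and the boundary rule `i * n // k` is an assumption that happens to reproduce the partition sizes shown above; Spark's own slicing logic may differ between versions.

```python
def split_into_partitions(items, num_partitions):
    """Cut a sequence into contiguous chunks (hypothetical helper, not Spark API)."""
    items = list(items)
    n = len(items)
    # Assumed boundary rule: partition i covers [i*n//k, (i+1)*n//k).
    bounds = [i * n // num_partitions for i in range(num_partitions + 1)]
    return [items[bounds[i]:bounds[i + 1]] for i in range(num_partitions)]


partitions = split_into_partitions(range(10), 4)
print(partitions)  # [[0, 1], [2, 3, 4], [5, 6], [7, 8, 9]]
```

Each inner list plays the role of one partition; in this local setup the number of partitions matches `sc.defaultParallelism`.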

Only keep even numbers

In [6]:
xs = xs.filter(lambda x: x % 2 == 0)
xs

PythonRDD[3] at RDD at PythonRDD.scala:43

Square all elements

In [7]:
xs = xs.map(lambda x: x**2)
xs

PythonRDD[4] at RDD at PythonRDD.scala:43

Execute the code and return the final dataset

In [8]:
xs.collect()

[0, 4, 16, 36, 64]

#### A common Spark idiom chains multiple functions together

In [9]:
(
    sc.parallelize(range(10))
    .filter(lambda x: x % 2 == 0)
    .map(lambda x: x**2)
    .collect()
)

[0, 4, 16, 36, 64]

Actions and transforms
----

A **transform** maps an RDD to another RDD - it is a lazy operation. To actually perform any work, we need to apply an **action**.
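
The difference between lazy transforms and eager actions can be illustrated without Spark, using Python generators as a stand-in. This is only an analogy under stated assumptions: `lazy_filter`, `lazy_map` and the `log` list are hypothetical names introduced here, and real RDDs additionally track lineage and partitions.

```python
log = []  # records every element that is actually processed


def lazy_filter(pred, items):
    """Generator version of filter: runs only when consumed."""
    for item in items:
        log.append(("filter", item))
        if pred(item):
            yield item


def lazy_map(func, items):
    """Generator version of map: runs only when consumed."""
    for item in items:
        log.append(("map", item))
        yield func(item)


# "Transforms": this only builds the pipeline, no element is touched yet.
pipeline = lazy_map(lambda x: x ** 2,
                    lazy_filter(lambda x: x % 2 == 0, range(10)))
work_before_action = len(log)

# "Action": consuming the pipeline forces the whole computation.
result = list(pipeline)
work_after_action = len(log)

print(work_before_action, work_after_action, result)
```

No work is recorded until `list(...)` consumes the pipeline, which mirrors how `filter` and `map` return immediately while `collect` triggers execution.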

### Actions

In [10]:
import numpy as np

In [11]:
x = sc.parallelize(np.random.randint(1, 6, 10))

In [12]:
x.collect()

[1, 4, 2, 5, 1, 5, 5, 4, 2, 1]

In [13]:
x.take(5)

[1, 4, 2, 5, 1]

In [14]:
x.first()

1

In [15]:
x.top(5)

[5, 5, 5, 4, 4]

In [16]:
x.takeSample(True, 15)

[1, 5, 4, 5, 4, 1, 2, 4, 5, 5, 4, 2, 1, 2, 5]

In [17]:
x.count()

10

In [18]:
x.countByValue()

defaultdict(int, {1: 3, 2: 2, 4: 2, 5: 3})

In [19]:
x.sum()

30

In [20]:
x.max()

5

In [21]:
x.mean()

3.0

In [22]:
x.stats()

(count: 10, mean: 3.0, stdev: 1.67332005307, max: 5.0, min: 1.0)

### Saving RDDs

In [23]:
import os
import shutil
if os.path.exists('data/x'):
    shutil.rmtree('data/x')
x.saveAsTextFile('data/x')

In [24]:
!ls data/x

part-00000  part-00001	part-00002  part-00003	_SUCCESS


In [25]:
!cat data/x/*

1
4
2
5
1
5
5
4
2
1


#### Fold, reduce and aggregate actions

**max** using reduce

In [26]:
x.reduce(lambda x, y: x if x > y else y)

5

**sum** using `reduce`

In [27]:
x.reduce(lambda x, y: x+y)

30

**sum** using fold

In [28]:
x.fold(0, lambda x, y: x+y)

30

**prod** using reduce

In [29]:
x.reduce(lambda x, y: x*y)

8000

**prod** using fold

In [30]:
x.fold(1, lambda x, y: x*y)

8000

**sum** using aggregate

In [31]:
x.aggregate(0, lambda x, y: x + y, lambda x, y: x + y)

30

**count** using aggregate

In [32]:
x.aggregate(0, lambda acc, _: acc + 1, lambda x, y: x+y)

10

**mean** using aggregate

In [33]:
sum_count = x.aggregate((0, 0),
                        lambda acc, x: (acc[0]+x, acc[1]+1),
                        lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1]))
sum_count[0]/sum_count[1]

3.0
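
The roles of the two functions passed to `aggregate` are easier to see in a plain-Python model: the first folds the elements within each partition, and the second merges the per-partition results. The functions `aggregate` and `fold` below are hypothetical stand-ins rather than the Spark implementations, and the partition layout is an assumption chosen for illustration (the values are those of `x` above).

```python
from functools import reduce


def aggregate(partitions, zero, seq_op, comb_op):
    """Model of aggregate: fold each partition, then merge the partial results."""
    partials = [reduce(seq_op, part, zero) for part in partitions]
    return reduce(comb_op, partials, zero)


def fold(partitions, zero, op):
    """Model of fold: aggregate with the same function used for both steps."""
    return aggregate(partitions, zero, op, op)


# Assumed layout of the 10 values over 4 partitions.
partitions = [[1, 4], [2, 5, 1], [5, 5], [4, 2, 1]]

total, count = aggregate(
    partitions,
    (0, 0),
    lambda acc, v: (acc[0] + v, acc[1] + 1),   # within a partition
    lambda a, b: (a[0] + b[0], a[1] + b[1]),   # across partitions
)
mean = total / count

good_sum = fold(partitions, 0, lambda a, b: a + b)
# A non-neutral zero is applied once per partition and once more when merging.
bad_sum = fold(partitions, 1, lambda a, b: a + b)

print(mean, good_sum, bad_sum)
```

In this model the zero value enters the computation once per partition plus once in the final merge, which is why it should be the identity of the operation (0 for sums, 1 for products).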

### Transforms

In [34]:
x = sc.parallelize([1,2,3,4])
y = sc.parallelize([3,3,4,6])

In [35]:
x.map(lambda x: x + 1).collect()

[2, 3, 4, 5]

In [36]:
x.filter(lambda x: x%3 == 0).collect()

[3]

#### Think of flatMap as a map followed by a flatten operation

In [37]:
x.flatMap(lambda x: range(x-2, x)).collect()

[-1, 0, 0, 1, 1, 2, 2, 3]
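
The "map followed by flatten" description can be checked in plain Python. This sketch uses ordinary lists and `itertools.chain` instead of an RDD, so it shows only the semantics, not the distributed execution.

```python
from itertools import chain

xs = [1, 2, 3, 4]

# Step 1: map each element to a sequence.
mapped = [list(range(x - 2, x)) for x in xs]

# Step 2: flatten the sequences into a single list.
flattened = list(chain.from_iterable(mapped))

print(mapped)     # [[-1, 0], [0, 1], [1, 2], [2, 3]]
print(flattened)  # [-1, 0, 0, 1, 1, 2, 2, 3]
```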

In [38]:
x.sample(False, 0.5).collect()

[3]

#### Set-like transforms

In [39]:
y.distinct().collect()

[4, 6, 3]

In [40]:
x.union(y).collect()

[1, 2, 3, 4, 3, 3, 4, 6]

In [41]:
x.intersection(y).collect()

[3, 4]

In [42]:
x.subtract(y).collect()

[1, 2]

In [43]:
x.cartesian(y).collect()

[(1, 3),
 (1, 3),
 (1, 4),
 (1, 6),
 (2, 3),
 (2, 3),
 (2, 4),
 (2, 6),
 (3, 3),
 (3, 3),
 (3, 4),
 (3, 6),
 (4, 3),
 (4, 3),
 (4, 4),
 (4, 6)]
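
As a cross-check on the outputs above, the same operations can be modelled on plain Python lists. This is a sketch of the semantics only: results are sorted here for readability, whereas the order of elements returned by Spark for `distinct` and `intersection` is not guaranteed.

```python
from itertools import product

x = [1, 2, 3, 4]
y = [3, 3, 4, 6]

distinct_y = sorted(set(y))                       # duplicates removed
union_xy = x + y                                  # duplicates are kept
intersection_xy = sorted(set(x) & set(y))         # duplicates removed
subtract_xy = [v for v in x if v not in set(y)]   # elements of x not in y
cartesian_xy = list(product(x, y))                # every (x, y) pairing

print(distinct_y, union_xy, intersection_xy, subtract_xy, len(cartesian_xy))
```

Note that `union` behaves like list concatenation rather than a mathematical set union, which is why `3` appears three times in its result.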

Working with key-value pairs
----

RDDs consisting of key-value pairs are required for many Spark operations. They can be created by using any function that returns an RDD composed of tuples, for example `parallelize` applied to a list of tuples.

In [44]:
data = [('ann', 1), ('bob', 2)]

In [45]:
rdd = sc.parallelize(data)

In [46]:
rdd.keys().collect()

['ann', 'bob']

### Using key-value pairs to find most frequent words in Ulysses

In [47]:
ulysses = sc.textFile('data/Ulysses.txt')

In [48]:
ulysses.take(10)

['The Project Gutenberg EBook of Ulysses, by James Joyce',
 '',
 'This eBook is for the use of anyone anywhere at no cost and with',
 'almost no restrictions whatsoever.  You may copy it, give it away or',
 're-use it under the terms of the Project Gutenberg License included',
 'with this eBook or online at www.gutenberg.org',
 '',
 '',
 'Title: Ulysses',
 '']

In [49]:
import string
def tokenize(line):
    table = dict.fromkeys(map(ord, string.punctuation))
    return line.translate(table).lower().split()

In [50]:
words = ulysses.flatMap(lambda line: tokenize(line))
words.take(10)

['the',
 'project',
 'gutenberg',
 'ebook',
 'of',
 'ulysses',
 'by',
 'james',
 'joyce',
 'this']
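
It helps to see what the `tokenize` helper does to a single line before it is applied to the whole book. The function below is copied from the cell above; the sample strings are taken from the first lines of the file. Note the side effect that deleting punctuation joins hyphenated words and URLs into one token.

```python
import string


def tokenize(line):
    # Map every punctuation character to None so that translate() deletes it.
    table = dict.fromkeys(map(ord, string.punctuation))
    return line.translate(table).lower().split()


tokens = tokenize("re-use it under the terms of the Project Gutenberg License")
url_tokens = tokenize("online at www.gutenberg.org")

print(tokens)
print(url_tokens)  # ['online', 'at', 'wwwgutenbergorg']
```

Whether `re-use` should become `reuse` or two separate words depends on the analysis; replacing punctuation with spaces instead of deleting it would be one alternative.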

In [51]:
words = words.map(lambda x: (x, 1))
words.take(10)

[('the', 1),
 ('project', 1),
 ('gutenberg', 1),
 ('ebook', 1),
 ('of', 1),
 ('ulysses', 1),
 ('by', 1),
 ('james', 1),
 ('joyce', 1),
 ('this', 1)]

In [52]:
counts = words.reduceByKey(lambda x, y: x+y)
counts.take(10)

[('kyries', 1),
 ('mobile', 2),
 ('gasteropod', 1),
 ('circle', 20),
 ('calamitous', 1),
 ('kneecap', 1),
 ('divers', 6),
 ('riotously', 1),
 ('cookies', 1),
 ('temptations', 1)]

In [53]:
counts.takeOrdered(10, key=lambda x: -x[1])

[('the', 15107),
 ('of', 8257),
 ('and', 7282),
 ('a', 6553),
 ('to', 5042),
 ('in', 4981),
 ('he', 4033),
 ('his', 3333),
 ('i', 2698),
 ('that', 2621)]
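
The `reduceByKey` and `takeOrdered` steps can be modelled on a small in-memory list. The helper `reduce_by_key` is a hypothetical single-machine stand-in, not the Spark implementation (which first combines values within each partition and then shuffles by key), and the sentence is made up for illustration.

```python
import heapq


def reduce_by_key(pairs, func):
    """Merge the values of each key with func (single-machine model)."""
    merged = {}
    for key, value in pairs:
        merged[key] = func(merged[key], value) if key in merged else value
    return merged


words = "the cat and the dog and the bird".split()
pairs = [(w, 1) for w in words]
counts = reduce_by_key(pairs, lambda x, y: x + y)

# Like takeOrdered(2, key=...): the 2 smallest items by negated count.
top2 = heapq.nsmallest(2, counts.items(), key=lambda kv: -kv[1])

print(top2)  # [('the', 3), ('and', 2)]
```

Negating the count in the key function turns "smallest first" into "most frequent first", the same trick used in the `takeOrdered` call above.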

### Word count chained version

In [54]:
(
    ulysses.flatMap(lambda line: tokenize(line))
    .map(lambda word: (word, 1))
    .reduceByKey(lambda x, y: x + y)
    .takeOrdered(10, key=lambda x: -x[1])
)

[('the', 15107),
 ('of', 8257),
 ('and', 7282),
 ('a', 6553),
 ('to', 5042),
 ('in', 4981),
 ('he', 4033),
 ('his', 3333),
 ('i', 2698),
 ('that', 2621)]

Persisting data
----

The word count program above repeats ALL of its computations each time we apply an action such as `takeOrdered`. To avoid this, we need to `persist` or `cache` the results. The two are similar: `cache` is `persist` with the default storage level, while `persist` gives more control over how the data is retained.
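
The cost of not persisting can be illustrated with a toy class that counts how often its data is recomputed. `LazyDataset` is a hypothetical plain-Python model introduced here, not a Spark class; it only mimics the idea that each action re-runs the computation unless the result has been marked for caching.

```python
class LazyDataset:
    """Toy model of an RDD whose actions recompute unless persisted."""

    def __init__(self, compute):
        self._compute = compute
        self._cached = None
        self.is_cached = False
        self.evaluations = 0  # how many times the data was recomputed

    def persist(self):
        # Only marks the dataset; data is stored on the next action.
        self.is_cached = True
        return self

    def unpersist(self):
        self.is_cached = False
        self._cached = None
        return self

    def _materialize(self):
        if self.is_cached and self._cached is not None:
            return self._cached
        self.evaluations += 1
        data = self._compute()
        if self.is_cached:
            self._cached = data
        return data

    # Two "actions"
    def take(self, n):
        return self._materialize()[:n]

    def count(self):
        return len(self._materialize())


ds = LazyDataset(lambda: [x * x for x in range(5)])
ds.take(2)
ds.count()
runs_without_persist = ds.evaluations       # one run per action

ds.persist()
ds.take(2)
ds.count()
runs_with_persist = ds.evaluations - runs_without_persist

print(runs_without_persist, runs_with_persist)  # 2 1
```

As in the model, marking a dataset as persisted does no work by itself; the saving shows up from the second action onwards.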

In [55]:
counts.is_cached

False

In [56]:
counts.persist()

PythonRDD[78] at RDD at PythonRDD.scala:43

In [57]:
counts.is_cached

True

In [58]:
counts.takeOrdered(5, lambda x: -x[1])

[('the', 15107), ('of', 8257), ('and', 7282), ('a', 6553), ('to', 5042)]

In [59]:
counts.take(5)

[('kyries', 1),
 ('mobile', 2),
 ('gasteropod', 1),
 ('circle', 20),
 ('calamitous', 1)]

In [60]:
counts.takeOrdered(5, lambda x: x[0])

[('0', 2), ('001', 5), ('002', 1), ('003', 2), ('004', 3)]

In [61]:
counts.keys().take(5)

['kyries', 'mobile', 'gasteropod', 'circle', 'calamitous']

In [62]:
counts.values().take(5)

[1, 2, 1, 20, 1]

In [63]:
count_dict = counts.collectAsMap()
count_dict['circle']

20

#### Using cache instead of persist

In [64]:
counts.unpersist()

PythonRDD[78] at RDD at PythonRDD.scala:43

In [65]:
counts.is_cached

False

In [66]:
counts.cache()

PythonRDD[78] at RDD at PythonRDD.scala:43

In [67]:
counts.is_cached

True

### Merging key-value datasets

We will build a second key-value RDD of word counts from another of Joyce's works - Portrait of the Artist as a Young Man.

In [68]:
portrait = sc.textFile('data/Portrait.txt')

In [69]:
counts1 = (
    portrait.flatMap(lambda line: tokenize(line))
    .map(lambda x: (x, 1))
    .reduceByKey(lambda x, y: x + y)
)

In [70]:
counts1.persist()

PythonRDD[90] at RDD at PythonRDD.scala:43

#### Combine counts for words found in both books

In [71]:
joined = counts.join(counts1)

In [72]:
joined.take(5)

[('mobile', (2, 1)),
 ('circle', (20, 1)),
 ('temptations', (1, 4)),
 ('withering', (4, 1)),
 ('spoken', (16, 15))]

#### Sum counts over words

In [73]:
s = joined.mapValues(lambda x: x[0] + x[1])
s.take(5)

[('mobile', 3),
 ('circle', 21),
 ('temptations', 5),
 ('withering', 5),
 ('spoken', 31)]
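
The join above keeps only words that occur in both books. A plain-Python model makes this explicit. The helpers `join` and `map_values` are hypothetical stand-ins for the RDD methods, and the inputs are toy data: the counts for `mobile` and `circle` match the output above, while the unmatched entries are made up to show that they are dropped.

```python
def join(left, right):
    """Inner join of two lists of (key, value) pairs (single-machine model)."""
    index = {}
    for key, value in right:
        index.setdefault(key, []).append(value)
    return [(key, (lv, rv)) for key, lv in left for rv in index.get(key, [])]


def map_values(pairs, func):
    """Apply func to each value while leaving the keys untouched."""
    return [(key, func(value)) for key, value in pairs]


counts_a = [('mobile', 2), ('circle', 20), ('only_in_a', 7)]   # toy data
counts_b = [('mobile', 1), ('circle', 1), ('only_in_b', 3)]    # toy data

joined = join(counts_a, counts_b)
summed = map_values(joined, lambda v: v[0] + v[1])

print(joined)  # [('mobile', (2, 1)), ('circle', (20, 1))]
print(summed)  # [('mobile', 3), ('circle', 21)]
```

Keys present in only one input disappear from the result, so the sums here cover shared vocabulary only, not every word in either book.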

#### Average counts across books

In [74]:
avg = joined.mapValues(lambda x: np.mean(x))
avg.take(5)

[('mobile', 1.5),
 ('circle', 10.5),
 ('temptations', 2.5),
 ('withering', 2.5),
 ('spoken', 15.5)]

### Version Information

In [75]:
%load_ext version_information
%version_information numpy, pyspark

Software,Version
Python,3.5.1 64bit [GCC 4.4.7 20120313 (Red Hat 4.4.7-1)]
IPython,4.0.1
OS,Linux 4.2.0 23 generic x86_64 with debian jessie sid
numpy,1.10.2
pyspark,The 'pyspark' distribution was not found and is required by the application
Mon Jan 18 19:46:47 2016 EST
