# Introduction to Spark

In this notebook, you will be introduced to the Apache Spark library for big data processing. To use this notebook, you must open it with `pyspark` rather than `ipython` or `jupyter`, because `pyspark` automatically loads some of the Spark internals for you. In particular, the variable `sc` is a Spark context. Check that it is correctly initialised:

In [1]:
print sc
print "Ready to go!"

<pyspark.context.SparkContext object at 0x7ff6921d1990>
Ready to go!


You will use `matplotlib` and `numpy` to display summary graphs of the datasets you are analysing. Load these libraries.

In [2]:
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline



### Learning activity: Create RDDs

To analyse large datasets using Spark, you will load them into Resilient Distributed Datasets (RDDs). There are several ways to create RDDs. Use `sc.parallelize()` to create one from a Python collection, and `sc.textFile()` to create one from the file `war-and-peace.txt`; each line of the file becomes one element of the RDD.

In [3]:
rdd = sc.parallelize([1,2,3,4])
print rdd

ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:475


In [4]:
lines = sc.textFile("war-and-peace.txt")

### Learning activity: Basic RDD manipulation

Use the actions `count()` and `take()` to print the number of lines in War and Peace and its first 15 lines.

In [5]:
print "War and Peace has %d lines." % lines.count()

War and Peace has 54223 lines.


In [6]:
for line in lines.take(15):
    print line

                                      1869
                                 WAR AND PEACE
                                 by Leo Tolstoy
BK1
                                 BOOK ONE: 1805
BK1|CH1
  CHAPTER I

  "Well, Prince, so Genoa and Lucca are now just family estates of the
Buonapartes. But I warn you, if you don't tell me that this means war,
if you still try to defend the infamies and horrors perpetrated by
that Antichrist- I really believe he is Antichrist- I will have
nothing more to do with you and you are no longer my friend, no longer
my 'faithful slave,' as you call yourself! But how do you do? I see
I have frightened you- sit down and tell me all the news."


### Learning activity: `filter()`, `map()`, `flatMap()` and `distinct()`

Let's apply some transformations to RDDs. The following helper function will be useful to extract the words from a line.

In [7]:
# A helper function to compute the list of words in a line of text
import re

WORD_RE = re.compile(r'\w+')  # raw string; compiled once instead of on every call

def get_words(line):
    return WORD_RE.findall(line)

print get_words("This, is a test!")

['This', 'is', 'a', 'test']
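Because the pattern `\w+` matches runs of letters, digits and underscores, `get_words()` splits contractions at the apostrophe and keeps tokens such as `BK1`. The plain-Python check below needs no Spark: it redefines the helper locally (with a raw-string pattern) so that it runs on its own, and the sample strings are modelled on the text printed earlier.

```python
import re

# Same pattern as get_words(): one or more word characters (letters, digits, underscore)
WORD_RE = re.compile(r'\w+')

def get_words(line):
    return WORD_RE.findall(line)

# Apostrophes are not word characters, so contractions are split in two
print(get_words("if you don't tell me"))  # ['if', 'you', 'don', 't', 'tell', 'me']
# Digits count as word characters; '|' does not
print(get_words("BK1|CH1"))               # ['BK1', 'CH1']
# Blank lines yield no words at all
print(get_words(""))                      # []
```

The last case matters later: blank lines contribute nothing to `flatMap()`, but any code that indexes `get_words(line)[0]` fails on them.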


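Use `filter()` to count the number of lines that mention `war` and the number of lines that mention `peace`. The test below is case-sensitive and matches whole words only, so a line containing only `War` or `warn` is not counted.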

In [8]:
# How often are war and peace mentioned?
warLines = lines.filter(lambda line: "war" in get_words(line))
print "There are %d lines mentioning war." % warLines.count()

peaceLines = lines.filter(lambda line: "peace" in get_words(line))
print "There are %d lines mentioning peace." % peaceLines.count()

There are 265 lines mentioning war.
There are 104 lines mentioning peace.
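The predicate `"war" in get_words(line)` is a list-membership test, so it is whole-word and case-sensitive. The plain-Python sketch below applies the same predicate to a few lines, with a Python list standing in for the RDD; the first two lines come from the excerpt above, and the last two are invented for illustration.

```python
import re

WORD_RE = re.compile(r'\w+')

def get_words(line):
    return WORD_RE.findall(line)

# A small stand-in for the RDD of lines; the last two lines are invented
sample_lines = [
    "                                 WAR AND PEACE",
    "Buonapartes. But I warn you, if you don't tell me that this means war,",
    "War was declared.",
    "they made peace and then war again",
]

def mentions(word):
    # Whole-word, case-sensitive membership test, as in the Spark filter
    return lambda line: word in get_words(line)

war_lines = list(filter(mentions("war"), sample_lines))
peace_lines = list(filter(mentions("peace"), sample_lines))
print(len(war_lines))    # 2
print(len(peace_lines))  # 1
```

Neither `WAR`, `War` nor `warn` matches; only the exact lowercase word does.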


Use `map()` to convert each line in the RDD to uppercase, and print the first 15 converted lines.

In [9]:
# Capitalize each line in the RDD
upperLines = lines.map(lambda line: line.upper())
for line in upperLines.take(15):
    print line

                                      1869
                                 WAR AND PEACE
                                 BY LEO TOLSTOY
BK1
                                 BOOK ONE: 1805
BK1|CH1
  CHAPTER I

  "WELL, PRINCE, SO GENOA AND LUCCA ARE NOW JUST FAMILY ESTATES OF THE
BUONAPARTES. BUT I WARN YOU, IF YOU DON'T TELL ME THAT THIS MEANS WAR,
IF YOU STILL TRY TO DEFEND THE INFAMIES AND HORRORS PERPETRATED BY
THAT ANTICHRIST- I REALLY BELIEVE HE IS ANTICHRIST- I WILL HAVE
NOTHING MORE TO DO WITH YOU AND YOU ARE NO LONGER MY FRIEND, NO LONGER
MY 'FAITHFUL SLAVE,' AS YOU CALL YOURSELF! BUT HOW DO YOU DO? I SEE
I HAVE FRIGHTENED YOU- SIT DOWN AND TELL ME ALL THE NEWS."


Use `flatMap()` to create an RDD of the words in War and Peace and count the number of words.

In [10]:
# Split each line into words using get_words()
words = lines.flatMap(lambda line: get_words(line))
print "There are %d words in War and Peace." % words.count()

There are 573322 words in War and Peace.
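`map()` produces exactly one output element per input element, whereas `flatMap()` expects its function to return an iterable and concatenates the results. A plain-Python analogue on three short lines, with lists standing in for RDDs:

```python
import re
from itertools import chain

WORD_RE = re.compile(r'\w+')

def get_words(line):
    return WORD_RE.findall(line)

sample_lines = ["CHAPTER I", "", "Well, Prince"]

# map(): one list of words per line, including an empty list for the blank line
mapped = [get_words(line) for line in sample_lines]
# flatMap(): the per-line lists are concatenated into a single list of words
flat_mapped = list(chain.from_iterable(get_words(line) for line in sample_lines))

print(mapped)       # [['CHAPTER', 'I'], [], ['Well', 'Prince']]
print(flat_mapped)  # ['CHAPTER', 'I', 'Well', 'Prince']
```

This is why `words.count()` counts words rather than lines: with `map()` the count would equal the number of lines.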


Finally, use `distinct()` to count the number of different words in the RDD. Words are compared exactly, so `War` and `war` count as two different words.

In [11]:
# Count the number of distinct words
vocabulary = words.distinct()
print "There are %d distinct words in War and Peace." % vocabulary.count() 

There are 19206 distinct words in War and Peace.
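If a case-insensitive vocabulary is wanted, one option is to lowercase the words before deduplicating, e.g. `words.map(lambda w: w.lower()).distinct()` in Spark. The effect is illustrated below with Python sets standing in for the RDD; the word list is made up.

```python
sample_words = ["War", "war", "Peace", "peace", "war", "Prince"]

# Exact comparison, like words.distinct()
exact_vocabulary = set(sample_words)
# Case-folded comparison, like words.map(lambda w: w.lower()).distinct()
folded_vocabulary = set(w.lower() for w in sample_words)

print(len(exact_vocabulary))   # 5
print(len(folded_vocabulary))  # 3
```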


### Learning activity: Set-like transformations

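Use the functions `union()` and `intersection()` to create two RDDs: the lines that mention war or peace, and the lines that mention both. Count how many lines of each type there are and print some examples. Note that `union()` does not remove duplicates: a line that mentions both words appears twice in the result, which is why the first count below (369) equals 265 + 104. Call `distinct()` on the union to count each line only once.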

In [12]:
warLines = lines.filter(lambda line: "war" in get_words(line))
peaceLines = lines.filter(lambda line: "peace" in get_words(line))

warAndPeaceLines = warLines.intersection(peaceLines)
warOrPeaceLines = warLines.union(peaceLines)

print "There are %d lines with either war or peace, some of them are:" % warOrPeaceLines.count()
for line in warOrPeaceLines.take(5):
    print line
    
print "------------------------------------------"

print "There are %d lines with both war and peace, some of them are:" % warAndPeaceLines.count()
for line in warAndPeaceLines.take(5):
    print line

There are 369 lines with either war or peace, some of them are:
Buonapartes. But I warn you, if you don't tell me that this means war,
things, but Austria never has wished, and does not wish, for war.
to get himself killed. Tell me what this wretched war is for?" she
  "You are off to the war, Prince?" said Anna Pavlovna.
tell you. There is a war now against Napoleon. If it were a war for
------------------------------------------
There are 7 lines with both war and peace, some of them are:
blamed," he said, "both for that war and the peace... but everything
peace nor war, neither an advance nor a defensive camp at the Drissa
perpetual peace and the abolition of war, and secondly, by the fact
war and the peace that had been concluded. "Yes, I have been much
  "To enter Russia without declaring war! I will not make peace as

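Unlike a mathematical set union, `union()` simply concatenates the two RDDs, whereas `intersection()` never returns duplicates (and performs a shuffle internally). The plain-Python model below uses lists of made-up line identifiers for the two filtered RDDs to make the counting explicit; it is a sketch of the semantics, not of how Spark implements them.

```python
# Toy stand-ins: line identifiers selected by the "war" and "peace" filters
war_lines = ["a", "b", "c", "d"]
peace_lines = ["c", "d", "e"]

# union(): concatenation, duplicates kept (like warLines.union(peaceLines))
union_bag = war_lines + peace_lines
# union(...).distinct(): each line once
union_set = sorted(set(union_bag))
# intersection(): distinct elements present in both inputs
both = sorted(set(war_lines) & set(peace_lines))

print(len(union_bag))  # 7, i.e. 4 + 3
print(len(union_set))  # 5, i.e. 4 + 3 - 2
print(both)            # ['c', 'd']
```

In the Spark cell, `warLines.union(peaceLines).distinct().count()` would therefore give the number of distinct lines that mention either word.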

### Learning activity: `reduce()`

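You have already seen two actions: `take(n)`, which returns the first `n` elements of the RDD, and `count()`, which returns the number of elements in the RDD. A third action, `collect()`, returns all elements of the RDD to the driver program, so it should only be used when the result is small enough to fit in the driver's memory.

The action `reduce()` takes as input a function that combines two elements into one. Because Spark applies it within and across partitions in no fixed order, the function should be commutative and associative. Use it to find the longest word in War and Peace.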

In [13]:
print "The longest word in war and peace is:"
print words.reduce(lambda x, y: x if len(x)>len(y) else y)

The longest word in war and peace is:
characteristically
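The lambda above is not commutative when two words have the same length (it returns `y` on a tie), so which of several equally long words is reported can depend on how the data is partitioned. A deterministic variant compares `(length, word)` tuples instead, which makes it a maximum under a total order and hence commutative and associative. It is shown here with Python's `functools.reduce` on a small made-up list; the same function could be passed to `words.reduce()`.

```python
from functools import reduce

def longer(x, y):
    # Prefer the longer word; on equal length, prefer the lexicographically larger
    # one (by code point), so the result does not depend on argument order
    return x if (len(x), x) >= (len(y), y) else y

sample_words = ["peace", "war", "Prince", "Anna", "Genoa", "estate"]

print(reduce(longer, sample_words))                  # estate
print(reduce(longer, list(reversed(sample_words))))  # estate
```

`Prince` and `estate` both have six letters; the tie is broken the same way whichever order they arrive in.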


### Bonus activity: merging filters

Find all the lines that mention both war and peace **without** using `intersection()`. A single `filter()` also avoids the shuffle that `intersection()` performs internally.

In [14]:
warAndPeaceLines = lines.filter(lambda x: "war" in get_words(x) and "peace" in get_words(x))
warAndPeaceLines.count()

7

### Bonus activity: Finding proper nouns

The Python method `str.istitle()` returns `True` if a string is titlecased; for a single word, this means its first letter is uppercase and the remaining letters are lowercase. Use it to:
* Find the set of distinct words in War and Peace which are titlecased
* Find the set of distinct words in War and Peace which are not titlecased

In [10]:
titled_words = words.filter(lambda word: word.istitle()).distinct()
untitled_words = words.filter(lambda word: not word.istitle()).distinct()

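The Python method `str.lower()` returns a copy of a string with all cased characters converted to lowercase. Use it, along with the RDDs generated above, to find the words in War and Peace that only ever appear titlecased. This is only a heuristic for proper nouns: an ordinary word that happens to appear only in titlecased form, for example at the start of sentences, is also reported (see `quay` and `collector` in the output below).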

In [11]:
lower_titles = titled_words.map(lambda word: word.lower())
proper_nouns = lower_titles.subtract(untitled_words)

In [12]:
proper_nouns.count()

1548

In [13]:
proper_nouns.take(50)

[u'bennigsens',
 u'gwiska',
 u'melyukova',
 u'ilagin',
 u'hosjeradek',
 u'femgalka',
 u'jacob',
 u'karagina',
 u'maksim',
 u'nostitz',
 u'bolotnoe',
 u'majesties',
 u'shinshina',
 u'paris',
 u'joshua',
 u'poltava',
 u'shinshin',
 u'nikitski',
 u'adonai',
 u'india',
 u'gibrard',
 u'vrbna',
 u'turk',
 u'frenchies',
 u'grabern',
 u'voyna',
 u'bonapartist',
 u'sardinian',
 u'london',
 u'yaroslavl',
 u'ecka',
 u'moyka',
 u'ignatovich',
 u'nativity',
 u'diana',
 u'fedeshon',
 u'vrazhek',
 u'collector',
 u'peterkin',
 u'sukhtelen',
 u'chatrov',
 u'kashmir',
 u'buonapartists',
 u'cossack',
 u'stael',
 u'nizhni',
 u'platoche',
 u'podolian',
 u'quay',
 u'margaux']
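The same title-case heuristic can be modelled with Python sets standing in for the RDDs; the word sample is invented. Words seen only in titlecased form survive the subtraction, while words that also occur in lowercase are removed, and an ordinary word such as `Quay` slips through exactly as it does above.

```python
sample_words = ["Paris", "The", "the", "Natasha", "Quay", "war", "BK1"]

titled = set(w for w in sample_words if w.istitle())
untitled = set(w for w in sample_words if not w.istitle())

# Lowercase the titled words, then remove every word also seen untitled
# (the counterpart of lower_titles.subtract(untitled_words))
proper_nouns = sorted(set(w.lower() for w in titled) - untitled)
print(proper_nouns)  # ['natasha', 'paris', 'quay']
```

`BK1` is not titlecased (its second letter is uppercase), so it lands in `untitled`.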

### Learning activity: Partitions

Load War and Peace using a minimum of four partitions, and use `getNumPartitions()` to show the number of partitions actually used.

In [14]:
lines = sc.textFile("war-and-peace.txt", minPartitions=4)
print lines.getNumPartitions()

4
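`minPartitions` is only a lower bound: for a large file Spark may create more partitions (for example one per input block), which is why checking `getNumPartitions()` is useful. As a rough mental model only, the sketch below splits a list into `n` contiguous, nearly equal slices, similar in spirit to how `sc.parallelize(data, numSlices)` distributes a local collection. The helper `split_into_partitions` is hypothetical and is not Spark's actual code.

```python
def split_into_partitions(data, n):
    # Hypothetical helper: slice i covers indices [i*len//n, (i+1)*len//n)
    length = len(data)
    return [data[i * length // n:(i + 1) * length // n] for i in range(n)]

partitions = split_into_partitions(list(range(10)), 4)
print(partitions)       # [[0, 1], [2, 3, 4], [5, 6], [7, 8, 9]]
print(len(partitions))  # 4
```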


# Key/Value pairs in Spark

### Learning activity: WordCount in Spark

Use the functions `flatMap()` and `reduceByKey()` to count the number of occurrences of each word in War and Peace, and print the counts of five words.

In [16]:
words = lines.flatMap(lambda line: get_words(line))
word_pairs = words.map(lambda word: (word, 1))
word_counts = word_pairs.reduceByKey(lambda c1, c2: c1 + c2)
for wc in word_counts.take(5):
    print 'The word "%s" was mentioned %d times.' % (wc[0], wc[1])

The word "aided" was mentioned 1 times.
The word "Letting" was mentioned 1 times.
The word "Saints" was mentioned 1 times.
The word "divinely" was mentioned 1 times.
The word "images" was mentioned 3 times.
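`reduceByKey()` merges the values for each key with the given function, first within each partition and then across partitions after the shuffle, so at most one partial count per word and partition has to be moved. The plain-Python sketch below models these two stages with `collections.Counter`; the partition contents are made up, and Spark's real implementation differs in detail.

```python
from collections import Counter

# Two hypothetical partitions of the words RDD
partitions = [["war", "peace", "war"], ["peace", "war", "Prince"]]

# Stage 1 (map side): combine (word, 1) pairs locally within each partition
partial_counts = [Counter(part) for part in partitions]
# Records that would cross the shuffle: one per (partition, word), not per word occurrence
shuffled = sum(len(partial) for partial in partial_counts)

# Stage 2 (after the shuffle): merge the partial counts for each word
word_counts = Counter()
for partial in partial_counts:
    word_counts.update(partial)

print(shuffled)                     # 5 (instead of 6 raw pairs)
print(sorted(word_counts.items()))  # [('Prince', 1), ('peace', 2), ('war', 3)]
```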


### Learning activity: using `groupByKey()`

Reimplement the word count above using `groupByKey()` instead of `reduceByKey()`.
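With `groupByKey()`, every `(word, 1)` pair is shuffled and the values for each word are only summed afterwards; in PySpark this can be written as `word_pairs.groupByKey().mapValues(sum)`. The plain-Python sketch below models that behaviour with a dictionary of lists (the pairs are made up) and checks that it gives the same counts as folding values into a running total, as `reduceByKey()` does.

```python
from collections import defaultdict

pairs = [("war", 1), ("peace", 1), ("war", 1), ("Prince", 1), ("war", 1)]

# groupByKey(): collect every value for a key before doing any arithmetic
grouped = defaultdict(list)
for word, one in pairs:
    grouped[word].append(one)
grouped_counts = dict((word, sum(ones)) for word, ones in grouped.items())

# reduceByKey(lambda c1, c2: c1 + c2): fold values into a running total per key
reduced_counts = {}
for word, one in pairs:
    reduced_counts[word] = reduced_counts.get(word, 0) + one

print(grouped_counts == reduced_counts)  # True
print(grouped["war"])                    # [1, 1, 1]
```

The results agree, but the grouped version must hold every value for a key at once, which is what makes `groupByKey()` shuffle more data.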

### Learning activity: computing the average of each key

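The pair RDD `word_lines_pairs` defined below has one element for each line of War and Peace that contains at least one word, with the line's first word as key and the line itself as value. Use it to compute, for each starting word, the average length of the lines that begin with it.

In [18]:
# Skip lines with no words: get_words(l)[0] would raise an IndexError on them once an action runs
word_lines_pairs = lines.filter(lambda l: len(get_words(l)) > 0).map(lambda l: (get_words(l)[0], l))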
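One way to compute a per-key average is to carry a `(total_length, line_count)` pair for each key, add those pairs together, and divide only at the end. In PySpark this could be expressed as `word_lines_pairs.mapValues(lambda l: (len(l), 1)).reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1])).mapValues(lambda s: float(s[0]) / s[1])`. Averaging partial averages would give wrong results when partitions hold different numbers of lines, which is why the sum and count travel together. A plain-Python model with made-up pairs:

```python
# Made-up (first word, line) pairs standing in for word_lines_pairs
pairs = [("Well", "Well, Prince"), ("Well", "Well"), ("CHAPTER", "  CHAPTER I")]

# mapValues(lambda l: (len(l), 1)), then reduceByKey on (sum, count) tuples
totals = {}
for word, line in pairs:
    length_sum, count = totals.get(word, (0, 0))
    totals[word] = (length_sum + len(line), count + 1)

# mapValues(lambda s: float(s[0]) / s[1]): divide once at the end
averages = dict((word, float(s) / c) for word, (s, c) in totals.items())
print(averages["Well"])     # 8.0
print(averages["CHAPTER"])  # 11.0
```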

# Spark in Practice

### Learning activity: Use an accumulator
Reimplement the definition of the `words` RDD (repeated below) so that it also counts the blank lines as it processes them, using an accumulator.

In [None]:
# Old definition of words
words = lines.flatMap(lambda line: get_words(line))
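In PySpark an accumulator is created with `sc.accumulator(0)` and incremented inside the function passed to `flatMap()` (declare it `global` if you use `+=` on it inside a named function). Only the driver can read its `value`, and only after an action has run; Spark's documentation also warns that updates made inside transformations may be applied more than once if tasks are re-executed. The plain-Python model below shows the pattern with a hypothetical `SimpleAccumulator` class standing in for Spark's accumulator, and assumes that whitespace-only lines count as blank.

```python
import re

WORD_RE = re.compile(r'\w+')

def get_words(line):
    return WORD_RE.findall(line)

class SimpleAccumulator(object):
    # Hypothetical stand-in for a Spark accumulator: tasks may only add to it
    def __init__(self, value=0):
        self.value = value

    def add(self, amount):
        self.value += amount

blank_lines = SimpleAccumulator(0)

def words_counting_blanks(line):
    # Side effect: count blank (empty or whitespace-only) lines while splitting into words
    if line.strip() == "":
        blank_lines.add(1)
    return get_words(line)

sample_lines = ["CHAPTER I", "", "Well, Prince", "   "]
words = [w for line in sample_lines for w in words_counting_blanks(line)]
print(words)              # ['CHAPTER', 'I', 'Well', 'Prince']
print(blank_lines.value)  # 2
```

In Spark, the equivalent `lines.flatMap(words_counting_blanks)` would leave the accumulator at zero until an action such as `count()` forces the transformation to run.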

### Learning activity: Implement the same program with an action

### Learning activity: compare the amount of data shuffled with the Spark UI
Using the Spark UI, compare the amount of data shuffled by each of the following two equivalent computations:

In [17]:
distinct_words = words.distinct()
short_distinct_words = distinct_words.filter(lambda x: len(x)<6)
short_distinct_words.count()

4536

In [18]:
short_words = words.filter(lambda x: len(x)<6)
short_distinct_words = short_words.distinct()
short_distinct_words.count()

4536
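Both cells return the same count because filtering on length and removing duplicates commute here; the difference is how much data reaches the shuffle performed by `distinct()`. The plain-Python model below uses a made-up word list and takes the list length as a proxy for the number of records shuffled, ignoring the per-partition deduplication Spark performs before shuffling.

```python
sample_words = ["war", "peace", "war", "Anna", "Pavlovna", "Antichrist", "war", "peace"]

def is_short(word):
    return len(word) < 6

# Version 1: distinct() first, so every word takes part in the shuffle
shuffled_v1 = len(sample_words)
result_v1 = set(w for w in set(sample_words) if is_short(w))

# Version 2: filter() first, so only short words take part in the shuffle
short_words = [w for w in sample_words if is_short(w)]
shuffled_v2 = len(short_words)
result_v2 = set(short_words)

print(result_v1 == result_v2)                    # True
print("%d %d" % (shuffled_v1, shuffled_v2))      # 8 6
```

Applying narrow transformations such as `filter()` before a shuffling one is a general way to reduce the data moved between executors.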

Similarly, re-execute the two versions of word count you implemented before and compare the amount of data shuffled using `reduceByKey()` and using `groupByKey()`.