# Simple Spark operations

## Requirements

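* This notebook requires a local installation of [Apache Spark](https://spark.apache.org/) (version 1.2 was used here).
* It assumes a SparkContext is already available as `sc`, as it is when the notebook is started through PySpark.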

## Load a text file and count the lines

There are a couple of sample text files (from [Project Gutenberg](http://www.gutenberg.org/)) in the `data` directory of this repository.

In [1]:
# Sample data files (you could use any text file)
shakespeare = "./data/shakespeare-works.txt"
aurelius = "./data/marcus-aurelius-meditations.txt"

# Load one of these files
fileName = shakespeare

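# Create an RDD of the file's lines; `sc` is the SparkContext.
# This is lazy: Spark does not read the file yet.
textFile = sc.textFile(fileName)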

* Count the lines in the file.
* NB: `count()` is an *action*, so it forces Spark to materialise the RDD, i.e. Spark actually reads the file at this point.

In [2]:
sc

<pyspark.context.SparkContext at 0x108a0f3d0>

In [3]:
print("File {0} contains {1} lines".format(fileName, textFile.count()))

File ./data/shakespeare-works.txt contains 124787 lines
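The laziness point can be illustrated without Spark, as a loose analogy only: creating a Python generator does no work, just as `sc.textFile()` does not read the file, and consuming the generator plays the role of the `count()` action. The `read_lines` helper and the sample lines are hypothetical.

```python
# Loose analogy (not Spark): a generator is lazy like an RDD transformation;
# nothing is "read" until something consumes it, like an action such as count().
reads = []  # log of lines that have actually been "read"


def read_lines(lines):
    """Hypothetical stand-in for sc.textFile(): yields lines, logging each read."""
    for line in lines:
        reads.append(line)
        yield line


sample = ["first line", "second line", "third line"]

lazy_lines = read_lines(sample)  # like sc.textFile(...): no lines read yet
print(len(reads))                # 0

line_count = sum(1 for _ in lazy_lines)  # like textFile.count(): forces the read
print(line_count, len(reads))            # 3 3
```

One difference: a generator can be consumed only once, whereas Spark recomputes an uncached RDD from its source every time an action runs on it, so each later action on `textFile` reads the file again.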


* Take the length of each line in the file and find the maximum line length.
* `max()` is also an action, so Spark reads the file again here (the RDD is not cached).

In [4]:
maxlen = textFile.map(lambda s: len(s)).max()
print("Maximum line length is {0}".format(maxlen))

Maximum line length is 85
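For a small file it can be reassuring to cross-check such numbers locally. A minimal plain-Python sketch of the same two computations, using a short hypothetical in-memory sample in place of the file:

```python
# Plain-Python analogues of textFile.count() and
# textFile.map(lambda s: len(s)).max(), on a small in-memory sample.
sample_text = (
    "To be, or not to be,\n"
    "that is the question:\n"
    "Whether 'tis nobler in the mind to suffer\n"
)

# splitlines() drops the line terminators, roughly like sc.textFile() does
lines = sample_text.splitlines()

line_count = len(lines)         # analogue of textFile.count()
max_len = max(map(len, lines))  # analogue of .map(len).max()
print(line_count, max_len)      # 3 41
```

`splitlines()` also splits on a few characters other than `\n` and `\r`, so on a real file the counts may not match Spark's exactly.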


## Create simple Python functions to extract and count words in a string

* We will use these functions when we process the Spark RDDs below.
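* The `extract_words()` function is pretty basic: it splits on runs of non-word characters, so `re.split()` returns an empty string whenever the input starts or ends with punctuation, and contractions such as `don't` are split in two. We could refine it with a better regex, make it case-insensitive, etc.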

In [5]:
# Import regular expression library
import re

def extract_words(s):
    """Split input string into words using RE"""
    return re.split(r'\W+', s)  # raw string avoids an invalid-escape warning

def count_words(s):
    """Split input string into words using RE and return count"""
    words = extract_words(s)
    return len(words)

Test the `count_words()` function with a simple string:

In [6]:
s = "why hello there, Old old chap"
print("String '{0}' contains {1} words".format(s, count_words(s)))

String 'why hello there, Old old chap' contains 6 words
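Because `re.split()` keeps an empty string when the input starts or ends with non-word characters, `count_words()` over-counts a string like `"Hello, world!"`, and an empty line counts as one word. A minimal sketch of one possible refinement, assuming `re.findall()` with `\w+` plus lower-casing is acceptable; `extract_words_refined` and `count_words_refined` are hypothetical names, not part of the notebook:

```python
import re


def extract_words(s):
    """The notebook's original approach: split on runs of non-word characters."""
    return re.split(r'\W+', s)


def extract_words_refined(s):
    """Hypothetical refinement: lower-case, then keep only runs of word characters."""
    return re.findall(r'\w+', s.lower())


def count_words_refined(s):
    """Hypothetical refinement of count_words(): no empty strings are counted."""
    return len(extract_words_refined(s))


print(extract_words("Hello, world!"))          # ['Hello', 'world', '']
print(extract_words_refined("Hello, world!"))  # ['hello', 'world']
print(len(extract_words("")), count_words_refined(""))  # 1 0
```

Both versions still split contractions such as `don't` into `don` and `t`; handling those would need a more specific pattern.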


## Use your custom Python function with your Spark RDD

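Now we can use this custom function in the standard RDD `map()` transformation. Like `textFile()`, `map()` is lazy: nothing is computed until the `max()` action in the next cell.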

In [7]:
wordsPerLine = textFile.map(lambda s: count_words(s))

In [8]:
print("Maximum number of words in a line is: {0}".format(wordsPerLine.max()))

Maximum number of words in a line is: 19
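`map()` accepts any Python callable, so the lambda wrapper is not required: `textFile.map(count_words)` should behave the same. The equivalence can be checked with Python's built-in `map()` on a few hypothetical sample lines:

```python
import re


def count_words(s):
    """Same logic as the notebook's count_words()."""
    return len(re.split(r'\W+', s))


lines = ["why hello there, Old old chap", "To be, or not to be", ""]

# Lambda wrapper, as in the notebook cell above
via_lambda = max(map(lambda s: count_words(s), lines))
# Passing the function object directly
direct = max(map(count_words, lines))

print(via_lambda, direct)  # 6 6
```

In Spark the function is serialised and shipped to the worker processes, so anything it relies on (here the `re` module) has to be importable there as well.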


## Do some classic word-count stuff

* Each line in the text file is read, lower-cased and converted into a list of words.
* The word-lists for the separate lines are then *flattened* into a single list of words (**`flatMap`**), and the empty strings left over from `re.split()` are dropped (**`filter`**).
* We then do the classic word-count **`map`** operation, i.e. emit a pair of (word, 1) for each word in the list.
* Now we **`reduce`** the list of (word, 1) tuples, grouping them by the key (word) and adding up all the 1's for each word (**`reduceByKey`**).
* This gives us a **tuple of (word, count)** for each distinct word in the original text.

In [9]:
# Generate word counts as another RDD
countsRDD = textFile.flatMap(lambda line: extract_words(line.lower())) \
            .filter(lambda word: len(word)>0) \
            .map(lambda word: (word, 1)) \
            .reduceByKey(lambda a, b: a + b) 
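The same pipeline can be mirrored in plain Python on a tiny hypothetical input, which makes each step easy to inspect. This is only a local sketch of the semantics; `reduce_by_key` is a hypothetical helper, not a Spark API:

```python
import re
from itertools import chain


def extract_words(s):
    return re.split(r'\W+', s)


def reduce_by_key(pairs, func):
    """Hypothetical local stand-in for RDD.reduceByKey(func)."""
    result = {}
    for key, value in pairs:
        result[key] = func(result[key], value) if key in result else value
    return result


lines = ["The cat sat.", "The cat ran!", ""]

# flatMap: one word list per (lower-cased) line, flattened into one sequence
words = chain.from_iterable(extract_words(line.lower()) for line in lines)
# filter: drop empty strings produced by re.split()
words = (w for w in words if len(w) > 0)
# map: emit a (word, 1) pair for every word
pairs = ((w, 1) for w in words)
# reduceByKey: add up the 1s for each distinct word
counts = reduce_by_key(pairs, lambda a, b: a + b)

print(sorted(counts.items()))  # [('cat', 2), ('ran', 1), ('sat', 1), ('the', 2)]
```

Spark also applies the reduce function within each partition before shuffling (like a MapReduce combiner), which is why the function passed to `reduceByKey()` should be associative and commutative, as addition is.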

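* We might choose to cache this RDD in Spark, so that we can re-use it in different places later on without having to re-generate the data. `cache()` is itself lazy: the counts are computed and kept in memory the first time an action runs on the RDD.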

In [10]:
countsRDD.cache()

PythonRDD[10] at RDD at PythonRDD.scala:42

### Find N most common words in text

* Spark likes working with [key-value pairs](http://spark.apache.org/docs/1.2.0/programming-guide.html#working-with-key-value-pairs) and our tuples are (key, value) pairs of **(word, count)**, i.e. the key is **word**.
* But we now want to sort them in order of **count** to get the most/least common words.
* The easy way to do this is to flip the tuples around as (count, word), then sort them by their key (count).
* Then we just `take` the required number of items off the sorted list.

In [11]:
# Find N most common words by count
n = 10
mostcommon = countsRDD \
           .map(lambda wc: (wc[1], wc[0])) \
           .sortByKey(ascending=False) \
           .take(n)

In [12]:
print("Most common {0} words\n====================".format(n))
for (c, w) in mostcommon:
    print("{0} : {1}".format(w, c))

Most common 10 words
the : 27843
and : 26847
i : 22538
to : 19882
of : 18307
a : 14800
you : 13928
my : 12490
that : 11563
in : 11183
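Sorting the whole RDD just to keep the top 10 is more work than needed. PySpark's `RDD.takeOrdered(num, key=None)` returns only the first `num` elements of an ordering, so something like `countsRDD.takeOrdered(n, key=lambda wc: -wc[1])` should give the most common words without a full distributed sort (it returns `(word, count)` pairs). The two ideas, flip-and-sort versus partial selection, can be sketched locally with `heapq.nlargest` on hypothetical counts:

```python
import heapq

# Hypothetical (word, count) pairs, like the elements of countsRDD
counts = [("the", 5), ("cat", 2), ("sat", 1), ("on", 1), ("mat", 3)]
n = 2

# Approach used in the notebook: flip to (count, word), sort by count
# descending, then take the first n
flipped = [(c, w) for (w, c) in counts]
most_common = sorted(flipped, key=lambda cw: cw[0], reverse=True)[:n]

# Partial selection: keep only the n largest by count, no full sort needed
most_common_heap = heapq.nlargest(n, counts, key=lambda wc: wc[1])

print(most_common)       # [(5, 'the'), (3, 'mat')]
print(most_common_heap)  # [('the', 5), ('mat', 3)]
```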


### Find N least common words in text

* Same principle as above, but sorting in ascending order of count.

In [13]:
# Find N least common words by count
n = 10
leastcommon = countsRDD \
           .map(lambda wc: (wc[1], wc[0])) \
           .sortByKey(ascending=True) \
           .take(n)

In [55]:
print("Least common {0} words\n=====================".format(n))
for (c, w) in leastcommon:
    print("{0} : {1}".format(w, c))

Least common 10 words
aided : 1
gag : 1
cxsar : 1
pretended : 1
conjuring : 1
offendeth : 1
reposeth : 1
rupture : 1
swoopstake : 1
digit : 1
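Every word listed above occurs exactly once, and `sortByKey()` orders by count only, so which tied words come back is effectively arbitrary. If a reproducible list matters, one option is to sort on `(count, word)` so ties are broken alphabetically; in PySpark, `countsRDD.takeOrdered(n, key=lambda wc: (wc[1], wc[0]))` should do this. A local sketch with hypothetical counts:

```python
# Hypothetical (word, count) pairs with several ties at count 1
counts = [("gag", 1), ("the", 5), ("aided", 1), ("digit", 1), ("cat", 2)]
n = 2

# Sorting by count alone leaves tied words in whatever order they arrive;
# sorting by (count, word) breaks ties alphabetically, so the result does
# not depend on the input order.
least_common = sorted(counts, key=lambda wc: (wc[1], wc[0]))[:n]

print(least_common)  # [('aided', 1), ('digit', 1)]
```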


## Clean-up

In [56]:
# Release the cache:
countsRDD.unpersist()

PythonRDD[97] at RDD at PythonRDD.scala:43

# *That's all, folks!*