
# **Spark's Hello World - WordCount**
#### This lab demonstrates how to count the number of words in a document using Spark's transformations and actions.

#### *Part 1:*  Create an RDD from a Python list, then use the `map` transformation to find the length of each word
#### *Part 2:*  Create a pair RDD with one element per word, e.g. `[("dog", 1), ("cat", 1), ..., ("elephant", 1)]`.  Each element is of the form `(key, value)`
#### *Part 3:*  Aggregate by key using `groupByKey` - `("dog", [1, 1])`, `("elephant", [1, 1])`, ... - then sum each list to get the count
#### *Part 4:*  `reduceByKey` - aggregate and count at the same time
#### *Part 5:*  Apply word count to a file


# **Part 1: Creating an RDD from a Python List**

In [2]:
#create a Python List and print the type
wordsList = ['cat', 'elephant', 'dog', 'dog', 'cat', 'bird', 'elephant']
print type(wordsList)


<type 'list'>


### `sc.parallelize` creates an RDD, spreading the data in wordsList across 4 partitions


In [3]:
# Create an RDD with 4 partitions
wordsRDD = sc.parallelize(wordsList, 4)
# Print out the type of wordsRDD
print type(wordsRDD)

<class 'pyspark.rdd.RDD'>
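
To picture what the second argument to `sc.parallelize` does, here is a plain-Python sketch (no Spark required) that cuts the list into 4 contiguous slices. The helper `split_into_partitions` is hypothetical and only illustrates the idea; the way Spark actually assigns elements to partitions may differ. On a real RDD, `wordsRDD.glom().collect()` shows the true layout.

```python
def split_into_partitions(items, num_partitions):
    """Cut items into num_partitions contiguous slices of near-equal size.

    Hypothetical helper for illustration; not part of the Spark API.
    """
    n = len(items)
    return [items[(i * n) // num_partitions:((i + 1) * n) // num_partitions]
            for i in range(num_partitions)]

wordsList = ['cat', 'elephant', 'dog', 'dog', 'cat', 'bird', 'elephant']
partitions = split_into_partitions(wordsList, 4)
for index, part in enumerate(partitions):
    print('partition {0}: {1}'.format(index, part))
```

Seven words cannot be divided evenly by four, so one slice holds a single word and the other three hold two each.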


### `collect` and `count` are actions - their results are sent back to the driver!


In [4]:
# collect is an action that returns the elements of the RDD to the driver.  count is also an action
wordList = wordsRDD.collect()
print wordList

wordCount = wordsRDD.count()
print wordCount

['cat', 'elephant', 'dog', 'dog', 'cat', 'bird', 'elephant']
7


In [5]:
# TEST your code
from test_helper import Test
Test.assertEquals(wordsRDD.collect(), ['cat', 'elephant', 'dog', 'dog', 'cat', 'bird', 'elephant'],
                  'incorrect values for wordsRDD')
Test.assertEquals(wordsRDD.count(), 7)

1 test passed.
1 test passed.


#  **`map` is a transformation - it runs independently on each of the 4 partitions**
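
The outline for Part 1 mentions using `map` to find the length of each word. The sketch below mimics that in plain Python so it runs without Spark: the partition layout is an assumption, and `map_partition` is a hypothetical helper that stands in for the work one task does on its own slice of the data. On the RDD itself the corresponding call would be `wordsRDD.map(lambda word: len(word)).collect()`.

```python
# Assumed layout of the 7 words across 4 partitions (illustrative only).
partitions = [['cat'], ['elephant', 'dog'], ['dog', 'cat'], ['bird', 'elephant']]

def map_partition(func, partition):
    """Apply func to every element of one partition, as a single task would."""
    return [func(element) for element in partition]

# Each partition is processed independently; no data moves between them.
mapped = [map_partition(len, part) for part in partitions]
print(mapped)

# "Collecting" concatenates the per-partition results in partition order.
wordLengths = [length for part in mapped for length in part]
print(wordLengths)
```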



#  **Part 2:  Create a pair RDD for each element of the list**
#      **[ ("dog", 1), ("cat", 1), ..., ("elephant", 1) ]**
#   **Each element is of the form `(key, value)`**

##  **Here we use the `map` transformation to map each word to a Python tuple**


In [6]:
wordPairs = wordsRDD.map(lambda word: (word, 1)) 

print wordPairs.collect()

[('cat', 1), ('elephant', 1), ('dog', 1), ('dog', 1), ('cat', 1), ('bird', 1), ('elephant', 1)]


In [7]:
# TEST Pair RDDs (1f)
Test.assertEquals(wordPairs.collect(),
                  [('cat', 1), ('elephant', 1), ('dog', 1), ('dog', 1), ('cat', 1), ('bird', 1), ('elephant', 1)],
                  'incorrect value for wordPairs')

1 test passed.


# **Counting with pair RDDs**

#  **Part 3:  Aggregate by key using groupByKey - ("dog", [1, 1]), ("elephant", [1, 1]), i.e. `('word', iterator)`**
## **... then convert each iterator to a list using `mapValues(list)` and sum the values**
####  Note that `list` is the Python built-in that builds a list from the values

### `groupByKey()` is a transformation that shuffles data across the executors to generate a pair RDD of type `('word', iterator)`.

In [8]:
# TODO: Replace <FILL IN> with appropriate code
# Note that groupByKey requires no parameters
wordsGrouped = wordPairs.groupByKey().mapValues(list)
print wordsGrouped.collect()

# OR


for key, value in wordsGrouped.collect():
    print '{0}: {1}'.format(key, list(value))

[('dog', [1, 1]), ('bird', [1]), ('elephant', [1, 1]), ('cat', [1, 1])]
dog: [1, 1]
bird: [1]
elephant: [1, 1]
cat: [1, 1]


In [9]:
# TEST groupByKey() approach (2a)
Test.assertEquals(wordsGrouped.collect(),
                  [('dog', [1, 1]), ('bird', [1]), ('elephant', [1, 1]), ('cat', [1, 1])],
                  'incorrect value for wordsGrouped')

1 test passed.


In [10]:
# TODO: Replace <FILL IN> with appropriate code
# mapValues leaves the key untouched and applies sum to each list of ones
wordCountsGrouped = wordsGrouped.mapValues(sum)
print wordCountsGrouped.collect()



[('dog', 2), ('bird', 1), ('elephant', 2), ('cat', 2)]


In [11]:
# TEST Use groupByKey() to obtain the counts (2b)
Test.assertEquals(wordCountsGrouped.collect(),
                  [('dog', 2), ('bird', 1), ('elephant', 2), ('cat', 2)],
                  'incorrect value for wordCountsGrouped')

1 test passed.
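
The two steps above (group the values per key, then sum each group) can be sketched in plain Python without Spark. This is only a single-machine illustration of the semantics; it says nothing about how Spark distributes the work.

```python
from collections import defaultdict

wordPairs = [('cat', 1), ('elephant', 1), ('dog', 1), ('dog', 1),
             ('cat', 1), ('bird', 1), ('elephant', 1)]

# Step 1 (groupByKey): collect every value that shares a key.
grouped = defaultdict(list)
for word, one in wordPairs:
    grouped[word].append(one)

# Step 2 (sum per key): reduce each list of ones to a count.
counts = {word: sum(ones) for word, ones in grouped.items()}

for word in sorted(counts):
    print('{0}: {1} -> {2}'.format(word, grouped[word], counts[word]))
```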


# **Part 4:  ReduceByKey - Aggregate and count at the same time**
#### This transformation first aggregates and counts within each partition on the individual executors, then shuffles only the partial results.  It scales much better than `groupByKey()` for large data sets.
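
To see why combining before the shuffle helps, the following plain-Python sketch counts how many records would have to move in each approach. The two-partition layout is an assumption chosen for illustration, and `combine_locally` is a hypothetical helper, not a Spark function.

```python
from collections import Counter

# Assumed split of the lab's (word, 1) pairs into 2 partitions (illustrative only).
partitions = [
    [('cat', 1), ('elephant', 1), ('dog', 1), ('dog', 1)],
    [('cat', 1), ('bird', 1), ('elephant', 1)],
]

def combine_locally(partition):
    """Sum values per key inside one partition, before anything is shuffled."""
    local = Counter()
    for word, count in partition:
        local[word] += count
    return list(local.items())

# groupByKey-style: every pair is shuffled unchanged.
shuffledByGroup = sum(len(part) for part in partitions)

# reduceByKey-style: pairs are pre-summed per partition, then shuffled.
combined = [combine_locally(part) for part in partitions]
shuffledByReduce = sum(len(part) for part in combined)

# Final merge of the partial sums, as the receiving side would do.
totals = Counter()
for part in combined:
    for word, count in part:
        totals[word] += count

print('records shuffled with groupByKey: {0}'.format(shuffledByGroup))
print('records shuffled with reduceByKey: {0}'.format(shuffledByReduce))
print(sorted(totals.items()))
```

With only seven words the saving is a single record, but the gap widens as keys repeat more often within a partition, which is the usual case for word counts on large text.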

In [12]:
# Note that reduceByKey takes in a function that accepts two values and returns a single value
wordCounts = wordPairs.reduceByKey(lambda a, b: a + b)
print wordCounts.collect()

[('dog', 2), ('bird', 1), ('elephant', 2), ('cat', 2)]


In [13]:
# TEST Counting using reduceByKey (2c)
Test.assertEquals(wordCounts.collect(), [('dog', 2), ('bird', 1), ('elephant', 2), ('cat', 2)],
                  'incorrect value for wordCounts')

1 test passed.


## **All together**


In [17]:

wordCountsCollected = (wordsRDD
                       .map(lambda word: (word, 1)) 
                       .reduceByKey(lambda a, b: a + b)
                       .collect())
print wordCountsCollected

[('dog', 2), ('bird', 1), ('elephant', 2), ('cat', 2)]


In [18]:
# TEST All together (2d)
Test.assertEquals(wordCountsCollected, [('dog', 2), ('bird', 1), ('elephant', 2), ('cat', 2)],
                  'incorrect value for wordCountsCollected')

1 test passed.


# **Part 5:  Apply WordCount to a file**
#### The Shakespeare file has all punctuation removed and all empty lines removed

#### **Load a text file**
#### The text is the [Complete Works of William Shakespeare](http://www.gutenberg.org/ebooks/100) from [Project Gutenberg](http://www.gutenberg.org/wiki/Main_Page).


In [19]:
# Just run this code
import os.path
baseDir = os.path.join('data')
fileName = os.path.join(baseDir, 'cleanShakespeare.txt')

shakespeareRDD = (sc
                  .textFile(fileName, 8))
                  

#append line numbers for easy viewing

print '\n'.join(shakespeareRDD
                .zipWithIndex()  # to (line, lineNum)
                .map(lambda (l, num): '{0}: {1}'.format(num, l))  # to 'lineNum: line'
                .take(15))


0: 1609
1: the sonnets
2: by william shakespeare
3: 1
4: from fairest creatures we desire increase
5: that thereby beautys rose might never die
6: but as the riper should by time decease
7: his tender heir might bear his memory
8: but thou contracted to thine own bright eyes
9: feedst thy lights flame with selfsubstantial fuel
10: making a famine where abundance lies
11: thy self thy foe to thy sweet self too cruel
12: thou that art now the worlds fresh ornament
13: and only herald to the gaudy spring
14: within thine own bud buriest thy content


#### **Words from lines**
#### First split each line into words with `split(" ")`, using `flatMap` so the result is a single flat RDD of words
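
The difference between mapping and flat-mapping a split can be sketched in plain Python. The two sample lines are taken from the file preview above; the list comprehensions only imitate what `map` and `flatMap` would return after a `collect()`.

```python
lines = ['the sonnets', 'by william shakespeare']

# map: one output element per input line (a list of lists).
mapped = [line.split(" ") for line in lines]
print(mapped)

# flatMap: the per-line lists are flattened into a single sequence of words.
flatMapped = [word for line in lines for word in line.split(" ")]
print(flatMapped)
```

Counting the flattened result gives the number of words, whereas counting the mapped result would only give the number of lines.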



In [20]:
# TODO: Replace <FILL IN> with appropriate code

shakespeareWordsRDD = shakespeareRDD.flatMap(lambda line: line.split(" "))

#  Count the number of words
shakespeareWordCount = shakespeareWordsRDD.count()
print shakespeareWordsRDD.top(5)
print shakespeareWordCount

[u'zwaggerd', u'zounds', u'zounds', u'zounds', u'zounds']
918138


In [None]:
# TEST Words from lines (4d)
Test.assertEquals(shakespeareWordCount, 918138 , 'incorrect value for shakespeareWordCount')
Test.assertEquals(shakespeareWordsRDD.top(5),
                  [u'zwaggerd', u'zounds', u'zounds', u'zounds', u'zounds'],
                  'incorrect value for shakespeareWordsRDD')

#### **Remove empty elements**
#### The next step is to filter out the empty elements.  Remove all entries where the word is `''`.
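
Empty elements appear because `split(" ")` treats every single space as a separator, so consecutive or trailing spaces yield `''`. The sketch below shows this on a made-up line (the extra spaces are an assumption added for illustration) and applies the same predicate used in the filter step.

```python
# Hypothetical line with a double space and a trailing space.
line = 'making  a famine '

words = line.split(" ")
print(words)

# Same predicate as the RDD filter: keep everything that is not ''.
nonEmpty = [word for word in words if word != ""]
print(nonEmpty)
```

In plain Python, calling `line.split()` with no argument splits on runs of whitespace and never produces empty strings, which is an alternative to filtering afterwards.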

In [21]:
# TODO: Replace <FILL IN> with appropriate code
shakeWordsRDD = shakespeareWordsRDD.filter(lambda x: x != "")
shakeWordCount = shakeWordsRDD.count()


print shakeWordCount

882996


In [22]:
# TEST Remove empty elements (4e)
Test.assertEquals(shakeWordCount, 882996, 'incorrect value for shakeWordCount')

1 test passed.


#### **Count the words**
#### Use `takeOrdered()` to obtain the 100 most common words and their counts.

In [44]:
# TODO: Replace <FILL IN> with appropriate code

MostUsedWords = (shakeWordsRDD.map(lambda word: (word, 1)) 
             .reduceByKey(lambda a, b: a + b)
             .takeOrdered(100, lambda s: -s[1]))

print MostUsedWords

# Alternatively, print the first 15 entries one per line:
# print '\n'.join('{0}: {1}'.format(w, c) for w, c in MostUsedWords[:15])

[(u'the', 27361), (u'and', 26028), (u'i', 20681), (u'to', 19150), (u'of', 17463), (u'a', 14593), (u'you', 13615), (u'my', 12481), (u'in', 10956), (u'that', 10890), (u'is', 9134), (u'not', 8497), (u'with', 7771), (u'me', 7769), (u'it', 7678), (u'for', 7558), (u'be', 6857), (u'his', 6857), (u'your', 6655), (u'this', 6602), (u'but', 6265), (u'he', 6251), (u'have', 5880), (u'as', 5733), (u'thou', 5485), (u'him', 5192), (u'so', 5044), (u'will', 4974), (u'what', 4464), (u'thy', 4032), (u'all', 3883), (u'her', 3843), (u'no', 3790), (u'do', 3748), (u'by', 3729), (u'shall', 3591), (u'if', 3493), (u'are', 3403), (u'we', 3296), (u'thee', 3178), (u'our', 3061), (u'lord', 3059), (u'on', 3046), (u'king', 2861), (u'good', 2812), (u'now', 2778), (u'sir', 2754), (u'from', 2639), (u'o', 2607), (u'come', 2507), (u'at', 2501), (u'they', 2471), (u'well', 2462), (u'or', 2425), (u'which', 2315), (u'would', 2293), (u'more', 2288), (u'was', 2229), (u'then', 2221), (u'she', 2208), (u'am', 2168), (u'how', 2159),
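
The key function passed to `takeOrdered()` decides what "smallest" means. As a rough single-machine analogy, Python's `heapq.nsmallest` accepts the same kind of key, so the effect of negating the count can be tried without Spark. The five pairs below are copied from the output above; treating `heapq.nsmallest` as a stand-in for `takeOrdered()` is an assumption made for illustration.

```python
import heapq

# A few (word, count) pairs taken from the output above.
wordCounts = [('of', 17463), ('the', 27361), ('i', 20681), ('to', 19150), ('and', 26028)]

# Negating the count makes the largest counts the "smallest" keys.
mostUsed = heapq.nsmallest(3, wordCounts, key=lambda s: -s[1])
print(mostUsed)

# Using the count itself as the key returns the least used words instead.
leastUsed = heapq.nsmallest(3, wordCounts, key=lambda s: s[1])
print(leastUsed)
```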

#### **Count the words**
#### Use `takeOrdered()` to obtain the 100 least used words and their counts.

In [45]:

LeastUsedWords = (shakeWordsRDD.map(lambda word: (word, 1)) 
             .reduceByKey(lambda a, b: a + b)
             .takeOrdered(100, lambda s: s[1]))

print LeastUsedWords


[(u'soforth', 1), (u'shardborne', 1), (u'joshua', 1), (u'unhousled', 1), (u'bere', 1), (u'consumptions', 1), (u'friendsa', 1), (u'gelidus', 1), (u'buttonhole', 1), (u'somever', 1), (u'deluded', 1), (u'tinctures', 1), (u'foreward', 1), (u'unloving', 1), (u'pacify', 1), (u'breakpromise', 1), (u'omitst', 1), (u'ladders', 1), (u'latchd', 1), (u'wilfull', 1), (u'abatements', 1), (u'esill', 1), (u'wanderers', 1), (u'oertrip', 1), (u'honorificabilitudinitatibus', 1), (u'flybitten', 1), (u'billets', 1), (u'impugns', 1), (u'sevnight', 1), (u'prisond', 1), (u'symbols', 1), (u'overripend', 1), (u'stinks', 1), (u'fatter', 1), (u'unattempted', 1), (u'truththe', 1), (u'whippingcheer', 1), (u'unhelpful', 1), (u'cockpit', 1), (u'thricedouble', 1), (u'cardnal', 1), (u'hewd', 1), (u'hollanders', 1), (u'arion', 1), (u'diverts', 1), (u'womand', 1), (u'paysans', 1), (u'languishd', 1), (u'suffocating', 1), (u'overworn', 1), (u'browse', 1), (u'sample', 1), (u'peize', 1), (u'discontenting', 1), (u'99', 1), (u

#### **Count the words**
#### Use `takeOrdered()` to obtain the twenty least used words and their counts.

In [None]:


#### **Count the words**
#### Use `takeOrdered()` to print the words in alphabetical order
#### Notice that numbers appear in lexicographical order (they are compared as strings), so `'10'` comes before `'2'`

In [50]:

WordsInOrder= (shakeWordsRDD.map(lambda word: (word, 1)) 
             .reduceByKey(lambda a, b: a + b)
             .takeOrdered(500, lambda s: s[0]))

print WordsInOrder



[(u'1', 85), (u'10', 3), (u'100', 1), (u'101', 1), (u'102', 1), (u'103', 1), (u'104', 1), (u'105', 1), (u'106', 1), (u'107', 1), (u'108', 1), (u'109', 1), (u'11', 1), (u'110', 1), (u'111', 1), (u'112', 1), (u'113', 1), (u'114', 1), (u'115', 1), (u'116', 1), (u'117', 1), (u'118', 1), (u'119', 1), (u'12', 1), (u'120', 1), (u'121', 1), (u'122', 1), (u'123', 1), (u'124', 1), (u'125', 1), (u'126', 1), (u'127', 1), (u'128', 1), (u'129', 1), (u'13', 1), (u'130', 1), (u'131', 1), (u'132', 1), (u'133', 1), (u'134', 1), (u'135', 1), (u'136', 1), (u'137', 1), (u'138', 1), (u'139', 1), (u'14', 1), (u'140', 1), (u'141', 1), (u'142', 1), (u'143', 1), (u'144', 1), (u'145', 1), (u'146', 1), (u'147', 1), (u'148', 1), (u'149', 1), (u'15', 1), (u'150', 1), (u'151', 1), (u'152', 1), (u'153', 1), (u'154', 1), (u'1591', 2), (u'1592', 1), (u'1593', 2), (u'1594', 2), (u'1595', 3), (u'1596', 2), (u'1597', 2), (u'1598', 2), (u'1599', 3), (u'16', 1), (u'1601', 2), (u'1602', 2), (u'1603', 1), (u'1604', 1), (u'160
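
The ordering of the numeric entries follows from how Python compares strings, which can be checked in isolation. The keys below are a small sample in the style of the output above; the `key=int` variant assumes every key is numeric, which does not hold for the full word list.

```python
keys = ['1', '10', '100', '2', '16', '154']

# String comparison goes character by character, so '10' sorts before '2'.
lexicographic = sorted(keys)
print(lexicographic)

# Converting to int inside the key function restores numeric order.
numeric = sorted(keys, key=int)
print(numeric)
```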