## Word Count

Counting the number of occurances of words in a text is a popular first exercise using map-reduce.

## The Task

Input: A text file consisting of words seperated by spaces.
Output: A list of words and their counts, sorted from the most to the least common.

We will use the book "Moby Dick" as our input.

In [1]:
#start the SparkContext
from pyspark import SparkContext, SparkConf


In [2]:
sc = SparkContext(master="local[4]")

### Setup a plan for pretty print

In [3]:
def pretty_print_plan(rdd):
    for x in rdd.toDebugString().decode().split('\n'):
        print(x)

### Use textFile() to read the text

In [14]:
%%time
text_file = sc.textFile("file:///home/erin/Downloads/spark-3.0.1-bin-hadoop2.7/Moby-Dick.txt")

CPU times: user 1.12 ms, sys: 506 µs, total: 1.63 ms
Wall time: 22.5 ms


In [15]:
type(text_file)

pyspark.rdd.RDD

### Steps for counting the words
1. split line by spaces.
2. map word to (word,1)
3. count the number of occurances of each word.

In [16]:
%%time
words =     text_file.flatMap(lambda line: line.split(" "))
not_empty = words.filter(lambda x: x!='') 
key_values= not_empty.map(lambda word: (word, 1)) 
counts=     key_values.reduceByKey(lambda a, b: a + b)

CPU times: user 5.69 ms, sys: 9.02 ms, total: 14.7 ms
Wall time: 88.2 ms


## flatMap()

Note the line:

words =     text_file.flatMap(lambda line: line.split(" "))

The reason we are using flatMap is that operation line.split(" ") generates a list of strings, so had we used map the result would be an RDD of lists of words. Not an RDD of words. The difference between map and flatMap is that the second expects to get a list as the result from the map and  it concatenates the lists to form the RDD.

## The execution plan

In the last call we defined the execution plan, but we have not started to execute it.

1. Preparing the plan took ~100ms, which is non-trivial amount of time,
2. But much less than the time it will take to execute it.
3. Lets have a look at the execution plan

### Understanding the details

To see which step in the plan corresponds to which RDD we print out the execution plan for each of the RDDS.
Note that the execution plan for words, not_empty and key_values are all the same

In [17]:
pretty_print_plan(text_file)

(2) file:///home/erin/Downloads/spark-3.0.1-bin-hadoop2.7/Moby-Dick.txt MapPartitionsRDD[7] at textFile at NativeMethodAccessorImpl.java:0 []
 |  file:///home/erin/Downloads/spark-3.0.1-bin-hadoop2.7/Moby-Dick.txt HadoopRDD[6] at textFile at NativeMethodAccessorImpl.java:0 []


In [18]:
pretty_print_plan(words)

(2) PythonRDD[12] at RDD at PythonRDD.scala:53 []
 |  file:///home/erin/Downloads/spark-3.0.1-bin-hadoop2.7/Moby-Dick.txt MapPartitionsRDD[7] at textFile at NativeMethodAccessorImpl.java:0 []
 |  file:///home/erin/Downloads/spark-3.0.1-bin-hadoop2.7/Moby-Dick.txt HadoopRDD[6] at textFile at NativeMethodAccessorImpl.java:0 []


In [19]:
pretty_print_plan(not_empty)

(2) PythonRDD[13] at RDD at PythonRDD.scala:53 []
 |  file:///home/erin/Downloads/spark-3.0.1-bin-hadoop2.7/Moby-Dick.txt MapPartitionsRDD[7] at textFile at NativeMethodAccessorImpl.java:0 []
 |  file:///home/erin/Downloads/spark-3.0.1-bin-hadoop2.7/Moby-Dick.txt HadoopRDD[6] at textFile at NativeMethodAccessorImpl.java:0 []


In [20]:
pretty_print_plan(key_values)

(2) PythonRDD[14] at RDD at PythonRDD.scala:53 []
 |  file:///home/erin/Downloads/spark-3.0.1-bin-hadoop2.7/Moby-Dick.txt MapPartitionsRDD[7] at textFile at NativeMethodAccessorImpl.java:0 []
 |  file:///home/erin/Downloads/spark-3.0.1-bin-hadoop2.7/Moby-Dick.txt HadoopRDD[6] at textFile at NativeMethodAccessorImpl.java:0 []


In [21]:
pretty_print_plan(counts)

(2) PythonRDD[15] at RDD at PythonRDD.scala:53 []
 |  MapPartitionsRDD[11] at mapPartitions at PythonRDD.scala:133 []
 |  ShuffledRDD[10] at partitionBy at NativeMethodAccessorImpl.java:0 []
 +-(2) PairwiseRDD[9] at reduceByKey at <timed exec>:4 []
    |  PythonRDD[8] at reduceByKey at <timed exec>:4 []
    |  file:///home/erin/Downloads/spark-3.0.1-bin-hadoop2.7/Moby-Dick.txt MapPartitionsRDD[7] at textFile at NativeMethodAccessorImpl.java:0 []
    |  file:///home/erin/Downloads/spark-3.0.1-bin-hadoop2.7/Moby-Dick.txt HadoopRDD[6] at textFile at NativeMethodAccessorImpl.java:0 []


## Execution
Finally we count the number of times each word has occured. Now, finally, the Lazy execution model finally performs some actual work, which takes a significant amount of time.

In [22]:
%%time
## Run #1
Count=counts.count()  # Count = the number of different words
Sum=counts.map(lambda x:x[1]).reduce(lambda x,y:x+y) # 
print('Different words=%5.0f, total words=%6.0f, mean no. occurances per word=%4.2f'%(Count,Sum,float(Sum)/Count))

Different words=33781, total words=215133, mean no. occurances per word=6.37
CPU times: user 6.73 ms, sys: 11.3 ms, total: 18 ms
Wall time: 2.12 s


## Amortization

When the same commands are performed repeatedly on the same data, the execution time tends to decrease in later executions.

The cells below are identical to the one above, with one exception at Run #3

Observe that Run #2 take much less time than Run #1 Even though no cache() was explicitly requested. The reason is that Spark caches (or materializes) key_values, before executing reduceByKey() because performing reduceByKey requires a shuffle and shuffle requires that the input RDD materialzed. In other words, Sometime caching happens even if the programmer did not asked for it.


In [23]:
%%time
##Run #2
Count = counts.count()
Sum = counts.map(lambda x:x[1]).reduce(lambda x,y: x+y)
print('Different words=%5.0f, total words=%6.0f, mean no. occurances per word=%4.2f'%(Count,Sum,float(Sum)/Count))

Different words=33781, total words=215133, mean no. occurances per word=6.37
CPU times: user 11.7 ms, sys: 4.19 ms, total: 15.9 ms
Wall time: 249 ms


## Explicit Caching

In Run #3 we explicitly ask for counts to be cached. This will reduce the execution time in the following run Run #4 by a little bit, but not by much.

In [25]:
%%time
## Run #3, cache
Count=counts.cache().count()
Sum=counts.map(lambda x:x[1]).reduce(lambda x,y:x+y)
print('Different words=%5.0f, total words=%6.0f, mean no. occurances per word=%4.2f'%(Count,Sum,float(Sum)/Count))

Different words=33781, total words=215133, mean no. occurances per word=6.37
CPU times: user 7.25 ms, sys: 8.28 ms, total: 15.5 ms
Wall time: 185 ms


In [26]:
%%time
#Run #4
Count=counts.count()
Sum=counts.map(lambda x:x[1]).reduce(lambda x,y:x+y)
print('Different words=%5.0f, total words=%6.0f, mean no. occurances per word=%4.2f'%(Count,Sum,float(Sum)/Count))

Different words=33781, total words=215133, mean no. occurances per word=6.37
CPU times: user 5.73 ms, sys: 7.31 ms, total: 13 ms
Wall time: 161 ms


In [27]:
%%time
#Run #5
Count=counts.count()
Sum=counts.map(lambda x:x[1]).reduce(lambda x,y:x+y)
print('Different words=%5.0f, total words=%6.0f, mean no. occurances per word=%4.2f'%(Count,Sum,float(Sum)/Count))

Different words=33781, total words=215133, mean no. occurances per word=6.37
CPU times: user 8.45 ms, sys: 7.19 ms, total: 15.6 ms
Wall time: 182 ms
