# Word Count

### Counting the number of occurrences of words in a text is a popular first exercise using map-reduce.

## The Task
**Input:** A text file consisting of words separated by spaces.  
**Output:** A list of words and their counts, sorted from the most to the least common.
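Before turning to Spark, it may help to pin down the expected result on a single machine. The following is a minimal plain-Python sketch (not Spark), assuming the whole text fits in memory; the function name `word_count` is hypothetical. It splits lines on single spaces, as the Spark pipeline below does, and drops the empty strings that consecutive spaces produce.

```python
from collections import Counter

def word_count(text):
    """Hypothetical helper: count space-separated words, most common first."""
    # Split into lines first (like textFile), then each line on " ",
    # dropping the empty strings produced by consecutive spaces
    words = [w for line in text.splitlines() for w in line.split(" ") if w != ""]
    # most_common() with no argument returns every (word, count) pair,
    # ordered from the highest count to the lowest
    return Counter(words).most_common()

print(word_count("the whale the sea  the whale"))
# [('the', 3), ('whale', 2), ('sea', 1)]
```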

We will use the book "Moby Dick" as our input.

In [1]:
# Start the SparkContext
from pyspark import SparkContext
sc=SparkContext(master="local[4]")

### Set up a helper to pretty-print the execution plan

In [2]:
def pretty_print_plan(rdd):
    # toDebugString() returns bytes in PySpark, so decode before splitting into lines
    for x in rdd.toDebugString().decode().split('\n'):
        print(x)

### Use `textFile()` to read the text

In [8]:
%%time
text_file = sc.textFile("Data/Moby-Dick.txt")

CPU times: user 4.86 ms, sys: 161 µs, total: 5.02 ms
Wall time: 162 ms


In [9]:
type(text_file)

pyspark.rdd.RDD

## Steps for counting the words

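* Split each line into words by spaces.
* Remove the empty strings produced by consecutive spaces.
* Map each `word` to `(word,1)`.
* Count the number of occurrences of each word by summing the `1`s for each word with `reduceByKey`.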
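The steps above can be mimicked on a single machine to see what each stage produces. This plain-Python sketch only illustrates the data flow, not how Spark executes it; the grouping dictionary stands in for the shuffle that `reduceByKey` performs, and the two input lines are made-up sample data.

```python
from functools import reduce

lines = ["call me Ishmael", "call  me"]  # sample input; note the double space

# flatMap: split each line by spaces and concatenate the resulting lists
words = [w for line in lines for w in line.split(" ")]
# filter: drop the empty strings produced by consecutive spaces
not_empty = [w for w in words if w != ""]
# map: word -> (word, 1)
key_values = [(w, 1) for w in not_empty]

# reduceByKey: group the values by key (the role of the shuffle),
# then combine each group with the same function passed to reduceByKey
groups = {}
for word, one in key_values:
    groups.setdefault(word, []).append(one)
counts = {word: reduce(lambda a, b: a + b, ones) for word, ones in groups.items()}

print(counts)  # {'call': 2, 'me': 2, 'Ishmael': 1}
```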

In [11]:
%%time
words = text_file.flatMap(lambda line: line.split(" "))
not_empty = words.filter(lambda x: x != '')
key_values = not_empty.map(lambda word: (word, 1))
counts = key_values.reduceByKey(lambda a, b: a + b)

CPU times: user 22.8 ms, sys: 7.14 ms, total: 29.9 ms
Wall time: 207 ms


### flatMap()
Note the line:
```python
words = text_file.flatMap(lambda line: line.split(" "))
```
Why are we using `flatMap`, rather than `map`?

The reason is that the operation `line.split(" ")` generates a **list** of strings, so had we used `map` the result would be an RDD of lists of words, not an RDD of words.

The difference between `map` and `flatMap` is that `flatMap` expects the function to return a list (more generally, an iterable) for each element, and it **concatenates** these lists to form the resulting RDD.
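The distinction can be seen with plain Python lists, used here as a stand-in for RDDs: applying the split with a list comprehension corresponds to `map`, while concatenating the per-line lists with `itertools.chain.from_iterable` corresponds to `flatMap`. The two sample lines are made up for illustration.

```python
from itertools import chain

lines = ["call me Ishmael", "some years ago"]

# map: one output element per input line -> a list of lists
mapped = [line.split(" ") for line in lines]

# flatMap: the per-line lists are concatenated -> a flat list of words
flat_mapped = list(chain.from_iterable(line.split(" ") for line in lines))

print(mapped)       # [['call', 'me', 'Ishmael'], ['some', 'years', 'ago']]
print(flat_mapped)  # ['call', 'me', 'Ishmael', 'some', 'years', 'ago']
```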

## The execution plan
In the last cell we defined the execution plan, but we have not started to execute it.

* Preparing the plan took about 200 ms of wall time, which is a non-trivial amount of time,
* but much less than the time it will take to execute it.
* Let's have a look at the execution plan.
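Spark's lazy model can be loosely compared to Python generator expressions, which also describe a computation without running it. This is an analogy in plain Python, not Spark's actual mechanism: the list `calls` is a hypothetical probe that records when a line is really processed, and nothing is split until `sum()` consumes the pipeline, much as Spark does no work until an action such as `count()` is called.

```python
calls = []

def split_line(line):
    calls.append(line)  # record the moment a line is actually processed
    return line.split(" ")

lines = ["call me Ishmael", "some years ago"]

# Defining the "plan": generator expressions are lazy, so nothing runs yet
words = (w for line in lines for w in split_line(line))
not_empty = (w for w in words if w != "")
key_values = ((w, 1) for w in not_empty)
print(len(calls))  # 0: no line has been processed so far

# The "action": consuming the pipeline triggers all the work
total = sum(v for _, v in key_values)
print(total, len(calls))  # 6 2
```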

### Understanding the details
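To see which step in the plan corresponds to which RDD, we print out the execution plan for each of the RDDs.  

Note that the execution plans for `words`, `not_empty` and `key_values` have the same structure: each is a single `PythonRDD` (with a different id) directly on top of the text file. PySpark pipelines consecutive narrow Python transformations such as `flatMap`, `filter` and `map` into one `PythonRDD`, so the intermediate RDDs do not appear as separate steps.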
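One way to picture this pipelining is as function composition: each narrow transformation is a function from an iterator over a partition to a new iterator, and consecutive ones can be fused into a single function. The sketch below imitates that idea in plain Python; the helper names `flat_map`, `filter_`, `map_` and `compose` are hypothetical and are not PySpark's internals.

```python
def flat_map(f):
    # Per-partition stage: apply f to each element and flatten the results
    return lambda it: (y for x in it for y in f(x))

def filter_(pred):
    # Per-partition stage: keep only the elements satisfying pred
    return lambda it: (x for x in it if pred(x))

def map_(f):
    # Per-partition stage: apply f to each element
    return lambda it: (f(x) for x in it)

def compose(*stages):
    """Fuse per-partition stages into one function, applied left to right."""
    def fused(it):
        for stage in stages:
            it = stage(it)
        return it
    return fused

# A single fused function, analogous to the single PythonRDD in the plan
pipeline = compose(
    flat_map(lambda line: line.split(" ")),
    filter_(lambda x: x != ""),
    map_(lambda word: (word, 1)),
)

partition = ["call me  Ishmael"]
print(list(pipeline(iter(partition))))
# [('call', 1), ('me', 1), ('Ishmael', 1)]
```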

In [12]:
pretty_print_plan(text_file)

(2) Data/Moby-Dick.txt MapPartitionsRDD[3] at textFile at NativeMethodAccessorImpl.java:0 []
 |  Data/Moby-Dick.txt HadoopRDD[2] at textFile at NativeMethodAccessorImpl.java:0 []


In [13]:
pretty_print_plan(words)

(2) PythonRDD[8] at RDD at PythonRDD.scala:49 []
 |  Data/Moby-Dick.txt MapPartitionsRDD[3] at textFile at NativeMethodAccessorImpl.java:0 []
 |  Data/Moby-Dick.txt HadoopRDD[2] at textFile at NativeMethodAccessorImpl.java:0 []


In [14]:
pretty_print_plan(not_empty)

(2) PythonRDD[9] at RDD at PythonRDD.scala:49 []
 |  Data/Moby-Dick.txt MapPartitionsRDD[3] at textFile at NativeMethodAccessorImpl.java:0 []
 |  Data/Moby-Dick.txt HadoopRDD[2] at textFile at NativeMethodAccessorImpl.java:0 []


In [15]:
pretty_print_plan(key_values)

(2) PythonRDD[10] at RDD at PythonRDD.scala:49 []
 |  Data/Moby-Dick.txt MapPartitionsRDD[3] at textFile at NativeMethodAccessorImpl.java:0 []
 |  Data/Moby-Dick.txt HadoopRDD[2] at textFile at NativeMethodAccessorImpl.java:0 []


In [16]:
pretty_print_plan(counts)

(2) PythonRDD[11] at RDD at PythonRDD.scala:49 []
 |  MapPartitionsRDD[7] at mapPartitions at PythonRDD.scala:129 []
 |  ShuffledRDD[6] at partitionBy at NativeMethodAccessorImpl.java:0 []
 +-(2) PairwiseRDD[5] at reduceByKey at <timed exec>:4 []
    |  PythonRDD[4] at reduceByKey at <timed exec>:4 []
    |  Data/Moby-Dick.txt MapPartitionsRDD[3] at textFile at NativeMethodAccessorImpl.java:0 []
    |  Data/Moby-Dick.txt HadoopRDD[2] at textFile at NativeMethodAccessorImpl.java:0 []


| Execution plan line (from `pretty_print_plan(counts)`) | RDD | Comments |
| :---------------------------------------------------------------- | :------------: | :--- |
|`(2) PythonRDD[11] at RDD at PythonRDD.scala:49 []`| **counts** | Final RDD; `(2)` is the number of partitions |
|`MapPartitionsRDD[7] at mapPartitions at PythonRDD.scala:129 []`| **counts** | |
|`ShuffledRDD[6] at partitionBy at NativeMethodAccessorImpl.java:0 []`| **counts** | Shuffle: the RDD is partitioned by key |
|`+-(2) PairwiseRDD[5] at reduceByKey at <timed exec>:4 []`| **counts** | Prepares the `(key, value)` pairs for the shuffle of `reduceByKey` |
|`PythonRDD[4] at reduceByKey at <timed exec>:4 []`| **words, not_empty, key_values** | Splitting into words, removing empties, and making `(word,1)` pairs, pipelined into one step |
|`Data/Moby-Dick.txt MapPartitionsRDD[3] at textFile at NativeMethodAccessorImpl.java:0 []`| **text_file** | The partitioned text |
|`Data/Moby-Dick.txt HadoopRDD[2] at textFile at NativeMethodAccessorImpl.java:0 []`| **text_file** | The text source |

## Execution
Finally, we count the number of times each word has occurred.
Only now, when actions such as `count()` and `reduce()` are called, does the lazy execution model perform actual work, which takes a significant amount of time.

In [17]:
%%time
## Run #1
Count=counts.count()  # Count = the number of different words
Sum=counts.map(lambda x:x[1]).reduce(lambda x,y:x+y) # Sum = the total number of words
print('Different words=%5.0f, total words=%6.0f, mean no. occurances per word=%4.2f'%(Count,Sum,float(Sum)/Count))

Different words=33781, total words=215133, mean no. occurances per word=6.37
CPU times: user 14 ms, sys: 16.1 ms, total: 30.1 ms
Wall time: 4.02 s
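The two actions compute the number of distinct words (`count()`) and the total number of words (the sum of all counts); their ratio is the mean number of occurrences per word, here 215133 / 33781 ≈ 6.37. The task also asks for the words sorted from most to least common, which the cells in this section do not produce; in PySpark, `counts.sortBy(lambda x: x[1], ascending=False)` is one way to obtain that order. The plain-Python sketch below mirrors these computations on a small, hypothetical list of `(word, count)` pairs standing in for the `counts` RDD.

```python
from functools import reduce

# Hypothetical (word, count) pairs standing in for the `counts` RDD
counts = [("whale", 5), ("the", 14), ("Ishmael", 1)]

distinct = len(counts)                                      # like counts.count()
total = reduce(lambda x, y: x + y, (c for _, c in counts))  # like counts.map(...).reduce(...)
mean = float(total) / distinct

print('Different words=%5.0f, total words=%6.0f, mean no. occurrences per word=%4.2f'
      % (distinct, total, mean))

# Most to least common, like counts.sortBy(lambda x: x[1], ascending=False)
by_frequency = sorted(counts, key=lambda x: x[1], reverse=True)
print(by_frequency)  # [('the', 14), ('whale', 5), ('Ishmael', 1)]
```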


### Amortization
When the same commands are performed repeatedly on the same data, the execution time tends to decrease in later executions.

The cells below are identical to the one above, with one exception: `Run #3` also calls `cache()`.

Observe that `Run #2` takes much less time than `Run #1` (about 0.75 s vs. 4 s of wall time), even though no `cache()` was explicitly requested. The reason is that `reduceByKey()` requires a shuffle, and Spark materializes the map side of a shuffle (here, the locally combined `(word, count)` pairs derived from `key_values`) as shuffle files. Later jobs that need the same shuffle reuse those files instead of recomputing that stage. In other words, sometimes caching happens even if the programmer did not ask for it.

In [18]:
%%time
## Run #2
Count=counts.count()
Sum=counts.map(lambda x:x[1]).reduce(lambda x,y:x+y)
print('Different words=%5.0f, total words=%6.0f, mean no. occurances per word=%4.2f'%(Count,Sum,float(Sum)/Count))

Different words=33781, total words=215133, mean no. occurances per word=6.37
CPU times: user 31.2 ms, sys: 3.91 ms, total: 35.1 ms
Wall time: 752 ms


### Explicit Caching
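In `Run #3` we explicitly ask for `counts` to be cached. Because `cache()` is itself lazy, the data is stored by the `count()` call in `Run #3`, and the benefit shows up in the following runs: `Run #4` and `Run #5` are somewhat faster (about 0.5 s vs. 0.75 s of wall time), but not dramatically so, since most of the work was already being skipped thanks to the reused shuffle output.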

In [19]:
%%time
## Run #3, cache
Count=counts.cache().count()
Sum=counts.map(lambda x:x[1]).reduce(lambda x,y:x+y)
print('Different words=%5.0f, total words=%6.0f, mean no. occurances per word=%4.2f'%(Count,Sum,float(Sum)/Count))

Different words=33781, total words=215133, mean no. occurances per word=6.37
CPU times: user 14.9 ms, sys: 13.9 ms, total: 28.8 ms
Wall time: 789 ms


In [20]:
%%time
## Run #4
Count=counts.count()
Sum=counts.map(lambda x:x[1]).reduce(lambda x,y:x+y)
print('Different words=%5.0f, total words=%6.0f, mean no. occurances per word=%4.2f'%(Count,Sum,float(Sum)/Count))

Different words=33781, total words=215133, mean no. occurances per word=6.37
CPU times: user 32.9 ms, sys: 25.1 ms, total: 58 ms
Wall time: 508 ms


In [21]:
%%time
## Run #5
Count=counts.count()
Sum=counts.map(lambda x:x[1]).reduce(lambda x,y:x+y)
print('Different words=%5.0f, total words=%6.0f, mean no. occurances per word=%4.2f'%(Count,Sum,float(Sum)/Count))

Different words=33781, total words=215133, mean no. occurances per word=6.37
CPU times: user 14.5 ms, sys: 11 ms, total: 25.5 ms
Wall time: 457 ms
