# Word Count

### Counting the number of occurrences of words in a text is a popular first exercise using map-reduce.

## The Task
**Input:** A text file consisting of words separated by spaces.  
**Output:** A list of words and their counts, sorted from the most to the least common.

We will use the book "Moby Dick" as our input.
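
Before turning to Spark, it may help to see the target output on a tiny input. The following is a minimal plain-Python sketch, not the Spark solution developed below; the function name `word_counts` and the sample sentence are made up for illustration.

```python
from collections import Counter

def word_counts(text):
    """Return (word, count) pairs sorted from most to least common."""
    # Splitting on single spaces yields empty strings for repeated spaces; drop them.
    words = [w for w in text.split(" ") if w != ""]
    return Counter(words).most_common()

sample = "the whale the sea the ship sea"  # hypothetical input, not from the book
result = word_counts(sample)
print(result)
```

This works only while the whole text fits in the memory of one machine, which is the limitation that map-reduce is designed to remove.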

In [6]:
# sc.stop()

In [7]:
# Start the SparkContext
from pyspark import SparkContext
sc = SparkContext(master="local[4]")

22/10/10 09:36:56 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


### Set up a helper to pretty-print the execution plan

In [8]:
def pretty_print_plan(rdd):
    # toDebugString() returns a byte string; decode() converts it to a str,
    # and split('\n') breaks that string into a list with one entry per line.
    for x in rdd.toDebugString().decode().split('\n'):
        print(x)

### Use `textFile()` to read the text

In [9]:
%%time
text_file = sc.textFile("Data/Moby-Dick.txt")  # read the text file into an RDD

CPU times: user 2.12 ms, sys: 550 µs, total: 2.67 ms
Wall time: 757 ms


In [10]:
type(text_file) 

pyspark.rdd.RDD

## Steps for counting the words

* Split each line by spaces.
* Map each `word` to `(word, 1)`.
* Count the number of occurrences of each word.
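
The three steps above can be tried out on two short lines in plain Python before running them on an RDD. This is only a sketch: the input lines and the helper `add_pair` are invented for illustration, and ordinary lists stand in for RDDs.

```python
from functools import reduce
from itertools import chain

lines = ["call me Ishmael", "call  me"]  # note the double space in the second line

# Step 1: split each line by spaces and flatten the per-line lists into one list.
words = list(chain.from_iterable(line.split(" ") for line in lines))
# Consecutive spaces produce empty strings, which are removed before counting.
not_empty = [w for w in words if w != ""]

# Step 2: map each word to a (word, 1) pair.
key_values = [(w, 1) for w in not_empty]

# Step 3: add up the values that share the same key.
def add_pair(acc, pair):
    word, n = pair
    acc[word] = acc.get(word, 0) + n
    return acc

counts_by_word = reduce(add_pair, key_values, {})
print(counts_by_word)
```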

In [11]:
%%time
# flatMap() applies a function to every element and then flattens the results:
# if the function returns a list, each item becomes a separate element of the new RDD.
words = text_file.flatMap(lambda line: line.split(" "))

# filter() keeps only the elements that satisfy a predicate; here it drops the empty strings.
not_empty = words.filter(lambda x: x != '')

# map() applies a function to each element; here it turns each word into a (word, 1) pair:
# [('The', 1),
#  ('Project', 1),
#  ('Gutenberg', 1),
#  ('EBook', 1),
#  ('of', 1)]
key_values = not_empty.map(lambda word: (word, 1))

# collect() is an action: it runs the plan and returns all elements to the driver program.
key_values.collect()

# reduceByKey() is a transformation that combines the values of equal keys with the given
# function; adding the values yields the total count of each word.
counts = key_values.reduceByKey(lambda a, b: a + b)

CPU times: user 59.2 ms, sys: 9.69 ms, total: 68.9 ms
Wall time: 1.7 s

In [13]:
key_values.take(10) # take() returns the first n elements of the RDD as a list.

[('The', 1),
 ('Project', 1),
 ('Gutenberg', 1),
 ('EBook', 1),
 ('of', 1),
 ('Moby', 1),
 ('Dick;', 1),
 ('or', 1),
 ('The', 1),
 ('Whale,', 1)]

In [15]:
counts.take(5) # take() returns the first n elements of the RDD as a list.

[('The', 549), ('Project', 79), ('EBook', 1), ('of', 6587), ('Moby', 79)]

### flatMap()
Note the line:
```python
words = text_file.flatMap(lambda line: line.split(" "))
```
Why are we using `flatMap`, rather than `map`?

The reason is that the operation `line.split(" ")` generates a **list** of strings, so had we used `map`, the result would be an RDD of lists of words rather than an RDD of words.

The difference between `map` and `flatMap` is that `flatMap` expects the function to return a list (or other iterable) for each element, and it **concatenates** those lists to form the new RDD.
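
The difference can be seen on two short lines using plain Python lists as a stand-in for RDDs. This is an analogy only: `itertools.chain.from_iterable` plays the role of the flattening step, and the sample lines are made up.

```python
from itertools import chain

lines = ["Moby Dick", "or The Whale"]

# Analogous to map: one list of words per input line.
mapped = [line.split(" ") for line in lines]

# Analogous to flatMap: the per-line lists are concatenated into one flat sequence.
flat_mapped = list(chain.from_iterable(mapped))

print(mapped)
print(flat_mapped)
```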

## The execution plan
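The transformations above (`flatMap`, `filter`, `map` and `reduceByKey`) only define the execution plan; they do not execute it. Work starts only when an action such as `collect()`, `take()` or `count()` is called.

* Preparing the plan takes on the order of 100 ms, which is a non-trivial amount of time,
* but much less than the time it takes to execute it (the 1.7 s wall time reported above includes the `collect()` action).
* Let's have a look at the execution plan.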
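
The split between defining a plan and executing it can be imitated with a Python generator. This is a loose analogy, not how Spark is implemented: the list `calls` and the function `split_line` are hypothetical helpers that record when work is actually done.

```python
calls = []  # records every line that the "transformation" actually processes

def split_line(line):
    calls.append(line)
    return line.split(" ")

lines = ["a b", "c d"]

# Defining the pipeline: a generator expression does not call split_line yet.
plan = (word for line in lines for word in split_line(line))
touched_before = len(calls)

# The "action": consuming the generator finally runs the pipeline.
result = list(plan)
touched_after = len(calls)

print(touched_before, touched_after, result)
```

As with an RDD, building `plan` is cheap, and the cost is paid only when a result is requested.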

### Understanding the details
To see which step in the plan corresponds to which RDD we print out the execution plan for each of the RDDs.  

Note that the execution plans for `words`, `not_empty` and `key_values` all have the same structure.

In [16]:
pretty_print_plan(text_file)

(2) Data/Moby-Dick.txt MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:0 []
 |  Data/Moby-Dick.txt HadoopRDD[0] at textFile at NativeMethodAccessorImpl.java:0 []


In [17]:
pretty_print_plan(words)

(2) PythonRDD[11] at RDD at PythonRDD.scala:53 []
 |  Data/Moby-Dick.txt MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:0 []
 |  Data/Moby-Dick.txt HadoopRDD[0] at textFile at NativeMethodAccessorImpl.java:0 []


In [18]:
pretty_print_plan(not_empty)

(2) PythonRDD[12] at RDD at PythonRDD.scala:53 []
 |  Data/Moby-Dick.txt MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:0 []
 |  Data/Moby-Dick.txt HadoopRDD[0] at textFile at NativeMethodAccessorImpl.java:0 []


In [19]:
pretty_print_plan(key_values)

(2) PythonRDD[2] at collect at <timed exec>:7 []
 |  Data/Moby-Dick.txt MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:0 []
 |  Data/Moby-Dick.txt HadoopRDD[0] at textFile at NativeMethodAccessorImpl.java:0 []


In [20]:
pretty_print_plan(counts)

(2) PythonRDD[13] at RDD at PythonRDD.scala:53 []
 |  MapPartitionsRDD[6] at mapPartitions at PythonRDD.scala:145 []
 |  ShuffledRDD[5] at partitionBy at NativeMethodAccessorImpl.java:0 []
 +-(2) PairwiseRDD[4] at reduceByKey at <timed exec>:9 []
    |  PythonRDD[3] at reduceByKey at <timed exec>:9 []
    |  Data/Moby-Dick.txt MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:0 []
    |  Data/Moby-Dick.txt HadoopRDD[0] at textFile at NativeMethodAccessorImpl.java:0 []


The table below annotates the plan for `counts`. Its RDD ids and source line numbers come from an earlier run and differ from the output above, but the structure is the same.

| Execution plan   | RDD |  Comments |
| :---------------------------------------------------------------- | :------------: | :--- |
|`(2)_PythonRDD[6] at RDD at PythonRDD.scala:48 []`| **counts** | Final RDD |
|`_/__MapPartitionsRDD[5] at mapPartitions at PythonRDD.scala:436 []`| **---"---** | |
|`_/__ShuffledRDD[4] at partitionBy at NativeMethodAccessorImpl.java:0 [`| **---"---** | RDD is partitioned by key |
|`_+-(2)_PairwiseRDD[3] at reduceByKey at <timed exec>:4 []`| **---"---** | Key-value pairs prepared for the shuffle in `reduceByKey` |
|`____/__PythonRDD[2] at reduceByKey at <timed exec>:4 []`| **words, not_empty, key_values** | The result of splitting into words, removing empties, and making `(word, 1)` pairs |
|`____/__../../Data/Moby-Dick.txt MapPartitionsRDD[1] at textFile at Nat`| **text_file** | The partitioned text |
|`____/__../../Data/Moby-Dick.txt HadoopRDD[0] at textFile at NativeMeth`| **---"---** | The text source |
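
The "partitioned by key" step can be illustrated with a small plain-Python simulation. This is a simplified sketch under stated assumptions: the function names are invented, Python's built-in `hash` stands in for Spark's partitioner, and serialization and disk writes are ignored.

```python
from collections import defaultdict

def combine_locally(partition):
    """Map side: sum the counts of equal words within one partition."""
    local = defaultdict(int)
    for word, n in partition:
        local[word] += n
    return dict(local)

def shuffle(combined_partitions, num_partitions):
    """Send every (word, partial count) to the partition chosen by the key's hash."""
    buckets = [defaultdict(list) for _ in range(num_partitions)]
    for local in combined_partitions:
        for word, n in local.items():
            buckets[hash(word) % num_partitions][word].append(n)
    return buckets

def reduce_partition(bucket):
    """Reduce side: add up the partial counts that arrived for each word."""
    return {word: sum(parts) for word, parts in bucket.items()}

# Two hypothetical input partitions of (word, 1) pairs.
partitions = [
    [("the", 1), ("whale", 1), ("the", 1)],
    [("the", 1), ("sea", 1)],
]
combined = [combine_locally(p) for p in partitions]
reduced = [reduce_partition(b) for b in shuffle(combined, num_partitions=2)]

totals = {}
for part in reduced:
    totals.update(part)  # after the shuffle, each word lives in exactly one partition
print(totals)
```

Because all partial counts for a given word end up in the same partition, each partition can finish its sums without consulting the others.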

## Execution
Finally, we count the number of times each word has occurred.
The `count()` and `reduce()` actions below force the lazy execution model to perform actual work, which takes a significant amount of time.

In [21]:
%%time
## Run #1
Count = counts.count()  # Count = the number of different words
Sum = counts.map(lambda x: x[1]).reduce(lambda x, y: x + y)  # Sum = the total number of words
print('Different words=%5.0f, total words=%6.0f, mean no. occurrences per word=%4.2f' % (Count, Sum, float(Sum) / Count))

Different words=33781, total words=215133, mean no. occurrences per word=6.37
CPU times: user 3.83 ms, sys: 11.4 ms, total: 15.3 ms
Wall time: 223 ms


### Amortization
When the same commands are performed repeatedly on the same data, the execution time tends to decrease in later executions.

The cells below are identical to the one above, with one exception at `Run #3`.

`Run #2` can take less time than `Run #1` even though no `cache()` was explicitly requested. The reason is that performing `reduceByKey()` requires a shuffle, and a shuffle requires its input RDD (`key_values`) to be materialized: Spark writes the shuffle output to local storage and can reuse it in later jobs instead of recomputing the stages before the shuffle. In other words, some reuse of intermediate results happens even if the programmer did not ask for it. In the timings recorded here the difference is small (223 ms versus 210 ms), probably because the earlier `counts.take(5)` had already run the shuffle.

In [24]:
%%time
## Run #2
Count = counts.count()
Sum = counts.map(lambda x: x[1]).reduce(lambda x, y: x + y)
print('Different words=%5.0f, total words=%6.0f, mean no. occurrences per word=%4.2f' % (Count, Sum, float(Sum) / Count))

Different words=33781, total words=215133, mean no. occurrences per word=6.37
CPU times: user 0 ns, sys: 14 ms, total: 14 ms
Wall time: 210 ms


### Explicit Caching
In `Run #3` we explicitly ask for `counts` to be cached. This reduces the execution time of the following runs (`Run #4` and `Run #5`) a little, but not by much.
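
The effect of an explicit cache can be mimicked with a small plain-Python class. This is a toy model, not Spark's implementation: the class `LazyCounts` and its attributes are hypothetical. It mirrors two behaviours of `cache()`: the call itself does no work, and the first action after it fills the cache.

```python
class LazyCounts:
    """Hypothetical stand-in for an RDD: recomputes on every action unless cached."""

    def __init__(self, compute):
        self._compute = compute
        self._use_cache = False
        self._cached = None
        self.computations = 0  # how many times the plan was actually executed

    def cache(self):
        self._use_cache = True  # only marks the data for caching; nothing runs yet
        return self

    def collect(self):
        if self._use_cache and self._cached is not None:
            return self._cached
        self.computations += 1
        result = self._compute()
        if self._use_cache:
            self._cached = result
        return result

    def count(self):
        return len(self.collect())

demo = LazyCounts(lambda: {"the": 3, "sea": 2})

demo.count()
demo.count()
runs_without_cache = demo.computations  # every action recomputed the data

demo.cache().count()  # first action after cache() computes and stores the result
demo.count()          # served from the cache
runs_with_cache = demo.computations - runs_without_cache

print(runs_without_cache, runs_with_cache)
```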

In [26]:
%%time
## Run #3, cache
Count = counts.cache().count()
Sum = counts.map(lambda x: x[1]).reduce(lambda x, y: x + y)
print('Different words=%5.0f, total words=%6.0f, mean no. occurrences per word=%4.2f' % (Count, Sum, float(Sum) / Count))

Different words=33781, total words=215133, mean no. occurrences per word=6.37
CPU times: user 11.7 ms, sys: 404 µs, total: 12.1 ms
Wall time: 174 ms


In [31]:
%%time
## Run #4
Count = counts.count()
Sum = counts.map(lambda x: x[1]).reduce(lambda x, y: x + y)
print('Different words=%5.0f, total words=%6.0f, mean no. occurrences per word=%4.2f' % (Count, Sum, float(Sum) / Count))

Different words=33781, total words=215133, mean no. occurrences per word=6.37
CPU times: user 7.87 ms, sys: 2.04 ms, total: 9.91 ms
Wall time: 163 ms


In [30]:
%%time
## Run #5
Count = counts.count()
Sum = counts.map(lambda x: x[1]).reduce(lambda x, y: x + y)
print('Different words=%5.0f, total words=%6.0f, mean no. occurrences per word=%4.2f' % (Count, Sum, float(Sum) / Count))

Different words=33781, total words=215133, mean no. occurrences per word=6.37
CPU times: user 693 µs, sys: 10.5 ms, total: 11.2 ms
Wall time: 164 ms
