
# Word Count

Counting the number of occurrences of words in a text is a popular first exercise using map-reduce.

## The Task
**Input:** A text file consisting of words separated by spaces.  
**Output:** A list of words and their counts, sorted from the most to the least common.

We will use the book "Moby Dick" as our input.

In [0]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
# Old Spark releases are kept in the Apache archive
!wget -q https://archive.apache.org/dist/spark/spark-2.4.5/spark-2.4.5-bin-hadoop2.7.tgz

# Extract the Spark distribution into the current directory (/content on Colab)
!tar -xvf spark-2.4.5-bin-hadoop2.7.tgz
!pip install -q findspark

import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.5-bin-hadoop2.7"

import findspark
findspark.init()

# Create a blank SparkContext
from pyspark import SparkContext
sc = SparkContext()

In [0]:
def pretty_print_plan(rdd):
    # In PySpark, toDebugString() returns bytes, so decode before splitting into lines
    for x in rdd.toDebugString().decode().split('\n'):
        print(x)

## Download data file from S3

In [4]:
%%time

## If this cell fails, download the file from https://mas-dse-open.s3.amazonaws.com/Moby-Dick.txt in your browser
# and put it in the '../../Data/' directory
import requests
data_dir = '../../Data'
filename = 'Moby-Dick.txt'
url = "https://mas-dse-open.s3.amazonaws.com/" + filename
local_path = data_dir + '/' + filename
!mkdir -p {data_dir}
# Copy URL content to local_path, failing loudly on an HTTP error
r = requests.get(url, allow_redirects=True)
r.raise_for_status()
with open(local_path, 'wb') as f:
    f.write(r.content)

# check that the text file is where we expect it to be
!ls -l $local_path

-rw-r--r-- 1 root root 1257260 Feb 14 00:50 ../../Data/Moby-Dick.txt
CPU times: user 96.1 ms, sys: 29.8 ms, total: 126 ms
Wall time: 5.82 s


## Define an RDD that will read the file
* Execution of read is **lazy**
* The file has not been opened or read yet.
* Reading starts when a stage that needs the data is executed.

In [5]:
%%time
text_file = sc.textFile(data_dir+'/'+filename)
type(text_file) 

CPU times: user 69 µs, sys: 2.83 ms, total: 2.9 ms
Wall time: 972 ms


## Steps for counting the words

* Split each line by spaces.
* Remove the empty strings produced by consecutive spaces.
* Map `word` to `(word,1)`.
* Count the number of occurrences of each word.
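For comparison, the same steps can be sketched on a single machine in plain Python. The helper `word_count` and the sample lines are hypothetical, and `collections.Counter` stands in for the `map` to `(word,1)` plus `reduceByKey` pair of steps.

```python
from collections import Counter

def word_count(lines):
    """Hypothetical helper: split on single spaces, drop empty strings, count each word."""
    # flatMap: one flat stream of words from all lines
    words = (word for line in lines for word in line.split(" "))
    # filter: consecutive spaces produce empty strings, so drop them
    not_empty = (w for w in words if w != "")
    # Counter does the work of map(word -> (word, 1)) followed by reduceByKey(+)
    return Counter(not_empty)

sample_lines = ["the whale and the sea", "the  ship"]
local_counts = word_count(sample_lines)
print(local_counts.most_common(1))  # [('the', 3)]
```

Spark distributes exactly these steps over partitions. The plain-Python version only works while the whole text fits on one machine.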

In [6]:
%%time
words =     text_file.flatMap(lambda line: line.split(" "))
not_empty = words.filter(lambda x: x!='') 
key_values= not_empty.map(lambda word: (word, 1)) 
counts=     key_values.reduceByKey(lambda a, b: a + b)

CPU times: user 16.1 ms, sys: 3.41 ms, total: 19.5 ms
Wall time: 151 ms


### flatMap()
Note the line:
```python
words =     text_file.flatMap(lambda line: line.split(" "))
```
Why are we using `flatMap`, rather than `map`?

The reason is that the operation `line.split(" ")` generates a **list** of strings, so if we had used `map` the result would be an RDD of lists of words, not an RDD of words.

The difference between `map` and `flatMap` is that `flatMap` expects the function to return a list (more generally, any iterable) for each element, and it **concatenates** these lists to form the resulting RDD.
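The same distinction can be seen in plain Python, with no Spark involved. The list comprehension plays the role of `map` and `itertools.chain.from_iterable` plays the role of `flatMap`. This is an analogy only; the lines are made-up sample data.

```python
from itertools import chain

lines = ["Call me Ishmael", "Some years ago"]

# Like map: one output element per input line, so we get a list of lists
mapped = [line.split(" ") for line in lines]

# Like flatMap: the per-line lists are concatenated into one flat list of words
flat_mapped = list(chain.from_iterable(line.split(" ") for line in lines))

print(mapped)       # [['Call', 'me', 'Ishmael'], ['Some', 'years', 'ago']]
print(flat_mapped)  # ['Call', 'me', 'Ishmael', 'Some', 'years', 'ago']
```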

## The execution plan
In the last cell we defined the execution plan, but we have not started to execute it.

* Preparing the plan took about 150 ms, which is a non-trivial amount of time,
* but much less than the time it will take to execute it.
* Let's have a look at the execution plan.

### Understanding the details
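To see which step in the plan corresponds to which RDD, we print out the execution plan for each of the RDDs.  

Note that the execution plans for `words`, `not_empty` and `key_values` have the same structure: a single `PythonRDD` on top of the two `textFile` RDDs. PySpark pipelines consecutive Python transformations such as `flatMap`, `filter` and `map` into one `PythonRDD`, so these steps do not appear as separate entries in the plan.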
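Pipelining means that the functions of consecutive narrow transformations are composed and applied to each partition's iterator in one pass. The sketch below imitates this with Python generators; the helper names and the sample partition are hypothetical and are not PySpark internals.

```python
def split_words(lines):   # like flatMap(lambda line: line.split(" "))
    for line in lines:
        yield from line.split(" ")

def drop_empty(words):    # like filter(lambda x: x != '')
    return (w for w in words if w != "")

def to_pairs(words):      # like map(lambda word: (word, 1))
    return ((w, 1) for w in words)

# Hypothetical partition: an iterator over a few lines of text
partition = iter(["the whale", "", "the  sea"])

# The three steps are composed into one function over the partition iterator,
# so each line flows through all three steps before the next line is read.
pairs = list(to_pairs(drop_empty(split_words(partition))))
print(pairs)  # [('the', 1), ('whale', 1), ('the', 1), ('sea', 1)]
```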

In [7]:
pretty_print_plan(text_file)

(2) ../../Data/Moby-Dick.txt MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:0 []
 |  ../../Data/Moby-Dick.txt HadoopRDD[0] at textFile at NativeMethodAccessorImpl.java:0 []


In [8]:
pretty_print_plan(words)

(2) PythonRDD[6] at RDD at PythonRDD.scala:53 []
 |  ../../Data/Moby-Dick.txt MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:0 []
 |  ../../Data/Moby-Dick.txt HadoopRDD[0] at textFile at NativeMethodAccessorImpl.java:0 []


In [9]:
pretty_print_plan(not_empty)

(2) PythonRDD[7] at RDD at PythonRDD.scala:53 []
 |  ../../Data/Moby-Dick.txt MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:0 []
 |  ../../Data/Moby-Dick.txt HadoopRDD[0] at textFile at NativeMethodAccessorImpl.java:0 []


In [10]:
pretty_print_plan(key_values)

(2) PythonRDD[8] at RDD at PythonRDD.scala:53 []
 |  ../../Data/Moby-Dick.txt MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:0 []
 |  ../../Data/Moby-Dick.txt HadoopRDD[0] at textFile at NativeMethodAccessorImpl.java:0 []


In [11]:
pretty_print_plan(counts)

(2) PythonRDD[9] at RDD at PythonRDD.scala:53 []
 |  MapPartitionsRDD[5] at mapPartitions at PythonRDD.scala:133 []
 |  ShuffledRDD[4] at partitionBy at NativeMethodAccessorImpl.java:0 []
 +-(2) PairwiseRDD[3] at reduceByKey at <timed exec>:4 []
    |  PythonRDD[2] at reduceByKey at <timed exec>:4 []
    |  ../../Data/Moby-Dick.txt MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:0 []
    |  ../../Data/Moby-Dick.txt HadoopRDD[0] at textFile at NativeMethodAccessorImpl.java:0 []



| Execution plan   | RDD |  Comments |
| :---------------------------------------------------------------- | :------------: | :--- |
|`(2)_PythonRDD[9] at RDD at PythonRDD.scala:53 []`| **counts** | Final RDD: partial counts for each word are summed after the shuffle |
|`_/__MapPartitionsRDD[5] at mapPartitions at PythonRDD.scala:133 []`| **---"---** | |
|`_/__ShuffledRDD[4] at partitionBy at NativeMethodAccessorImpl.java:0 []`| **---"---** | RDD is partitioned by key |
|`_+-(2)_PairwiseRDD[3] at reduceByKey at <timed exec>:4 []`| **---"---** | Locally summed `(word,count)` pairs prepared for the shuffle |
|`____/__PythonRDD[2] at reduceByKey at <timed exec>:4 []`| **words, not_empty, key_values** | Lines split into words, empty strings removed, words mapped to `(word,1)` pairs and summed within each partition |
|`____/__../../Data/Moby-Dick.txt MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:0 []`| **text_file** | The partitioned text |
|`____/__../../Data/Moby-Dick.txt HadoopRDD[0] at textFile at NativeMethodAccessorImpl.java:0 []`| **---"---** | The text source |
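As the plan for `counts` shows, `reduceByKey` works in phases: partial sums within each partition, a shuffle that sends every key to a single partition, and a final merge of the partial sums. The pure-Python sketch below imitates those three phases. The function names and input partitions are hypothetical, and Python's built-in `hash` stands in for PySpark's partitioning function.

```python
from collections import defaultdict

def combine_locally(partition):
    """Map side: sum the (word, 1) pairs inside one partition."""
    acc = defaultdict(int)
    for word, one in partition:
        acc[word] += one
    return acc

def shuffle(combined_partitions, num_partitions):
    """Send every (word, partial_count) to partition hash(word) % num_partitions."""
    buckets = [[] for _ in range(num_partitions)]
    for combined in combined_partitions:
        for word, partial in combined.items():
            buckets[hash(word) % num_partitions].append((word, partial))
    return buckets

def merge(bucket):
    """Reduce side: add up the partial counts that arrived for each word."""
    totals = defaultdict(int)
    for word, partial in bucket:
        totals[word] += partial
    return dict(totals)

# Two hypothetical input partitions of (word, 1) pairs
partitions = [
    [("the", 1), ("whale", 1), ("the", 1)],
    [("the", 1), ("sea", 1)],
]
combined = [combine_locally(p) for p in partitions]
reduced = [merge(b) for b in shuffle(combined, num_partitions=2)]
print(reduced)  # which word lands in which partition depends on the hash seed
```

Summing within each partition before the shuffle means that each partition sends at most one pair per distinct word, rather than one pair per occurrence.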

## Execution
Finally, we count the number of times each word has occurred.
Only now does the lazy execution model perform some actual work, which takes a significant amount of time.
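Python generators are also lazy, which makes them a convenient (if loose) analogy. Building the pipeline does no work, and the data is only read when a result is requested. The `read_lines` source below is hypothetical and exists only to count how many lines are actually read.

```python
stats = {"lines_read": 0}

def read_lines(lines):
    """Hypothetical data source that counts how many lines are actually read."""
    for line in lines:
        stats["lines_read"] += 1
        yield line

source = ["the whale", "the sea"]

# "Transformations": building the pipeline does not read anything yet
words_gen = (w for line in read_lines(source) for w in line.split(" "))
pairs_gen = ((w, 1) for w in words_gen)
reads_before_action = stats["lines_read"]
print(reads_before_action)  # 0

# "Action": consuming the pipeline is what forces the reads
total = sum(one for _, one in pairs_gen)
print(stats["lines_read"], total)  # 2 4
```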

In [12]:
%%time
## Run #1
Count=counts.count()  # Count = the number of different words
Sum=counts.map(lambda x:x[1]).reduce(lambda x,y:x+y)  # Sum = the total number of words
print('Different words=%5.0f, total words=%6.0f, mean no. occurrences per word=%4.2f'%(Count,Sum,float(Sum)/Count))

Different words=33781, total words=215133, mean no. occurrences per word=6.37
CPU times: user 17.4 ms, sys: 5.63 ms, total: 23 ms
Wall time: 2.79 s


### Amortization
When the same commands are performed repeatedly on the same data, the execution time tends to decrease in later executions.

The cells below are identical to the one above, except for `Run #3`.

Observe that `Run #2` takes much less time than `Run #1`, even though no `cache()` was explicitly requested. The reason is that `reduceByKey()` requires a shuffle, and Spark writes the map-side output of the shuffle (the partially summed `key_values` pairs) to local disk. Later jobs that depend on the same shuffle reuse these files and skip the stages that produced them. In other words, intermediate results are sometimes kept even if the programmer did not ask for it.

In [13]:
%%time
## Run #2
Count=counts.count()
Sum=counts.map(lambda x:x[1]).reduce(lambda x,y:x+y)
print('Different words=%5.0f, total words=%6.0f, mean no. occurrences per word=%4.2f'%(Count,Sum,float(Sum)/Count))

Different words=33781, total words=215133, mean no. occurrences per word=6.37
CPU times: user 15.5 ms, sys: 3.08 ms, total: 18.6 ms
Wall time: 314 ms


#### Explicit Caching
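In `Run #3` we explicitly ask for `counts` to be cached. Since the shuffle output is already being reused, caching can only save the work done after the shuffle. On this small dataset the effect is hard to see: Runs #3 to #5 take between 307 ms and 432 ms, which is comparable to the 314 ms of `Run #2`.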
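Calling `cache()` does no work by itself. It only marks the RDD, and the data is stored by the first action that computes it. The toy class below is a hypothetical, heavily simplified stand-in (not Spark's implementation). It sketches that behaviour by counting how often the underlying computation runs.

```python
class ToyRDD:
    """Hypothetical stand-in for an RDD: a recompute function plus an optional cache."""

    def __init__(self, compute):
        self._compute = compute
        self._cached = None
        self._should_cache = False

    def cache(self):
        self._should_cache = True  # only marks the RDD; nothing is computed here
        return self

    def collect(self):
        if self._cached is not None:
            return self._cached
        data = self._compute()
        if self._should_cache:
            self._cached = data  # filled in by the first action after cache()
        return data

computations = {"n": 0}

def expensive_counts():
    computations["n"] += 1
    return [("the", 3), ("whale", 1)]

toy = ToyRDD(expensive_counts)
toy.collect()
toy.collect()
before_cache = computations["n"]
print(before_cache)  # 2: recomputed for every action

toy.cache()
toy.collect()
toy.collect()
print(computations["n"])  # 3: computed once more, then served from the cache
```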

In [14]:
%%time
## Run #3, cache
Count=counts.cache().count()
Sum=counts.map(lambda x:x[1]).reduce(lambda x,y:x+y)
print('Different words=%5.0f, total words=%6.0f, mean no. occurrences per word=%4.2f'%(Count,Sum,float(Sum)/Count))

Different words=33781, total words=215133, mean no. occurrences per word=6.37
CPU times: user 14.8 ms, sys: 1.32 ms, total: 16.1 ms
Wall time: 400 ms


In [0]:
%%time
## Run #4
Count=counts.count()
Sum=counts.map(lambda x:x[1]).reduce(lambda x,y:x+y)
print('Different words=%5.0f, total words=%6.0f, mean no. occurrences per word=%4.2f'%(Count,Sum,float(Sum)/Count))

Different words=33781, total words=215133, mean no. occurrences per word=6.37
CPU times: user 20 ms, sys: 10 ms, total: 30 ms
Wall time: 432 ms


In [0]:
%%time
## Run #5
Count=counts.count()
Sum=counts.map(lambda x:x[1]).reduce(lambda x,y:x+y)
print('Different words=%5.0f, total words=%6.0f, mean no. occurrences per word=%4.2f'%(Count,Sum,float(Sum)/Count))

Different words=33781, total words=215133, mean no. occurrences per word=6.37
CPU times: user 20 ms, sys: 0 ns, total: 20 ms
Wall time: 307 ms


## Summary
This was our first real pyspark program, hurray!

#### Some things you learned:

1) An RDD is a distributed immutable array.  
The RDD is the core data structure of Spark.

2) You cannot operate on an RDD directly, only through **Transformations** and **Actions**.

3) **Transformations** transform an RDD into another RDD.

4) **Actions** output their results on the head node.

5) After the action is done, you are using just the head node, not the workers.

#### Lazy Execution

1) RDD operations are added to an **Execution Plan**.

2) The plan is executed when a result is needed.

3) Explicit and implicit caching cause intermediate results to be saved.

**Next:** Finding the most common words.

# Finding the most common words
* `counts`: RDD with 33781 pairs of the form `(word,count)`. 
* Find the 5 most frequent words. 
* **Method1:** `collect` and sort on head node.
* **Method2:** Pure Spark, `collect` only at the end.

## **Method1:** `collect` and sort on head node 
### Collect the RDD into the driver node
* Collecting can take significant time, and the whole RDD must fit in the driver's memory.

In [15]:
%%time
C=counts.collect()

CPU times: user 12.7 ms, sys: 9.98 ms, total: 22.7 ms
Wall time: 103 ms


### Sort 
* The RDD has been collected into a list in the driver node.
* We are no longer using Spark parallelism.
* Sort in Python.
* This will not scale to very large documents.
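When only the top 5 are needed, a full sort is more work than necessary. The sketch below compares the full sort used in the next cell with `heapq.nlargest`, which keeps only the k best items. The pairs are hypothetical sample data shaped like the collected list `C`. On the Spark side, `RDD.takeOrdered(5, key=lambda x: -x[1])` performs a similar bounded selection without collecting the whole RDD.

```python
import heapq

# Hypothetical (word, count) pairs, shaped like the collected list C
pairs = [("whale", 1), ("the", 13), ("of", 6), ("sea", 2), ("and", 5), ("a", 4)]

# Full sort, O(n log n): sort ascending by count, keep the last five, reverse
by_sort = sorted(pairs, key=lambda x: x[1])[-5:][::-1]

# Bounded selection, O(n log k) with k = 5: no full sort needed
by_heap = heapq.nlargest(5, pairs, key=lambda x: x[1])

print(by_heap)  # [('the', 13), ('of', 6), ('and', 5), ('a', 4), ('sea', 2)]
```

Both approaches still require the data on one machine. They differ only in how much sorting work the driver does.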

In [16]:
%%time
C.sort(key=lambda x:x[1])
print('most common words\n'+'\n'.join(['%s:\t%d'%c for c in reversed(C[-5:])]))

most common words
the:	13766
of:	6587
and:	5951
a:	4533
to:	4510
CPU times: user 9.29 ms, sys: 48 µs, total: 9.33 ms
Wall time: 11.2 ms


### Compute the mean number of occurrences per word.

In [0]:
Count2=len(C)
Sum2=sum([i for w,i in C])
print('count2=%f, sum2=%f, mean2=%f'%(Count2,Sum2,float(Sum2)/Count2))


count2=33781.000000, sum2=215133.000000, mean2=6.368462


## **Method2:** Pure Spark, `collect` only at the end.
* Collect into the head node only the most frequent words.
* Requires multiple **stages**

### **Step 1** Split, clean and map to `(word,1)`

In [0]:
%%time
word_pairs=text_file.flatMap(lambda x: x.split(' '))\
    .filter(lambda x: x!='')\
    .map(lambda word: (word,1))

CPU times: user 0 ns, sys: 0 ns, total: 0 ns
Wall time: 66.3 µs


### **Step 2** Count occurrences of each word.

In [0]:
%%time
counts=word_pairs.reduceByKey(lambda x,y:x+y)

CPU times: user 10 ms, sys: 0 ns, total: 10 ms
Wall time: 37.5 ms


### **Step 3** Reverse `(word,count)` to `(count,word)` and sort by key

In [0]:
%%time
reverse_counts=counts.map(lambda x:(x[1],x[0]))   # reverse order of word and count
sorted_counts=reverse_counts.sortByKey(ascending=False)

CPU times: user 30 ms, sys: 10 ms, total: 40 ms
Wall time: 1.49 s
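PySpark's `sortByKey` range-partitions the data. It first samples the keys to choose partition boundaries, then sorts each partition separately, so that reading the partitions in order gives a globally sorted result. The sampling launches a Spark job even though `sortByKey` is a transformation, and this likely accounts for much of the 1.49 s above. The sketch below is a hypothetical pure-Python imitation of the idea, not PySpark's code; the sample size, seed and data are arbitrary.

```python
import bisect
import random

def range_partition_sort(keys, num_partitions, sample_size=20, seed=1):
    """Hypothetical sketch of a sample-based range sort in descending order."""
    if not keys:
        return []
    rng = random.Random(seed)
    sample = sorted(rng.sample(keys, min(sample_size, len(keys))))
    # Pick num_partitions - 1 boundaries, evenly spaced through the sorted sample
    bounds = [sample[len(sample) * i // num_partitions] for i in range(1, num_partitions)]
    parts = [[] for _ in range(num_partitions)]
    for k in keys:
        # Partition i receives keys in (bounds[i-1], bounds[i]], so partitions are ordered
        parts[bisect.bisect_left(bounds, k)].append(k)
    # Sort each partition on its own; for descending order also reverse the partition order
    return [sorted(p, reverse=True) for p in reversed(parts)]

sample_counts = [13766, 6587, 5951, 4533, 4510, 1, 1, 2, 3, 70, 400, 15]
parts = range_partition_sort(sample_counts, num_partitions=3)
flat = [k for p in parts for k in p]
print(flat == sorted(sample_counts, reverse=True))  # True
```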


### Full execution plan

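We now have a complete plan to compute the most common words in the text. Almost nothing has been executed yet. The one exception is `sortByKey`: although it is a transformation, PySpark runs jobs that count and sample `reverse_counts` (and therefore read `Moby-Dick.txt`) to choose the boundaries of the sorted partitions. This is why the Step 3 cell took 1.49 s. The sort itself has not been performed yet.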

For more on execution plans and lineage, see [Jacek Laskowski's *Mastering Apache Spark*](https://jaceklaskowski.gitbooks.io/mastering-apache-spark/content/spark-rdd-lineage.html#toDebugString).

In [0]:
print('word_pairs:')
pretty_print_plan(word_pairs)
print('\ncounts:')
pretty_print_plan(counts)
print('\nreverse_counts:')
pretty_print_plan(reverse_counts)
print('\nsorted_counts:')
pretty_print_plan(sorted_counts)

word_pairs:
(2) PythonRDD[18] at RDD at PythonRDD.scala:48 []
 |  ../../Data/Moby-Dick.txt MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:0 []
 |  ../../Data/Moby-Dick.txt HadoopRDD[0] at textFile at NativeMethodAccessorImpl.java:0 []

counts:
(2) PythonRDD[23] at RDD at PythonRDD.scala:48 []
 |  MapPartitionsRDD[22] at mapPartitions at PythonRDD.scala:122 []
 |  ShuffledRDD[21] at partitionBy at NativeMethodAccessorImpl.java:0 []
 +-(2) PairwiseRDD[20] at reduceByKey at <timed exec>:1 []
    |  PythonRDD[19] at reduceByKey at <timed exec>:1 []
    |  ../../Data/Moby-Dick.txt MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:0 []
    |  ../../Data/Moby-Dick.txt HadoopRDD[0] at textFile at NativeMethodAccessorImpl.java:0 []

reverse_counts:
(2) PythonRDD[30] at RDD at PythonRDD.scala:48 []
 |  MapPartitionsRDD[22] at mapPartitions at PythonRDD.scala:122 []
 |  ShuffledRDD[21] at partitionBy at NativeMethodAccessorImpl.java:0 []
 +-(2) PairwiseRDD[20] at 

sorted_counts:

| Execution plan   | RDD |
| :---------------------------------------------------------------- | :------------: |
|`(2)_PythonRDD[20] at RDD at PythonRDD.scala:48 []`| **sorted_counts** |
|`_/__MapPartitionsRDD[19] at mapPartitions at PythonRDD.scala:436 []`| **---"---** |
|`_/__ShuffledRDD[18] at partitionBy at NativeMethodAccessorImpl.java:0 `| **---"---** |
|`_+-(2)_PairwiseRDD[17] at sortByKey at <timed exec>:2 []`| **---"---** |
|`____/__PythonRDD[16] at sortByKey at <timed exec>:2 []`| **counts, reverse_counts** |
|`____/__MapPartitionsRDD[13] at mapPartitions at PythonRDD.scala:436 []`| **---"---** |
|`____/__ShuffledRDD[12] at partitionBy at NativeMethodAccessorImpl.java`| **---"---** |
|`____+-(2)_PairwiseRDD[11] at reduceByKey at <timed exec>:1 []`| **---"---** |
|`_______/__PythonRDD[10] at reduceByKey at <timed exec>:1 []`| **word_pairs** |
|`_______/__../../Data/Moby-Dick.txt MapPartitionsRDD[1] at textFile at `| **---"---** |
|`_______/__../../Data/Moby-Dick.txt HadoopRDD[0] at textFile at NativeM`| **---"---** |

### **Step 4** Take the top 5 words

In [0]:
%%time
D=sorted_counts.take(5)
print('most common words\n'+'\n'.join(['%d:\t%s'%c for c in D]))

most common words
13766:	the
6587:	of
5951:	and
4533:	a
4510:	to
CPU times: user 10 ms, sys: 0 ns, total: 10 ms
Wall time: 398 ms


## Summary
We showed two ways for finding the most common words:
1. Collecting and sorting at the head node. This does not scale.
2. Keeping the data in RDDs until the final `take(5)`, so that only five pairs reach the head node.

See you next time!