# Word Count with MapReduce - Lab

## Introduction

Now that we have seen the key map and reduce operators in Spark, and also know when to use transformation and action operators, we can revisit the word count problem we introduced earlier in the section. In this lab, we will read a text corpus into the Spark environment, perform a word count, and try basic NLP ideas to get a good grip on how MapReduce performs. 

## Objectives

In this lab you will:

- Apply the map(func) transformation to a given function on all elements of an RDD in different partitions 
- Apply a map transformation for all elements of an RDD 
- Compare the difference between a transformation and an action within RDDs 
- Use collect(), count(), and take() actions to trigger spark transformations 
- Use filter to select data that meets certain specifications within an RDD 
- Use Spark and the MapReduce framework to complete a full parallelized word count problem 

## MapReduce task

Here is what our problem looks like:

* We have a huge text document
* We need to count the number of times each distinct word appears in the document


* Sample applications:

    * Analyze web server logs to find popular URLs
    * Analyze texts for content or style 

## Word Count

We will illustrate a MapReduce computation for counting the number of occurrences for each word in a text corpus. In this example, the input file is a repository of documents and each document is an element. We shall count the frequency of stop words for __style identification__ as stop words might have unique features which can potentially describe the author's writing style based on their use of stop words while writing. We shall look at some texts by Shakespeare and Jane Austin following this motivation. 

MapReduce in PySpark provides a practical and efficient way of achieving this goal as it: 

* works if the file is too large for memory

* works even if the output is too large for memory

* is naturally parallelizable


### MapReduce Framework

Here are the steps that we will perform for our problem, under the MapReduce framework:  

* Sequentially read a lot of data (text files in this case)


* Map:
    * Extract something you care about


* Group by key: Sort and shuffle


* Reduce:
    * Aggregate, summarize, filter or transform


* Write the result 

As a reminder, here is what it looks like visually, given the example we used before:
![](./images/word_count.png)

### Initialize SparkContext()

- First, import the `pyspark` module into this Python environment and initialize a `SparkContext()` 
- Initialize a local spark context

In [1]:
# Start a local SparkContext
import pyspark
pyspark
sc = pyspark.SparkContext('local[*]') # [*] represents a local context i.e. no cluster


To test our code, start with a single text file, `'hamlet.txt'`. First, set a file path variable `file` to the location of `'text/hamlet.txt'`. 

In [2]:
# Set a path variable for data 
file = 'text/hamlet.txt'
file

# 'text/hamlet.txt'

'text/hamlet.txt'

## Read and Split text file contents into RDD - `sc.textFile()`

Previously we used parallelization to read an RDD from a Python list. Here we shall read the text file into Spark RDDs by using `sc.textFile()` method for loading the text file into the `lines` RDD. The documentation on RDDs can be found [here!!](https://spark.apache.org/docs/latest/rdd-programming-guide.html)

The `textFile(path)` method reads a text file from the HDFS/local file system/any Hadoop supported file system, into the number of partitions specified and returns it as an RDD of strings. In order to view the contents of the RDD, we will use the `RDD.collect()` method since calling the RDD by name will not return the contents, only the object type and relevant information. 

In [3]:
# Read the text file into an RDD using sc.textFile()
lines = sc.textFile(file)
lines

# text/hamlet.txt MapPartitionsRDD[36] at textFile at NativeMethodAccessorImpl.java:0

text/hamlet.txt MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:0

The text file has been written in a "line-by-line" manner into the RDD. We can access any given entry using simple indexing. 

- Print a few sample lines from the `lines` RDD

In [4]:
print(lines.collect()[500])
print(lines.collect()[1000])

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: java.lang.IllegalArgumentException: Unsupported class file major version 55
	at org.apache.xbean.asm6.ClassReader.<init>(ClassReader.java:166)
	at org.apache.xbean.asm6.ClassReader.<init>(ClassReader.java:148)
	at org.apache.xbean.asm6.ClassReader.<init>(ClassReader.java:136)
	at org.apache.xbean.asm6.ClassReader.<init>(ClassReader.java:237)
	at org.apache.spark.util.ClosureCleaner$.getClassReader(ClosureCleaner.scala:49)
	at org.apache.spark.util.FieldAccessFinder$$anon$3$$anonfun$visitMethodInsn$2.apply(ClosureCleaner.scala:517)
	at org.apache.spark.util.FieldAccessFinder$$anon$3$$anonfun$visitMethodInsn$2.apply(ClosureCleaner.scala:500)
	at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
	at scala.collection.mutable.HashMap$$anon$1$$anonfun$foreach$2.apply(HashMap.scala:134)
	at scala.collection.mutable.HashMap$$anon$1$$anonfun$foreach$2.apply(HashMap.scala:134)
	at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:236)
	at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
	at scala.collection.mutable.HashMap$$anon$1.foreach(HashMap.scala:134)
	at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
	at org.apache.spark.util.FieldAccessFinder$$anon$3.visitMethodInsn(ClosureCleaner.scala:500)
	at org.apache.xbean.asm6.ClassReader.readCode(ClassReader.java:2175)
	at org.apache.xbean.asm6.ClassReader.readMethod(ClassReader.java:1238)
	at org.apache.xbean.asm6.ClassReader.accept(ClassReader.java:631)
	at org.apache.xbean.asm6.ClassReader.accept(ClassReader.java:355)
	at org.apache.spark.util.ClosureCleaner$$anonfun$org$apache$spark$util$ClosureCleaner$$clean$14.apply(ClosureCleaner.scala:307)
	at org.apache.spark.util.ClosureCleaner$$anonfun$org$apache$spark$util$ClosureCleaner$$clean$14.apply(ClosureCleaner.scala:306)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:306)
	at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162)
	at org.apache.spark.SparkContext.clean(SparkContext.scala:2326)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2100)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:945)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:944)
	at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:166)
	at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.base/java.lang.Thread.run(Thread.java:834)


Similarly, we can also print the whole document, lines by line. 

- Print complete Hamlet from the lines RDD

In [5]:
# Print the text, line-by-line
# This will output the whole of hamlet text, one line at a time. 

for line in lines.collect():
    print(line)

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: java.lang.IllegalArgumentException: Unsupported class file major version 55
	at org.apache.xbean.asm6.ClassReader.<init>(ClassReader.java:166)
	at org.apache.xbean.asm6.ClassReader.<init>(ClassReader.java:148)
	at org.apache.xbean.asm6.ClassReader.<init>(ClassReader.java:136)
	at org.apache.xbean.asm6.ClassReader.<init>(ClassReader.java:237)
	at org.apache.spark.util.ClosureCleaner$.getClassReader(ClosureCleaner.scala:49)
	at org.apache.spark.util.FieldAccessFinder$$anon$3$$anonfun$visitMethodInsn$2.apply(ClosureCleaner.scala:517)
	at org.apache.spark.util.FieldAccessFinder$$anon$3$$anonfun$visitMethodInsn$2.apply(ClosureCleaner.scala:500)
	at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
	at scala.collection.mutable.HashMap$$anon$1$$anonfun$foreach$2.apply(HashMap.scala:134)
	at scala.collection.mutable.HashMap$$anon$1$$anonfun$foreach$2.apply(HashMap.scala:134)
	at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:236)
	at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
	at scala.collection.mutable.HashMap$$anon$1.foreach(HashMap.scala:134)
	at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
	at org.apache.spark.util.FieldAccessFinder$$anon$3.visitMethodInsn(ClosureCleaner.scala:500)
	at org.apache.xbean.asm6.ClassReader.readCode(ClassReader.java:2175)
	at org.apache.xbean.asm6.ClassReader.readMethod(ClassReader.java:1238)
	at org.apache.xbean.asm6.ClassReader.accept(ClassReader.java:631)
	at org.apache.xbean.asm6.ClassReader.accept(ClassReader.java:355)
	at org.apache.spark.util.ClosureCleaner$$anonfun$org$apache$spark$util$ClosureCleaner$$clean$14.apply(ClosureCleaner.scala:307)
	at org.apache.spark.util.ClosureCleaner$$anonfun$org$apache$spark$util$ClosureCleaner$$clean$14.apply(ClosureCleaner.scala:306)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:306)
	at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162)
	at org.apache.spark.SparkContext.clean(SparkContext.scala:2326)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2100)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:945)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:944)
	at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:166)
	at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.base/java.lang.Thread.run(Thread.java:834)


Great, now that the complete text file is in a `lines` RDD, we can easily use the map function to break it down further into individual words and parallelize it accordingly. 

__Note: Parallelization is handled by the Spark environment according to available infrastructure and doesn't need any further configuration__.

## The MAP function `map(func)`

The Map function for this example uses keys that are of type string (the words) and values that are integers. The Map task reads a document and breaks it into its sequence of words `w1, w2, . . . , wn`. It then makes a sequence of key-value pairs for each word where the word itself is the key and the value is always 1. That is, the output of the Map task for this document is the sequence of key-value pairs as shown below:

> `(w1, 1), (w2, 1), . . . ,(wn, 1)`

This step performs the following two sub-steps:

* A splitting step that takes the input dataset from the source and divides it into smaller subsets 
* A mapping step that takes those smaller subsets and performs an action or computation on each subset 


### Spark Mapping functions

Previously, we saw that:

- `map(func)` returns a new distributed dataset formed by passing each element of the source through a function `func`.

- `flatMap(func)` maps each input item to 0 or more output items (so `func` should return a seq rather than a single item).

`flatMap()` breaks the output of a lambda function into individual RDD elements (as opposed to map).

---

* Use `RDD.flatMap()` to split the lines by whitespace and collect into one flat RDD.

* The transformation is defined in the lambda expression, where the input `x` is defined as producing the result of `x.split(' ')`.

* Use the `RDD.take(n)` method to pick `n` words from the top of the sequence. 

* Use `flatMap()` to break the output of the lambda function into individual RDD elements. 


In [6]:
# split the lines into words based on blanks ' ' and show ten elements from the top 

words = lines.flatMap(lambda x: x.split(' '))
words.take(10)

# ['', '1604', '', '', 'THE', 'TRAGEDY', 'OF', 'HAMLET,', 'PRINCE', 'OF']

['', '1604', '', '', 'THE', 'TRAGEDY', 'OF', 'HAMLET,', 'PRINCE', 'OF']


### Create a Tuple as (k,v)

- Map each word to a tuple of (word, 1).

Map doesn't break up the output of the lambda expression, meaning that the tuples stay intact.

In [7]:
# Use a lambda function with map to add a 1 to each word and output a tuple
# (word, 1) - Take ten elements

tuples = words.map(lambda x: (x, 1))
tuples.take(10)

[('', 1),
 ('1604', 1),
 ('', 1),
 ('', 1),
 ('THE', 1),
 ('TRAGEDY', 1),
 ('OF', 1),
 ('HAMLET,', 1),
 ('PRINCE', 1),
 ('OF', 1)]

### Change the words to lower case to ensure integrity

As we can see from the output above, the text contains words in capital as well as lower case. By default, 'THE' and 'the' would be considered two separate words due to case sensitivity. 

- Modify the map function above to change all the words to lowercase using a `.lower()` inside the lambda function.


In [8]:
# Change the words in words tuples to lowercase - take 10 elements 

tuplesLCase = words.map(lambda x: (x.lower(), 1))
tuplesLCase.take(10)

[('', 1),
 ('1604', 1),
 ('', 1),
 ('', 1),
 ('the', 1),
 ('tragedy', 1),
 ('of', 1),
 ('hamlet,', 1),
 ('prince', 1),
 ('of', 1)]

## REDUCE Function
The Reduce function’s argument is a pair consisting of a key and its list of associated values as the pairs created above. The output of the Reduce function is a sequence of zero or more key-value pairs. These key-value pairs can be of a type different from those sent from Map tasks to Reduce tasks, but often they are the same type.

We will refer to the application of the Reduce function to a single key and its associated list of values as a reducer.

![](reduce.png)

- Use `RDD.reduceByKey()` to add up all the words. the new (k, v) pairs would have the word as a key and the number of occurrences as a value. 

Here, the lambda has two arguments (x and y) that are added.

In [9]:
# USe reduceByKey with tuplesLCase to add all values under same keys - take 10

wordCount = tuplesLCase.reduceByKey(lambda x,y: x+y)
wordCount.take(10)

[('', 20383),
 ('1604', 1),
 ('tragedy', 1),
 ('of', 670),
 ('prince', 2),
 ('denmark', 10),
 ('shakespeare', 1),
 ('dramatis', 1),
 ('claudius,', 2),
 ('king', 43)]

### Filter rare words

Following the standard NLP approach, we can add a filtering step to remove all words which appear less than some threshold value, say, with less than 5 occurrences. 

This can be useful to identify common topics between documents, where very rare words can be misleading. 
For this step, we can use `RDD.filter(func)` where `func` is a lambda function that filters out any word which appears less than or equal to 5 times. You may also use a separate function to achieve this. 

- Remove rare words with occurences < 5 using lambda function inside a `.filter()` method 

In [10]:
# Remove all rare words with frequency less than 5 - take 10 

freqWords = wordCount.filter(lambda x:  x[1] >= 5 )
freqWords.take(10)

[('', 20383),
 ('of', 670),
 ('denmark', 10),
 ('king', 43),
 ('son', 11),
 ('polonius,', 6),
 ('horatio,', 15),
 ('hamlet.', 25),
 ('courtier.', 7),
 ('rosencrantz,', 6)]

### List  of stop words

Add a filtering step to retain only words included in a list of stop words. 

Stop words can be useful for recognizing the style of an author. Removing stop words can be useful in recognizing the topic of a document. For stop word removal, we use `RDD.filter(func)` again with a lambda function that uses a list of stop words to extract the key-value pairs for only the words that are present in the stop word list. Use a simple list like the one shown below: 

> ['', 'the','a','in','of','on','at','for','by','I','you','me'] 


- Use the stop word list above to count the occurrences of these words in the document
- Show the stop word frequency

In [11]:
# show stop word frequency in the output

stopWordList = ['', 'the','a','in','of','on','at','for','by','i','you','me'] 
stopWords = freqWords.filter(lambda x:  x[0] in stopWordList) 
stopWords.collect()

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: java.lang.IllegalArgumentException: Unsupported class file major version 55
	at org.apache.xbean.asm6.ClassReader.<init>(ClassReader.java:166)
	at org.apache.xbean.asm6.ClassReader.<init>(ClassReader.java:148)
	at org.apache.xbean.asm6.ClassReader.<init>(ClassReader.java:136)
	at org.apache.xbean.asm6.ClassReader.<init>(ClassReader.java:237)
	at org.apache.spark.util.ClosureCleaner$.getClassReader(ClosureCleaner.scala:49)
	at org.apache.spark.util.FieldAccessFinder$$anon$3$$anonfun$visitMethodInsn$2.apply(ClosureCleaner.scala:517)
	at org.apache.spark.util.FieldAccessFinder$$anon$3$$anonfun$visitMethodInsn$2.apply(ClosureCleaner.scala:500)
	at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
	at scala.collection.mutable.HashMap$$anon$1$$anonfun$foreach$2.apply(HashMap.scala:134)
	at scala.collection.mutable.HashMap$$anon$1$$anonfun$foreach$2.apply(HashMap.scala:134)
	at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:236)
	at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
	at scala.collection.mutable.HashMap$$anon$1.foreach(HashMap.scala:134)
	at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
	at org.apache.spark.util.FieldAccessFinder$$anon$3.visitMethodInsn(ClosureCleaner.scala:500)
	at org.apache.xbean.asm6.ClassReader.readCode(ClassReader.java:2175)
	at org.apache.xbean.asm6.ClassReader.readMethod(ClassReader.java:1238)
	at org.apache.xbean.asm6.ClassReader.accept(ClassReader.java:631)
	at org.apache.xbean.asm6.ClassReader.accept(ClassReader.java:355)
	at org.apache.spark.util.ClosureCleaner$$anonfun$org$apache$spark$util$ClosureCleaner$$clean$14.apply(ClosureCleaner.scala:307)
	at org.apache.spark.util.ClosureCleaner$$anonfun$org$apache$spark$util$ClosureCleaner$$clean$14.apply(ClosureCleaner.scala:306)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:306)
	at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162)
	at org.apache.spark.SparkContext.clean(SparkContext.scala:2326)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2100)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:945)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:944)
	at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:166)
	at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.base/java.lang.Thread.run(Thread.java:834)


### List of keep words

- Modify the filter operation above to keep all the words found in the text **except** the stop words. 

In [12]:
# Modify above filter to show top ten keep words by frequency

keepWords = freqWords.filter(lambda x:  x[0] not in stopWordList) 
keepWords.take(10)

output = keepWords.takeOrdered(10, key=lambda x: -x[1])
output

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: java.lang.IllegalArgumentException: Unsupported class file major version 55
	at org.apache.xbean.asm6.ClassReader.<init>(ClassReader.java:166)
	at org.apache.xbean.asm6.ClassReader.<init>(ClassReader.java:148)
	at org.apache.xbean.asm6.ClassReader.<init>(ClassReader.java:136)
	at org.apache.xbean.asm6.ClassReader.<init>(ClassReader.java:237)
	at org.apache.spark.util.ClosureCleaner$.getClassReader(ClosureCleaner.scala:49)
	at org.apache.spark.util.FieldAccessFinder$$anon$3$$anonfun$visitMethodInsn$2.apply(ClosureCleaner.scala:517)
	at org.apache.spark.util.FieldAccessFinder$$anon$3$$anonfun$visitMethodInsn$2.apply(ClosureCleaner.scala:500)
	at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
	at scala.collection.mutable.HashMap$$anon$1$$anonfun$foreach$2.apply(HashMap.scala:134)
	at scala.collection.mutable.HashMap$$anon$1$$anonfun$foreach$2.apply(HashMap.scala:134)
	at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:236)
	at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
	at scala.collection.mutable.HashMap$$anon$1.foreach(HashMap.scala:134)
	at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
	at org.apache.spark.util.FieldAccessFinder$$anon$3.visitMethodInsn(ClosureCleaner.scala:500)
	at org.apache.xbean.asm6.ClassReader.readCode(ClassReader.java:2175)
	at org.apache.xbean.asm6.ClassReader.readMethod(ClassReader.java:1238)
	at org.apache.xbean.asm6.ClassReader.accept(ClassReader.java:631)
	at org.apache.xbean.asm6.ClassReader.accept(ClassReader.java:355)
	at org.apache.spark.util.ClosureCleaner$$anonfun$org$apache$spark$util$ClosureCleaner$$clean$14.apply(ClosureCleaner.scala:307)
	at org.apache.spark.util.ClosureCleaner$$anonfun$org$apache$spark$util$ClosureCleaner$$clean$14.apply(ClosureCleaner.scala:306)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:306)
	at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162)
	at org.apache.spark.SparkContext.clean(SparkContext.scala:2326)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2100)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:945)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:944)
	at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:166)
	at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.base/java.lang.Thread.run(Thread.java:834)


### Putting it all together 

Combine the above code as a function and pass three works of Shakespeare (`'romeandjuliet.txt'`, `'hamlet.txt'`, `'othello.txt'`). Observe the frequency of stop words. Repeat the same exercise for three works of Jane Austin (`'senseandsensibility.txt'`, `'prideandprejudice.txt'` and `'emma.txt'`). 

> Can you recognize the writing styles of these authors based on their use of stop words?
> What can you do to improve the style recognition ability?

In [13]:
# Create a function for word count that takes in a file name and stop wordlist to perform above tasks
def wordCount(filename, stopWordlist):
    output = sc.textFile(filename)
    words1 = lines.flatMap(lambda x: x.split(' '))
    words2 = words1.map(lambda x: (x.lower(), 1))
    wordCount = words2.reduceByKey(lambda x,y: x+y)
    freqWords = wordCount.filter(lambda x:  x[1] >= 5 )
    stopWords = freqWords.filter(lambda x:  x[0] in stopWordList) 
    output = stopWords.collect()
    
    return output

## Level Up (Optional)

* Create histograms of the top 50 words from each author


## Summary 

In this simple exercise, we saw MapReduce in action for solving a basic NLP task, i.e. counting the frequency of stop words and keep words of a text corpus. This exercise can be seen as a first step towards text analytics on big data platforms. After this lab, we will get into more advanced use cases of PySpark, specifically for machine learning applications. 