# MapReduce Patterns in Python

## Assignment 3

This assignment will introduce you to the MapReduce programming paradigm using Python and Apache Spark. 

In [None]:
# Global imports

from collections import defaultdict
import string
from operator import add
from functools import reduce

import pandas as pd

You will use the following text snippets in parts of your assignment. These values were generated from https://slipsum.com/.

In [None]:
text1 = """
    Well, the way they make shows is, they make one show. 
    That show's called a pilot. 
    Then they show that show to the people who make shows, 
    and on the strength of that one show they decide 
    if they're going to make more shows. Some pilots 
    get picked and become television programs. 
    Some don't, become nothing. 
    She starred in one of the ones that became nothing.
"""

text2 = """
    You think water moves fast? You should see ice. 
    It moves like it has a mind. 
    Like it knows it killed the world once and got a taste for murder. 
    After the avalanche, it took us a week to climb out. 
    Now, I don't know exactly when we turned on each other, 
    but I know that seven of us survived the slide... 
    and only five made it out. Now we took an oath, that I'm breaking now. 
    We said we'd say it was the snow that killed the other two, but it wasn't. 
    Nature is lethal but it doesn't hold a candle to man.
"""

text3 = """
    Now that we know who you are, I know who I am. I'm not a mistake! 
    It all makes sense! In a comic, you know how you can tell who the arch-villain's going to be? 
    He's the exact opposite of the hero. And most times they're friends, like you and me! 
    I should've known way back when... You know why, David? Because of the kids. 
    They called me Mr Glass.
"""

text4 = """
    Your bones don't break, mine do. That's clear. 
    Your cells react to bacteria and viruses differently than mine. 
    You don't get sick, I do. That's also clear. 
    But for some reason, you and I react the exact same way to water. 
    We swallow it too fast, we choke. We get some in our lungs, we drown. 
    However unreal it may seem, we are connected, you and I. 
    We're on the same curve, just on opposite ends.
"""

documents = [text1, text2, text3, text4]

### Assignment 3.1
 
In the first part of the assignment, you will implement map and reduce functions that count the occurrences of words in a collection of documents. 

#### Assignment 3.1.a

In this part, you will implement a function that takes a document and outputs word/count tuple pairs. You will then apply that function across multiple documents using Python's built-in `map` function. 

The `map` function applies a function to every item in an input list of values. Technically, the input can be any Python iterable, but for the sake of simplicity, assume it is a list of values.  The primary reason to use the `map` function is that it is simple to parallelize its execution across multiple processes and computers. 

The following is a simple example of using Python's `multiprocessing` library to run a `map` function in five parallel processes. 

```python
from multiprocessing import Pool

def square_number(x):
    return x*x

if __name__ == '__main__':
    with Pool(5) as p:
        values = [1, 2, 3, 4, 5]
        print(p.map(square_number, values))
```

The above code prints the following result to standard output.

```
[1, 4, 9, 16, 25]
```

The same example can be written with Python's built-in `map` function, which runs sequentially in a single process and does not take advantage of parallel execution. 

```python
def square_number(x):
    return x*x

numbers = [1, 2, 3, 4, 5]
results = map(square_number, numbers)
print(list(results))
```
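
One detail worth keeping in mind: in Python 3, the built-in `map` returns a lazy, one-shot iterator rather than a list, which is why the example wraps it in `list(...)`. The short sketch below illustrates this with the same `square_number` function; the variable names `first_pass` and `second_pass` are illustrative only.

```python
def square_number(x):
    return x * x

numbers = [1, 2, 3, 4, 5]
results = map(square_number, numbers)

# A map object is a one-shot iterator: the first pass consumes it
first_pass = list(results)
# A second pass over the same map object finds nothing left
second_pass = list(results)

print(first_pass)   # [1, 4, 9, 16, 25]
print(second_pass)  # []
```

If the mapped values are needed more than once, store them in a list first.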

A partially implemented version of the `word_count_map_function` is included below. Fill in the missing details. The `remove_punctuation` function is provided as a way of removing punctuation from the input text. You can use it in your implementation of `word_count_map_function`. 

In [None]:
def remove_punctuation(text: str) -> str:
    """
    Simple function to remove punctuation from text

    :param text: text to remove punctuation from
    :return: text with punctuation removed
    """
    return ''.join([
        character
        for character in text
        if character not in string.punctuation
    ])

def word_count_map_function(text: str) -> list[tuple[str, int]]:
    """
    Simple map function that takes text and outputs tuples
    of words and counts
    
    :param text: text to convert into word/count tuples
    :return: A list of tuples with word and count
    """
    # TODO: Implement the word count map function

    # Step 1: Remove punctuation from text

    # Step 2: Convert text to lower case

    # Step 3: Split the text on whitespace to convert it into words

    # Step 4: Return a list of (word, 1) tuples, one per word
    
    # This line is here as a placeholder. Replace with your own code
    words = []

    # This line is here as a placeholder. Replace with your own code
    word_count_pairs = [('word1', 1), ('word2', 1)]
    return word_count_pairs

The following is example output of the function applied to `text1`:

```python
>>> word_count_map_function(text1)

[('well', 1),
 ('the', 1),
 ('way', 1),
 ('they', 1),
 ('make', 1),
 ('shows', 1),
    ...
 ('ones', 1),
 ('that', 1),
 ('became', 1),
 ('nothing', 1)]
```

Note: Every count in this output is *1* because the mapper does not perform a reduce task (i.e., it does not combine key/value pairs that share a key). In practice, mappers may perform a local reduce before sending data along to the global reduce task. 
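
The mapper-side reduce mentioned in the note is often called a combiner. The following is a minimal sketch of the idea, assuming a hypothetical helper named `combine_locally` and made-up mapper output; neither is part of the assignment.

```python
from collections import Counter

def combine_locally(pairs):
    """Hypothetical combiner: sum the counts per key within one mapper's output."""
    totals = Counter()
    for key, count in pairs:
        totals[key] += count
    return list(totals.items())

# Toy mapper output with repeated keys (illustrative data only)
mapper_output = [('show', 1), ('that', 1), ('show', 1), ('show', 1)]
combined = combine_locally(mapper_output)
print(combined)  # [('show', 3), ('that', 1)]
```

Combining locally shrinks the data each mapper sends to the global reduce task, at the cost of a little extra work in the mapper.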

#### Assignment 3.1.b

A MapReduce program usually contains multiple steps. Each *map* function runs in parallel and outputs key/value pairs. In the case of the word count task, the word is the key and the count is the value. The *reduce* step sorts the *map* outputs by key (i.e., word) and then combines the values that share a key. The *reduce* phase is more computationally expensive because it involves sorting and combining data from multiple *map* functions. 
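
To make the sort-then-combine idea concrete, here is a small sketch on a different task: finding the maximum value per key. The data and the names `mapper_a`, `mapper_b`, and `max_by_key` are invented for illustration, and the sketch runs in a single process rather than across machines.

```python
from functools import reduce
from itertools import groupby
from operator import itemgetter

# Toy key/value pairs as two mappers might emit them (illustrative data only)
mapper_a = [('oslo', 3), ('cairo', 31), ('oslo', 7)]
mapper_b = [('cairo', 35), ('lima', 19)]

# Sort step: bring pairs with the same key next to each other
all_pairs = sorted(mapper_a + mapper_b, key=itemgetter(0))

# Combine step: collapse each key's values into one value (here, the maximum)
max_by_key = {
    key: reduce(max, (value for _, value in group))
    for key, group in groupby(all_pairs, key=itemgetter(0))
}
print(max_by_key)  # {'cairo': 35, 'lima': 19, 'oslo': 7}
```

Note that `groupby` only groups adjacent items, which is why the sort has to come first.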

In this part, you will implement a function that combines the word/count pairs from the previous step. You will then apply that function to the *map* outputs, which are first merged into a single list using Python's *reduce* function. In Python 2, *reduce* was a built-in function; in Python 3, it lives in the *functools* module.  

The following example uses the *reduce* function to combine two lists.  

```python
list1 = [1, 2, 3, 4, 5]
list2 = [6, 7, 8, 9, 10]

lists = [list1, list2]

def merge_lists_reduce_function(x, y):
    return x + y

merged_values = reduce(merge_lists_reduce_function, lists)
merged_values
```
This produces the output:

```
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
```

We can use the *reduce* function a second time to add the numbers in the combined list. 

```python
reduce(add, merged_values)
```
This returns the result:

```
55
```
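
One edge case is worth knowing about: `reduce` raises a `TypeError` when given an empty sequence and no starting value. The sketch below shows the optional third argument, which supplies that starting value; the variable names are illustrative.

```python
from functools import reduce
from operator import add

# With no initial value, reducing an empty sequence raises TypeError
try:
    reduce(add, [])
except TypeError as error:
    print("empty input:", error)

# A third argument supplies the starting value, which is also
# the result when the input is empty
empty_total = reduce(add, [], 0)
merged = reduce(add, [[1, 2], [3]], [])
print(empty_total)  # 0
print(merged)       # [1, 2, 3]
```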

A partially implemented version of the `word_count_reduce_function` is included below. Fill in the missing details. 

In [None]:
def word_count_reduce_function(key_value_pairs):
    result = dict()
    
    # TODO: Populate `result` so that each word maps to its total count
    
    return result

The following code uses the functions defined previously to run the MapReduce job. The `counted_words_dict` variable should be a dictionary in which the words are the keys and the total count for each word is the value. 

In [None]:
# Step 1: Apply the `word_count_map_function` to each of the documents
map_output_values = map(word_count_map_function, documents)
# Step 2: Merge the outputs of the map function into a single list
merged_output_values = reduce(merge_lists_reduce_function, map_output_values)
# Step 3: Apply the `word_count_reduce_function` to create a single word/count dictionary
counted_words_dict = word_count_reduce_function(merged_output_values)
counted_words_dict

### Assignment 3.2

The second part of the assignment reproduces the word-count task using  [Apache Spark's resilient distributed dataset (RDD)](https://spark.apache.org/docs/latest/rdd-programming-guide.html). This abstraction allows MapReduce code to run in parallel in the memory of multiple computers.  

The following code initializes the Spark application. 

In [None]:
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("DSC 400 Assignment 3") \
    .getOrCreate()

spark_context = spark.sparkContext

The next code snippet loads the previously defined `documents` and creates a Spark RDD. 

In [None]:
documents_rdd = spark_context.parallelize(documents)

print(f"Number of Partitions: {documents_rdd.getNumPartitions()}")

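Spark evaluates RDDs lazily: transformations are only recorded, and nothing is computed until an action requests a result. The `collect` method is such an action; it returns all of the elements in the RDD to the driver program as a Python list. 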

In [None]:
documents_rdd_collect = documents_rdd.collect()
print(documents_rdd_collect)

The `flatMap` method applies the `word_count_map_function` to each of the documents. Unlike the `map` method, `flatMap` flattens the results into a single collection. Using `flatMap` means there is no need for a second step to merge the multiple results. 
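
The difference between mapping and flat-mapping can be sketched in plain Python, without Spark. The example below assumes a toy function `split_words` and made-up input lines; `itertools.chain.from_iterable` stands in for the flattening that `flatMap` performs.

```python
from itertools import chain

def split_words(line):
    """Toy stand-in for a mapper that returns a list per input."""
    return line.split()

lines = ["water moves fast", "ice moves"]

# map keeps one result per input: a list of lists
mapped = list(map(split_words, lines))
# Flattening concatenates the per-input lists into one sequence
flat_mapped = list(chain.from_iterable(map(split_words, lines)))

print(mapped)       # [['water', 'moves', 'fast'], ['ice', 'moves']]
print(flat_mapped)  # ['water', 'moves', 'fast', 'ice', 'moves']
```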

In [None]:
map_output_rdd = documents_rdd.flatMap(word_count_map_function)
map_output_rdd.collect()

Finally, the `reduceByKey` method combines the results by the key (i.e. word). 

In [None]:
word_count_rdd = map_output_rdd.reduceByKey(add)
word_count_rdd.collect()

#### Assignment 3.2.a

The `word_count_rdd` variable is an RDD that contains combined word/count pairs. Using this as a starting point, apply [filter](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.filter.html) to create a new RDD that contains entries with word counts greater than four. 
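
The RDD `filter` method takes a predicate function and keeps the elements for which it returns a true value, much like Python's built-in `filter`. The sketch below uses the built-in on toy data with a different threshold; the pairs and the name `value_at_least_five` are invented for illustration.

```python
# Toy (key, value) pairs, not the assignment's word counts
pairs = [('a', 7), ('b', 2), ('c', 5), ('d', 4)]

def value_at_least_five(pair):
    """Predicate: keep pairs whose value is 5 or more."""
    _, value = pair
    return value >= 5

kept = list(filter(value_at_least_five, pairs))
print(kept)  # [('a', 7), ('c', 5)]
```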

In [None]:
# TODO: Calculate `counts_gt_four_rdd` from `word_count_rdd`
counts_gt_four_rdd = None
counts_gt_four_rdd.collect()

#### Assignment 3.2.b

Using [sortBy](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.sortBy.html), create an RDD sorted by word count. The RDD should be sorted from greatest to least (i.e. descending values). 
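
The RDD `sortBy` method takes a key function that extracts the value to sort on, plus an `ascending` flag that defaults to `True`. Python's built-in `sorted` follows the same idea but uses `reverse` instead of `ascending`. The sketch below uses `sorted` on toy data that is not the assignment's.

```python
# Toy (key, value) pairs, not the assignment's word counts
pairs = [('a', 7), ('b', 2), ('c', 5), ('d', 4)]

# The key function picks the value to sort on; reverse=True gives descending order
by_value_desc = sorted(pairs, key=lambda pair: pair[1], reverse=True)
print(by_value_desc)  # [('a', 7), ('c', 5), ('d', 4), ('b', 2)]
```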

In [None]:
# TODO: Calculate `sorted_word_count_rdd` from `word_count_rdd`
sorted_word_count_rdd = None
sorted_word_count_rdd.collect()

### Assignment 3.3

[Apache Spark's datasets and dataframes](https://spark.apache.org/docs/latest/sql-programming-guide.html#datasets-and-dataframes) provide a programming interface that abstracts away the underlying details associated with MapReduce or RDDs. In this part of the assignment, you will use Spark's dataframes to perform a word count. You will also perform additional operations such as filtering and sorting. 

The following code creates a [Spark dataframe](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/dataframe.html) from the previously defined `word_count_rdd`.  The `printSchema` method prints the schema for the dataframe.  

In [None]:
word_count_columns = ["word", "count"]
word_count_df = spark.createDataFrame(
    data=word_count_rdd, 
    schema=word_count_columns
)
word_count_df.printSchema()

The `show` method will print the dataframe as a table. 

In [None]:
word_count_df.show()

#### Assignment 3.3.a

Use the [filter](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.filter.html) method to create a new dataframe with word counts greater than four. 

In [None]:
# TODO: Create `word_count_filtered` from `word_count_df`

word_count_filtered = None
word_count_filtered.show()

#### Assignment 3.3.b

Use the [sort](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.sort.html) method to create a new dataframe sorted by word count. Sort the word counts from greatest to least (i.e. descending).

In [None]:
# TODO: Create `word_count_sorted` from `word_count_df`

word_count_sorted = None
word_count_sorted.show()

#### Assignment 3.3.c

Use the [limit](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.limit.html)
method to create a new dataframe that contains the top ten word count values. 

In [None]:
# TODO: Create `word_count_top_10` from `word_count_sorted`

word_count_top_10 = None
word_count_top_10.show()