# DS/CMPSC 410 Fall 2024
## Instructor: Professor John Yen
# Lab 2: MapReduce in Spark

## The goals of this lab are for you to be able to
## - Implement MapReduce using map, flatMap, and reduceByKey in Spark
## - Apply the above to calculate the total count of each word in a text file.
## - Be able to use .take to show contents of an RDD.
## - Be able to generate an error message due to a typo in the input file.
## Total Number of Exercises: 
- Exercise 1: 5 points
- Exercise 2: 5 points
- Exercise 3: 5 points
- Exercise 4: 10 points
- Exercise 5: 10 points
- Exercise 6: 10 points
- Exercise 7: 5 points
## Total Points: 50 points

# Due: midnight, September 8, 2024

# The first thing we need to do in each Jupyter Notebook running PySpark is to import pyspark.
# Note: You should have followed the instructions for Lab 2 (Word document) to install PySpark in your ICDS account.  If not, complete all steps in the instructions before you proceed.

In [None]:
import pyspark

## Once we import pyspark, we need to import an important object called "SparkContext".  
- Note: Every spark program needs a SparkContext object, which provides critical information (e.g., whether this Spark session runs in local mode or cluster mode) for the run-time environment of the Spark session.

In [None]:
from pyspark import SparkContext

## We then create a SparkContext variable.  Once we have a SparkContext variable, we can execute Spark code. 
- The first parameter specifies that this Spark session is running in "local" mode. 
- The second parameter specifies a name for your Spark application.

### In creating the SparkContext variable below, we specify that this Spark code runs in `local` mode, with the name `Lab2`.
### After you run the cell below, wait patiently for it to complete before you run the next cell. When the left side of the cell shows `[*]:`, the run has not completed yet. Completion of a cell is indicated by a number in the brackets, such as `[3]:`.
### Note: You MUST wait for a cell to complete before you run the next cell in a PySpark Jupyter notebook. Otherwise, your later cells may generate errors because the inputs they need are not available yet (e.g., they have not been generated by a previous PySpark statement that is still running).

In [None]:
sc=SparkContext("local", "Lab2")
sc


In [None]:
sc.setLogLevel("WARN")

## Exercise 1 (5 points): (a) Add your name below AND (b) replace `???` in the path below with your PSU Access ID, so that the path points to your home directory.
## Answer for Exercise 1
- a: Your Name:

In [None]:
text_RDD = sc.textFile("/storage/home/???/work/Lab2/StayingSafeAtPennStateFall2024.txt")
text_RDD

# Part A: Parsing Input Text into Tokens

# RDD
- RDD is the primary distributed data structure used by Spark for storing/accessing big data in a cluster.  
- We will talk more about RDD next week. For now, we will view RDD as a big table.
- When we use `textFile` of Spark to read an input file, it returns an RDD (a big table), where each entry in the big table is a line of the input file.
- The contents of an RDD can be inspected using the `.take()` method.

# Take
- Take is an action, which we will discuss more next week.
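- Applying the `take` method to an RDD returns (and, in Jupyter, displays) entries of the RDD.  
- The parameter of `take` is the number of entries to return: `take(n)` returns the first n entries of the RDD as a Python list (or all entries, if the RDD has fewer than n).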
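As a rough illustration of what `take(n)` returns, here is a plain-Python sketch that needs no Spark. The list `lines` and the helper `take` are hypothetical stand-ins for an RDD and for `RDD.take`. The sketch does not model how Spark actually fetches entries from the cluster.

```python
from itertools import islice

# Hypothetical stand-in for the lines stored in an RDD
lines = ["line 1", "line 2", "line 3"]

def take(items, n):
    """Hypothetical helper: return at most the first n items as a list, like RDD.take(n)."""
    return list(islice(items, n))

first_two = take(lines, 2)   # the first two entries
all_lines = take(lines, 10)  # only 3 entries exist, so all 3 are returned
print(first_two, all_lines)
```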

# Exercise 2 (5 points)
Complete the code below to show the first five lines of the input text.

In [None]:
text_RDD.take(??)

# Map
- Spark's `map` applies a function to each entry of an input RDD. We often use a `lambda` expression to describe an unnamed function as the parameter of `map`. 
- The body of the lambda expression below uses the Python string methods `strip` and `split`.
- The Python `strip` method removes whitespace (spaces, tabs, newline characters) at the beginning and end of a string.
- The Python `split` method splits the string at each occurrence of the specified delimiter (the space `" "` in this case) and returns a list of the pieces.
- `map` returns an `RDD` (a "big table") where each row is the result of applying the function to the corresponding row of the input RDD.
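Below is a plain-Python sketch of what the lambda in the next cell does to each line. It needs no Spark. The sample lines are made up, and Python's built-in `map` stands in for `RDD.map`.

```python
# Made-up sample lines standing in for entries of text_RDD (hypothetical data)
sample_lines = ["  Stay safe at Penn State  ", "Wash your hands", "Keep  distance"]

# The same function the lab passes to map: trim whitespace, then split on " "
to_tokens = lambda line: line.strip().split(" ")

# Python's built-in map applies to_tokens to every line, as RDD.map does to every entry
line_tokens = list(map(to_tokens, sample_lines))
print(line_tokens)
```

Each input line yields one list of tokens. The double space in `"Keep  distance"` produces an empty-string token `''`, because `split(" ")` splits at every single space. The same thing happens to lines with consecutive spaces in the lab's input file.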

In [None]:
line_tokens_RDD = text_RDD.map(lambda line: line.strip().split(" "))
line_tokens_RDD

# Exercise 3 (5 points)
Complete the code below to show the first ten entries of the line_tokens_RDD.

In [None]:
line_tokens_RDD.???(???)

## Observations:
- Tokens/words in each entry of `text_RDD` (i.e., each line in the input text file) are split/separated at the space character.
- The `lambda` function returns a list of tokens for each line of the text_RDD.

# flatMap 
The Spark `flatMap` method applies a function that returns a list (or other iterable) to each entry of the input RDD. It then removes the boundaries between entries/rows of the input RDD big table by placing every element of every returned list into a single RDD.
- Applying `flatMap` to `line_tokens_RDD` removes the boundaries between lines of the input text.  In other words, it merges the lists of tokens for each line of the text into one gigantic list of tokens for the entire input document.
- Intuitively, the effect of `flatMap` can be understood as **flattening** the internal structure of its input RDD.
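To make the flattening concrete without Spark, the sketch below uses two hypothetical helpers, `map_lists` and `flat_map_lists`. They mimic the shape of `map` and `flatMap` output on ordinary Python lists. Both receive the identity function `lambda x: x`, the same function the lab passes to `flatMap`.

```python
def map_lists(func, items):
    """Hypothetical helper: one output element per input element (like map)."""
    return [func(item) for item in items]

def flat_map_lists(func, items):
    """Hypothetical helper: concatenate the iterables that func returns (like flatMap)."""
    return [out for item in items for out in func(item)]

# Made-up token lists, one per line, standing in for line_tokens_RDD
line_tokens = [["Stay", "safe"], ["at", "Penn", "State"]]

mapped = map_lists(lambda x: x, line_tokens)          # still one list per line
flattened = flat_map_lists(lambda x: x, line_tokens)  # one token per element
print(mapped)
print(flattened)
```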

## Results of flatMap:
We no longer see the lists of tokens in `line_tokens_RDD` that reflect the line structure of the input file.  Instead, each token is an element of one gigantic list, `tokens_RDD`, which we will use to calculate the total number of occurrences of each word/token.

In [None]:
tokens_RDD = line_tokens_RDD.flatMap(lambda x: x)
tokens_RDD.take(10)

## Exercise 4 (10 points)
- (a) Based on the contents of `tokens_RDD` shown by the previous statement, describe the effect of `flatMap`. (5 points)
- (b) What is the key difference between the output of `map` and `flatMap`? (5 points)

## Answers to Exercise 4:


# Part B: Counting Word Frequency using MapReduce

### We want to count the total number of times each word/token occurs in the input text file. We can use the concept of MapReduce to do this in a "scalable" way, so that the calculation still works even if the dataset is too large to fit on a single computer.

### A MapReduce way to achieve this involves two steps:
### Step 1: Map each word into a key-value pair `(<word>, 1)`. The key of this key-value pair is the word (from the input RDD); the value of the key-value pair is the number 1.
### Step 2: Use `reduceByKey` in Spark to aggregate all pairs with the same key into `(<word>, <count>)`.
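The two steps can be simulated on a single machine with plain Python. The token list below is made up, and a dictionary plays the role of the per-key aggregation that `reduceByKey` performs in Spark. This sketch is not distributed and does not show how Spark partitions or shuffles the data.

```python
# Made-up tokens standing in for tokens_RDD (hypothetical data)
tokens = ["stay", "safe", "stay", "healthy", "stay"]

# Step 1 (map): turn each word into a key-value pair (word, 1)
pairs = [(word, 1) for word in tokens]

# Step 2 (reduce by key): combine the values of pairs that share a key using x + y
counts = {}
for word, value in pairs:
    if word in counts:
        counts[word] = counts[word] + value
    else:
        counts[word] = value  # first value seen for this key

print(sorted(counts.items()))
```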

# Python Lambda function
The lambda function that is the parameter of `map` is applied to each element of the input RDD (i.e., each token in `tokens_RDD`). A lambda function in Python has the format
`lambda <parameter list>: <expression>`
The body of a lambda is a single expression, and the lambda returns the value of that expression. A lambda cannot contain statements such as `return`; use `def` when a function needs multiple statements.

In the example below, the lambda function returns the key-value pair `(<input word>, 1)`
because `(x, 1)` is the expression that forms the body of the lambda.
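For comparison, here is a short sketch of the lambda used in the next cell and an equivalent named function written with `def`. The names `to_pair` and `to_pair_def` are hypothetical. The last part checks that putting a `return` statement inside a lambda is rejected as a syntax error.

```python
# A lambda whose body is the single expression (x, 1)
to_pair = lambda x: (x, 1)

# An equivalent named function written with def and an explicit return
def to_pair_def(x):
    return (x, 1)

print(to_pair("safe"), to_pair_def("safe"))

# A lambda body must be an expression, so "return" is not allowed inside it
try:
    compile("lambda x: return (x, 1)", "<example>", "eval")
    lambda_allows_return = True
except SyntaxError:
    lambda_allows_return = False
print(lambda_allows_return)
```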


In [None]:
token_1_RDD = tokens_RDD.map(lambda x: (x, 1))
token_1_RDD

In [None]:
token_1_RDD.take(10)

# reduceByKey

The transformation `reduceByKey` also takes a lambda function. However, this function takes TWO parameters.
- The first parameter of `reduceByKey` (`lambda x,y: x+y` in the code below) is an aggregation function, which itself has two parameters (x and y).
- x is the value accumulated so far for a key. It starts as the first value seen for that key; there is no separate initial value such as 0.
- y is the next 'value' from the input key-value pairs with the same key, to be combined with the accumulated value.  
- Since the 'values' in the input key-value pairs are all 1, this lambda function simply increments the accumulated value for each occurrence of the key.  Because `reduceByKey` aggregates across the entire input RDD, the final value for each word/token is the total number of times it occurs in the input file.
- The optional second parameter of `reduceByKey` is the number of partitions used to partition the keys (so that the reduce task can be distributed among multiple reduce workers for scalability). The code below omits it, so Spark chooses a default. In cluster mode, the choice of this number should take into account the total number of nodes available in the cluster.
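The following plain-Python sketch uses `functools.reduce` to show how a two-parameter function like `lambda x,y: x+y` is applied pairwise to the values of one key. No initial 0 is needed. The sketch also shows why the result does not depend on how the values are split into partitions. Spark's `reduceByKey` may combine values within each partition before merging the partial results, which is why its function should be associative and commutative. The values and the two-partition split below are hypothetical.

```python
from functools import reduce

add = lambda x, y: x + y  # same aggregation function as in the lab

# Hypothetical values for one key, e.g. every 1 emitted for the word "safe"
values = [1, 1, 1, 1, 1]

# Without an initializer, reduce starts with the first value as x: ((((1+1)+1)+1)+1)
total = reduce(add, values)

# Pretend the values are spread over two partitions: reduce each partition,
# then reduce the partial results
partitions = [[1, 1, 1], [1, 1]]
partial_sums = [reduce(add, part) for part in partitions]
total_from_partitions = reduce(add, partial_sums)

print(total, partial_sums, total_from_partitions)
```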

# Exercise 5 (10 points) Complete the code below by choosing a name for the RDD to be generated by `reduceByKey`.

In [None]:
??? = token_1_RDD.reduceByKey(lambda x,y: x+y)
???.take(??)

# Exercise 6 (10 points) Complete the code below by filling in the path of your word count output (replace `???` with your PSU Access ID), and use the name of the RDD you created in Exercise 5.

# saveAsTextFile
This action saves the contents of the input RDD in the specified directory.  Even though we add a ".txt" extension to the path (to remind ourselves that it contains text files, not CSV),
the path is used as a directory that stores one file per partition of the given RDD (i.e., the RDD generated by `reduceByKey` in Exercise 5). 

## An important requirement for saveAsTextFile is that the output path MUST NOT already exist; otherwise Spark raises an error.  

## Debugging Tip #1: 
Before you run saveAsTextFile each time, double-check that the output path does not already exist.  If it does, either 
- (a) remove the directory, or
- (b) change the output path to one that has not been used.
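Option (a) can be scripted with Python's standard `os` and `shutil` modules. The sketch below assumes the output path is on the local file system, as `/storage/home/...` is in this lab (it does not handle HDFS paths). The helper name `clear_output_dir` is hypothetical. Use it with care, because it permanently deletes the directory and everything inside it.

```python
import os
import shutil

def clear_output_dir(path):
    """Hypothetical helper: delete an existing local output directory.

    Returns True when path no longer exists, so saveAsTextFile can create it.
    Returns False if path exists but is not a directory (it is left untouched).
    Warning: this permanently deletes everything inside the directory.
    """
    if os.path.isdir(path):
        shutil.rmtree(path)
    return not os.path.exists(path)
```

In the notebook, you could call `clear_output_dir(output_file)` right before `saveAsTextFile`. Option (b), choosing a fresh output path, avoids deleting anything.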

In [None]:
output_file = "/storage/home/???/work/Lab2/Lab2_output.txt"
# Replace WF_RDD with the name you chose for the reduceByKey result in Exercise 5
WF_RDD.saveAsTextFile(output_file)

# Exercise 7 (5 points) Modify the path of the input file below to create a typo in the INPUT FILE NAME.  Then execute the code cells below. 
- You do not need to fix the typo.
- (a) Before running the code below, when do you expect the first error message to appear?
- (b) When does the error message actually appear?

In [None]:
text2_RDD = sc.textFile("???")
text2_RDD

In [None]:
line2_RDD = text2_RDD.map(lambda line: line.strip().split(" "))

In [None]:
token2_RDD = line2_RDD.flatMap(lambda x: x)

In [None]:
token2_RDD

In [None]:
token2_1_RDD = token2_RDD.map(lambda x: (x, 1))

In [None]:
token2_count_RDD = token2_1_RDD.reduceByKey(lambda x,y: x+y)

# Answer to Exercise 7 (5 points):
### Type your answer to Exercise 7 below:
- (a) 
- (b)

In [None]:
sc.stop()

## The `.stop()` method terminates the given SparkContext. 