## DS/CMPSC 410 Spring 2021
## Instructor: Professor John Yen
## TA: Rupesh Prajapati
## LAs: Cayla Shan Pun and Lily Jakielaszek
## Lab 2: MapReduce in Spark, Tips for Debugging Spark

## The goals of this lab are for you to be able to
## - Implement MapReduce using map and reduceByKey in Spark
## - Apply the above for calculating total counts of words in a set of tweets

## Total Number of Exercises: 4
- Exercise 1: 5 points
- Exercise 2: 10 points
- Exercise 3: 10 points
- Exercise 4: 10 points
## Total Points: 35 points

# Due: midnight, January 23, 2022

## The first thing we need to do in each Jupyter Notebook running PySpark is to import pyspark.

In [1]:
import pyspark

## Once we import pyspark, we need to import an important object called "SparkContext". Every Spark program needs a SparkContext object.

In [2]:
from pyspark import SparkContext

## We then create a SparkContext variable. Once we have a SparkContext variable, we can execute Spark code.

In [3]:
sc=SparkContext("local", "Lab2")
sc

### In creating the SparkContext variable, we also specify that this Spark code runs in `local` mode, with the name `Lab2`.
### After you run a cell, be patient and wait for it to complete before you run the next cell. When the left of the cell shows `[*]:`, the run is not completed yet. The completion of a run is indicated by a number in the brackets, such as `[3]:`.
### Note: You MUST wait for a cell to complete before you run the next cell in a PySpark Jupyter notebook. Otherwise, your later cells may generate an unnecessary error because the input RDD they need has not been generated by a previous PySpark statement.

## Exercise 1 (5 points) (a) Add your name below AND (b) replace the path below with the path of your home directory (i.e., replace juy1 with your PSU Access ID).
## Answer for Exercise 1
- a: Your Name: Haichen Wei

In [4]:
text_RDD = sc.textFile("/storage/home/hxw5245/Lab2/TweetsClimateChangeSentiment(1).csv")
text_RDD

/storage/home/hxw5245/Lab2/TweetsClimateChangeSentiment(1).csv MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:0

## Debugging Tips: 
- Because Spark uses a "smart plan-ahead scheme" like that of MapReduce (called "lazy evaluation", which we will elaborate on next week), reading the input file with `textFile` above IS NOT EXECUTED until an action (e.g., `saveAsTextFile`) is executed. The result of evaluating `textFile` is the creation of an RDD, which we will also elaborate on next week. For now, think of an RDD as a "data structure/template" without real data. This RDD is assigned to the variable `text_RDD`.
- Therefore, a run-time bug in reading the input file with `textFile` is NOT detected when the PySpark statement is evaluated. Rather, it is detected when an action is executed.
- This can create an obstacle for debugging, because any of several earlier statements may be the cause of an error that appears only after an action is executed.
- **Debugging Tip #1**: When you see an error message after an action is executed, read the error message carefully to identify its potential causes, which can be steps before the step that generates the error message.
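The delayed-error behavior described above can be imitated in plain Python with generators. This is only an analogy, not Spark itself: `lazy_text_file` is a hypothetical stand-in for `sc.textFile`, and the missing file name is made up for illustration.

```python
import os
import tempfile

def lazy_text_file(path):
    """Hypothetical stand-in for sc.textFile: returns a generator, so nothing runs yet."""
    def lines():
        # The file is only opened once something iterates over the generator.
        with open(path) as handle:
            for line in handle:
                yield line.rstrip("\n")
    return lines()

# A path that does not exist (made-up name inside a fresh temporary directory).
missing_path = os.path.join(tempfile.mkdtemp(), "no_such_file.csv")

# "Transformation" steps: building the pipeline raises no error.
text = lazy_text_file(missing_path)
tokens = (line.split(" ") for line in text)

# "Action" step: consuming the pipeline is what finally surfaces the bad path.
try:
    first = next(tokens)
    error_seen = None
except FileNotFoundError as err:
    error_seen = err

print(type(error_seen).__name__)
```

The error is raised at the last step even though the mistake was made at the first one, which is the situation Debugging Tip #1 is meant for.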

# Map
`map` in Spark takes a function as a parameter. We often use a lambda expression to describe an unnamed function as the parameter for map. The body of the following Spark map statement uses Python's `split` method (for strings) to split each element of the input (a string representing a line obtained from reading the text file with `textFile`) using a single space `" "` as the delimiter.

In [5]:
line_RDD = text_RDD.map(lambda line: line.strip().split(" "))
line_RDD

PythonRDD[2] at RDD at PythonRDD.scala:53
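To see what the lambda does to a single element, the same expression can be tried on one string in plain Python. The sample line below is made up for illustration; only `strip` and `split` come from the lab code.

```python
# A sample line shaped like the rows in the tweet file (made up for illustration).
line = "  RT @user: climate change is real,1\n"

# strip() removes leading/trailing whitespace; split(" ") cuts on each single space.
tokens = line.strip().split(" ")
print(tokens)

# split(" ") keeps an empty string between two adjacent spaces; split() with no argument does not.
double_spaced = "climate  change"
print(double_spaced.split(" "))  # ['climate', '', 'change']
print(double_spaced.split())     # ['climate', 'change']
```

The second half is worth keeping in mind when token counts contain an unexpected empty-string key.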

# Take
- Take is an action method for an RDD, so calling it triggers the evaluation of the lazily defined steps that produce the RDD. The parameter of take specifies how many elements, counted from the beginning of the input RDD, will be returned and displayed.
- **Debugging Tip #2**: You can add `.take(n)` to an RDD to see the content of the RDD for debugging purposes. This is useful to check whether the content of the RDD is what you expected. If not, the incorrect content can reveal the next debugging steps (e.g., check the content of earlier RDDs used to generate this RDD).

In [6]:
line_RDD.take(2)

[['Text,Support'],
 ['RT',
  '@kasserolees:',
  'Energy',
  'is',
  'the',
  '#1',
  'contributer',
  'to',
  'climate',
  'change',
  'by',
  'far.',
  'Also',
  'ALL',
  'agriculture',
  '(not',
  'just',
  'animal',
  'ag)',
  'contributes',
  'to',
  'climate',
  'ch,1']]

# flatMap 
- The Spark `flatMap` method for RDD returns an RDD that no longer has the boundary between elements of the input RDD. For example, the line_RDD generated by the previous code contains lists of tokens, where each list corresponds to one line of the input. The first list contains one string, which is the header of the TweetsClimateChangeSentiment.csv file. The headings "Text" and "Support" are not separated by a space; hence they are not separated into two tokens by `split(" ")`. The second list contains the first tweet ( `RT @kasserolees: Energy is the #1 contributer to climate change by far. Also ALL agriculture (not just animal ag) contributes to climate ch,1` ).
- The number "1" at the end of the line is actually the sentiment support of the tweet, not part of the original tweet. A better processing pipeline should not mix 1 with the content of the tweet. We will address this issue later in the class.
- Applying flatMap to `line_RDD` removes the list structure introduced by different lines of the input file. As a result, all tokens from all lines/tweets are in one huge list. In other words, the internal structure of line_RDD is **flattened**.
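The difference between a map-style result and a flattened one can be sketched in plain Python. This only mirrors the idea; the two input lines are made up, and `itertools.chain.from_iterable` plays the role that `flatMap(lambda x: x)` plays in the lab.

```python
from itertools import chain

# Two made-up lines standing in for rows of the input file.
lines = ["Text,Support", "RT @user: Energy matters,1"]

# map-like step: one output element (a list of tokens) per input line.
mapped = [line.strip().split(" ") for line in lines]

# flatMap-like step: the per-line lists are concatenated into one flat sequence.
flattened = list(chain.from_iterable(mapped))

print(mapped)     # [['Text,Support'], ['RT', '@user:', 'Energy', 'matters,1']]
print(flattened)  # ['Text,Support', 'RT', '@user:', 'Energy', 'matters,1']
```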

In [7]:
token_RDD = line_RDD.flatMap(lambda x: x)

In [8]:
token_RDD.take(4)

['Text,Support', 'RT', '@kasserolees:', 'Energy']

### Results of flatMap:
We no longer see the lists of tokens in line_RDD that reflect the line structure of the input file. Instead, each token is an element of this very large list: `token_RDD`, in which a word/token can occur multiple times.

### We want to count the total number of times a word/token occurs in the Twitter dataset. We can use the concept of MapReduce to do this in a "scalable" way, such that we can do this calculation even if the Twitter dataset is too large to fit into a single computer.

### A MapReduce way to achieve this involves two steps:
### Step 1: Map each word into a key-value pair `(<word>, 1)`. The key of this key-value pair is a word; the value of the key-value pair is the number 1.
### Step 2: Use reduceByKey in Spark to aggregate all pairs of the same key into `(<word>, <count>)`, where `<count>` is the result of aggregating all of the 1's associated with the same key/word.
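The two steps can be traced by hand on a tiny input. The following is a plain-Python sketch of the idea on a single machine, not how Spark executes it; the token list and the explicit grouping step are assumptions made for illustration.

```python
from collections import defaultdict
from functools import reduce

# A tiny made-up token list standing in for token_RDD.
tokens = ["climate", "change", "is", "real", "climate", "change", "climate"]

# Step 1: map each token to a (word, 1) pair.
pairs = [(token, 1) for token in tokens]

# Grouping: collect the values that share a key.
grouped = defaultdict(list)
for word, one in pairs:
    grouped[word].append(one)

# Step 2: reduce each group with the same two-argument function used in the lab.
add = lambda x, y: x + y
counts = {word: reduce(add, ones) for word, ones in grouped.items()}

print(counts)  # {'climate': 3, 'change': 2, 'is': 1, 'real': 1}
```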

In [9]:
token_1_RDD = token_RDD.map(lambda x: (x, 1))

## The call to reduceByKey below takes two parameters:
- (1) The first parameter `lambda x,y: x+y` is the aggregation function used by this `reduceByKey`. The lambda expression/function has two parameters (x and y in the code below): x is the running aggregate for a key, and y is the next 'value' from the input key-value pairs with that key. There is no separate initial value; the first value seen for a key starts the aggregate, so the function should be associative and commutative (as addition is).
- (2) The second parameter `4` is the number of partitions used to partition the keys (so that the reduce task can be distributed to 4 reduce workers for scalability). One of the design decisions is to determine how many partitions to use. We will talk more about this in future modules.
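The role of the partition count can be illustrated with a plain-Python sketch that routes each key to one of 4 buckets and aggregates inside each bucket. The hash used here (`zlib.crc32`) and the helper `partition_of` are assumptions chosen to keep the sketch deterministic; Spark uses its own partitioning function.

```python
import zlib

NUM_PARTITIONS = 4  # same value as the second argument to reduceByKey in this lab

def partition_of(word, num_partitions=NUM_PARTITIONS):
    # zlib.crc32 is only an illustrative, deterministic hash (an assumption of this sketch).
    return zlib.crc32(word.encode("utf-8")) % num_partitions

# Made-up input pairs standing in for token_1_RDD.
words = "climate change is real and climate change is urgent".split(" ")
pairs = [(word, 1) for word in words]

# Route every pair to the bucket chosen by its key, so equal keys always meet.
partitions = [dict() for _ in range(NUM_PARTITIONS)]
for word, value in pairs:
    bucket = partitions[partition_of(word)]
    if word in bucket:
        # Later values for a key are combined with x + y.
        bucket[word] = bucket[word] + value
    else:
        # The first value for a key starts the aggregate; no initial 0 is needed.
        bucket[word] = value

for index, bucket in enumerate(partitions):
    print(index, bucket)
```

Each bucket plays the role of one reduce worker's share of the keys. With such a small input, some buckets may be empty.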

# Exercise 2 (10 points) Fill in the second parameter of reduceByKey.

In [10]:
token_count_RDD = token_1_RDD.reduceByKey(lambda x,y: x+y, 4)

In [11]:
token_count_RDD.take(10)

[('far.', 1),
 ('ag)', 1),
 ('contributes', 1),
 ('@msnbc', 3),
 ('why', 26),
 ('have', 136),
 ('air?,1', 2),
 ('@anthonyfurey', 1),
 ('data.', 1),
 ('them', 17)]

# Exercise 3 (10 points) Complete the code below by filling in your home directory.

In [12]:
output_file = "/storage/home/hxw5245/Lab2/Lab2_token_count.txt"
token_count_RDD.saveAsTextFile(output_file)

# Exercise 3 (10 points) Modify the path of the input file to introduce a typo in the file name or path. Execute the code below. Apply the debugging tips above to interpret the error message when it is generated.
- (a) What bug causes the error message? (5 points)
- (b) What information in the error message is useful to help you uncover the bug?  (5 points)
### Fill in your answer in the Markdown cell below labelled "Answer to Exercise 3".

In [13]:
text2_RDD = sc.textFile("/storage/home/hxw5245/Lab2/TweetsClimateChangeSentimentfjhmj.csv")
text2_RDD

/storage/home/hxw5245/Lab2/TweetsClimateChangeSentimentfjhmj.csv MapPartitionsRDD[14] at textFile at NativeMethodAccessorImpl.java:0

In [14]:
line2_RDD = text2_RDD.map(lambda line: line.strip().split(" "))
line2_RDD

PythonRDD[15] at RDD at PythonRDD.scala:53

In [15]:
token2_RDD = line2_RDD.flatMap(lambda x: x)

In [16]:
token2_1_RDD = token2_RDD.map(lambda x: (x, 1))

In [17]:
token2_count_RDD = token2_1_RDD.reduceByKey(lambda x,y: x+y, 4)

In [19]:
# token2_count_RDD.take(10)

In [20]:
output2_file = "/storage/home/hxw5245/Lab2/Lab2_token_count2.txt"
token2_count_RDD.saveAsTextFile(output2_file)

Py4JJavaError: An error occurred while calling o132.saveAsTextFile.
: org.apache.spark.SparkException: Job aborted.
	at org.apache.spark.internal.io.SparkHadoopWriter$.write(SparkHadoopWriter.scala:106)
	at org.apache.spark.rdd.PairRDDFunctions.$anonfun$saveAsHadoopDataset$1(PairRDDFunctions.scala:1090)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
	at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:1088)
	at org.apache.spark.rdd.PairRDDFunctions.$anonfun$saveAsHadoopFile$4(PairRDDFunctions.scala:1061)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
	at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:1026)
	at org.apache.spark.rdd.PairRDDFunctions.$anonfun$saveAsHadoopFile$3(PairRDDFunctions.scala:1008)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
	at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:1007)
	at org.apache.spark.rdd.PairRDDFunctions.$anonfun$saveAsHadoopFile$2(PairRDDFunctions.scala:964)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
	at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:962)
	at org.apache.spark.rdd.RDD.$anonfun$saveAsTextFile$2(RDD.scala:1578)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
	at org.apache.spark.rdd.RDD.saveAsTextFile(RDD.scala:1578)
	at org.apache.spark.rdd.RDD.$anonfun$saveAsTextFile$1(RDD.scala:1564)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
	at org.apache.spark.rdd.RDD.saveAsTextFile(RDD.scala:1564)
	at org.apache.spark.api.java.JavaRDDLike.saveAsTextFile(JavaRDDLike.scala:551)
	at org.apache.spark.api.java.JavaRDDLike.saveAsTextFile$(JavaRDDLike.scala:550)
	at org.apache.spark.api.java.AbstractJavaRDDLike.saveAsTextFile(JavaRDDLike.scala:45)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: file:/storage/home/hxw5245/Lab2/TweetsClimateChangeSentimentfjhmj.csv
	at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:304)
	at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:244)
	at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:332)
	at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:205)
	at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:300)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:296)
	at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
	at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:300)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:296)
	at org.apache.spark.api.python.PythonRDD.getPartitions(PythonRDD.scala:55)
	at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:300)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:296)
	at org.apache.spark.api.python.PairwiseRDD.getPartitions(PythonRDD.scala:112)
	at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:300)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:296)
	at org.apache.spark.ShuffleDependency.<init>(Dependency.scala:101)
	at org.apache.spark.rdd.ShuffledRDD.getDependencies(ShuffledRDD.scala:87)
	at org.apache.spark.rdd.RDD.$anonfun$dependencies$2(RDD.scala:264)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.rdd.RDD.dependencies(RDD.scala:260)
	at org.apache.spark.scheduler.DAGScheduler.getShuffleDependenciesAndResourceProfiles(DAGScheduler.scala:661)
	at org.apache.spark.scheduler.DAGScheduler.createResultStage(DAGScheduler.scala:586)
	at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:1145)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2541)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2533)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2522)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:898)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2214)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2235)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2267)
	at org.apache.spark.internal.io.SparkHadoopWriter$.write(SparkHadoopWriter.scala:83)
	... 51 more
Caused by: java.io.IOException: Input path does not exist: file:/storage/home/hxw5245/Lab2/TweetsClimateChangeSentimentfjhmj.csv
	at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:278)
	at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:244)
	at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:332)
	at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:205)
	at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:300)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:296)
	at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
	at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:300)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:296)
	at org.apache.spark.api.python.PythonRDD.getPartitions(PythonRDD.scala:55)
	at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:300)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:296)
	at org.apache.spark.api.python.PairwiseRDD.getPartitions(PythonRDD.scala:112)
	at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:300)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:296)
	at org.apache.spark.ShuffleDependency.<init>(Dependency.scala:101)
	at org.apache.spark.rdd.ShuffledRDD.getDependencies(ShuffledRDD.scala:87)
	at org.apache.spark.rdd.RDD.$anonfun$dependencies$2(RDD.scala:264)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.rdd.RDD.dependencies(RDD.scala:260)
	at org.apache.spark.scheduler.DAGScheduler.getShuffleDependenciesAndResourceProfiles(DAGScheduler.scala:661)
	at org.apache.spark.scheduler.DAGScheduler.createResultStage(DAGScheduler.scala:586)
	at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:1145)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2541)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2533)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2522)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)


# Answer to Exercise 3 (10 points):
- (a) The path of the input file is incorrect.
- (b) The `Caused by:` line of the error message: `org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: file:/storage/home/hxw5245/Lab2/TweetsClimateChangeSentimentfjhmj.csv`
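In a long Java stack trace like the one above, the `Caused by:` lines are usually the quickest place to look. A minimal plain-Python sketch for pulling them out is shown below; the helper name `root_causes` is hypothetical, and the sample text is a handful of lines copied from the trace above.

```python
# A few lines copied from the stack trace above (most frames left out).
trace = """Py4JJavaError: An error occurred while calling o132.saveAsTextFile.
: org.apache.spark.SparkException: Job aborted.
\tat org.apache.spark.rdd.RDD.saveAsTextFile(RDD.scala:1564)
Caused by: org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: file:/storage/home/hxw5245/Lab2/TweetsClimateChangeSentimentfjhmj.csv
\tat org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:244)
Caused by: java.io.IOException: Input path does not exist: file:/storage/home/hxw5245/Lab2/TweetsClimateChangeSentimentfjhmj.csv
"""

def root_causes(trace_text):
    """Return the 'Caused by:' lines, which usually name the underlying problem."""
    return [line for line in trace_text.splitlines() if line.startswith("Caused by:")]

causes = root_causes(trace)
for cause in causes:
    print(cause)
```

Here both lines point at the input path, even though the failing call was `saveAsTextFile`.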

# Exercise 4 (10 points) 
- (1) Describe how many output files containing token counts are generated in your output directory. (5 points)
- (2) Explain which code determines the number of output files. (5 points)
### Provide your answer in the Markdown cell below.

# Answer to Exercise 4:
- (1) There are 4 files containing token counts in the output directory.  
- (2) The following code determines the number of output files: `token_count_RDD = token_1_RDD.reduceByKey(lambda x,y: x+y, 4)`. The second parameter `4` means it uses 4 partitions, so it creates 4 files.

In [21]:
sc.stop()

## The `.stop()` method terminates a SparkContext. 