# DS/CMPSC 410 Spring 2025
## Instructor: Professor John Yen
## TAs: Jin Peng and Jingxi Zhu

## Lab 4: Data Integration, Change Detection, and Spark-submit in Cluster Mode
## The goals of this lab are for you to be able to
## - Integrate multiple data sources using RDD-based join
## - Analyze temporal changes of Big Data
## - Apply the above to compute hashtag counts of tweets after the Boston Marathon Bombing (April 19 and April 20).
##  This lab includes four data sets, as explained below.
## Data (see "Running Spark in Cluster Mode_Salloc_S25.pdf" for instructions for copying data)
- The first dataset contains Boston Marathon Bombing tweets collected on 4/19/2013.
- The second dataset contains Boston Marathon Bombing tweets collected on 4/20/2013.
- To make this analysis practical in both local mode and cluster mode, a smaller sampled dataset of each day's tweets is also provided: sampled_4_19_BMBtweets.csv and sampled_4_20_BMBtweets.csv. Use these two datasets when running Spark in local mode (in Jupyter Notebook). For spark-submit (cluster mode), change the input to the full dataset for each day: BMB_4_19_tweets.csv and BMB_4_20_tweets.csv.
- You should create a Lab4 subdirectory under work, because more space is available in the work directory. Download both datasets to your directory `<home>/work/Lab4/`.
- You should also download this Jupyter Notebook for Lab 4 to the same directory.
## Problem
- The problem we want to solve is to (1) find the hashtags that appear in the 4/19/2013 tweets, the 4/20/2013 tweets, or both, and (2) calculate, for each of these hashtags, the difference between its total counts on the two days (the 4/20 count minus the 4/19 count). You should save these hashtags together with their count differences in a text file.
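A toy illustration of the computation, with made-up counts in plain Python dictionaries (not taken from the datasets): every hashtag seen on either day is kept, a hashtag missing on one day counts as 0 for that day, and the difference is the April 20 count minus the April 19 count. The helper ``count_difference`` is hypothetical; the lab itself does this with RDD operations.

```python
# Hypothetical toy counts for illustration only (not taken from the lab data).
day1 = {"#a": 5, "#b": 2}   # April 19 counts
day2 = {"#a": 8, "#c": 4}   # April 20 counts

def count_difference(d1, d2):
    """Return {hashtag: day2_count - day1_count}, treating a missing count as 0."""
    all_tags = set(d1) | set(d2)   # union: hashtags seen on either day
    return {tag: d2.get(tag, 0) - d1.get(tag, 0) for tag in all_tags}

diff = count_difference(day1, day2)
print(sorted(diff.items()))  # [('#a', 3), ('#b', -2), ('#c', 4)]
```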
## Two Steps
- First, run the code in Spark local mode (in Jupyter Notebook) on the small sample for each day.
- After you obtain the results for the small datasets, convert the local-mode code into cluster-mode code, submit it to the ICDS cluster, and obtain its run-time performance.
1. Export the Jupyter Notebook as a Lab4.py file and upload it from your local machine to your Lab4 directory on ICDS.
2. Rename the file to Lab4C.py and remove ``master="local",`` from the ``SparkContext`` call.
3. Modify Lab4C.py so that it reads from BMB_4_19_tweets.csv and BMB_4_20_tweets.csv.
4. Modify Lab4C.py so that its outputs are saved in directories different from those used in local mode.
5. Follow the instructions in "Running Spark in Cluster Mode _Salloc_S25" to request resources, load Pyspark, and run pbs-spark-submit.

## Submit the following items for Lab 4
- Completed Jupyter Notebook of Lab 4 (50%)
- - Exercise 1: 5%
- - Exercise 2: 15%
- - Exercise 3: 10%
- - Exercise 4: 10%
- - Exercise 5: 10%
- Lab4C.py (used for spark-submit) (15%)
- Log file for Lab4C.py             (15%)
- A screen shot of "ls -al" in your Lab4 directory (10%)
- The first and last output files of the hashtag count difference generated by running spark-submit (cluster mode) on Lab4C.py (10%)

## Total Points: 100 points

# Due: midnight, February 9th (Sunday).

In [1]:
import pyspark

In [2]:
from pyspark import SparkContext

## Because we are not using DataFrames in this lab, we create a SparkContext rather than a SparkSession.

- Note: We use "local" as the master parameter for ``SparkContext`` in this notebook so that we can run and debug it on the ICDS Jupyter Server. However, you need to remove ``master="local",`` later when you convert this notebook into a .py file for running it in cluster mode.

In [3]:
sc=SparkContext(master="local", appName="Lab4 BMB hastag changes")
sc

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/02/06 18:07:18 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
sc.setLogLevel("WARN")

# Exercise 1 (5 points)  Add your name below 
## Answer for Exercise 1
- Your Name: Edwin Clatus

## Computing the hashtag count difference involves three steps:
- Step 1: Compute (and save) hashtag counts for April 19th tweets. 
- Step 2: Compute (and save) hashtag counts for April 20th tweets. 
- Step 3: Combine the hashtag counts of two days, compute their difference, sort based on the difference. Save the hashtag count difference. 

# Step 1 Compute (and save) hashtag counts for April 19th tweets.

## Complete the path and run the code below to read the file "sampled_4_19_BMBtweets.csv" from your Lab4 directory.

In [5]:
tweets_D1_RDD = sc.textFile("/storage/home/emc6390/work/Lab4/sampled_4_19_BMBtweets.csv")
tweets_D1_RDD

/storage/home/emc6390/work/Lab4/sampled_4_19_BMBtweets.csv MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:0


## Execute the code below, which computes the total count of each hashtag in the input tweets, sorts the hashtags by count (in descending order), and saves them in an output directory:
- (a) Use flatMap to "flatten" the list of tokens from each tweet (produced by the split function) into one very large RDD of tokens.
- (b) Filter the tokens to keep only hashtags (tokens that start with "#").
- (c) Count the total number of occurrences of each hashtag, in a way similar to Lab 2.
- (d) Sort the hashtag counts in descending order.
- (e) Save the sorted hashtag counts in an output directory.
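Steps (a) to (d) can be mirrored in plain Python on a couple of made-up tweet lines (not from the dataset). This is a sketch for building intuition, not the Spark code itself: ``collections.Counter`` stands in for ``map(lambda x: (x, 1))`` followed by ``reduceByKey``, and ``sorted(..., reverse=True)`` stands in for ``sortBy(..., ascending=False)``.

```python
from collections import Counter

# Two made-up tweet lines standing in for lines of the CSV (illustration only).
lines = [
    "RT @user: #prayforboston #boston stay safe",
    "#boston #BostonStrong  #boston",   # note the double space
]

# (a) flatMap: split every line on single spaces and flatten into one token list.
tokens = [tok for line in lines for tok in line.strip().split(" ")]

# (b) filter: keep only tokens that start with '#'.
hashtags = [tok for tok in tokens if tok.startswith("#")]

# (c) map to (hashtag, 1) + reduceByKey(+): Counter performs the same summation.
counts = Counter(hashtags)

# (d) sortBy count, descending.
sorted_counts = sorted(counts.items(), key=lambda pair: pair[1], reverse=True)
print(sorted_counts)
```

Two details carry over to the Spark version: ``split(" ")`` turns consecutive spaces into empty tokens, which the ``startswith("#")`` filter then drops, and hashtags are compared exactly, so case and trailing punctuation matter (which is why ``#BostonStrong`` and ``#bostonstrong``, or ``#prayfortexas.``, show up as separate entries in the outputs). The relative order of hashtags with equal counts after Spark's ``sortBy`` should not be relied on.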

In [6]:
tokens_D1_RDD = tweets_D1_RDD.flatMap(lambda line: line.strip().split(" "))

In [7]:
hashtag_D1_RDD = tokens_D1_RDD.filter(lambda x: x.startswith("#"))

In [8]:
hashtag_1_D1_RDD = hashtag_D1_RDD.map(lambda x: (x, 1))

In [9]:
hashtag_count_D1_RDD = hashtag_1_D1_RDD.reduceByKey(lambda x, y: x+y , 6 )
hashtag_count_D1_RDD.persist()

PythonRDD[6] at RDD at PythonRDD.scala:53

In [10]:
sorted_hashtag_count_D1_RDD = hashtag_count_D1_RDD.sortBy(lambda pair: pair[1] , ascending=False)

### Note: You need to complete the path with your output directory. 
### Note: You also need to change the directory names (e.g., replace "_local.txt" with "_cluster.txt") in your .py file for running Spark in cluster mode.
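Because ``saveAsTextFile`` refuses to write into an output directory that already exists, the local-mode and cluster-mode runs need different output names. One way to derive the cluster-mode path is a small helper like the hypothetical ``to_cluster_path`` below; it is not part of the lab handout, and editing the strings by hand works just as well.

```python
def to_cluster_path(local_path: str) -> str:
    """Hypothetical helper: turn a '..._local.txt' output path into '..._cluster.txt'."""
    if "_local" not in local_path:
        raise ValueError(f"expected '_local' in {local_path!r}")
    # Replace only the last occurrence, so any '_local' earlier in the path is untouched.
    head, _, tail = local_path.rpartition("_local")
    return head + "_cluster" + tail

print(to_cluster_path("/storage/home/emc6390/work/Lab4/sorted_BMB_hashtag_count_4_19_local.txt"))
```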

In [11]:
output_path1 = "/storage/home/emc6390/work/Lab4/sorted_BMB_hashtag_count_4_19_local.txt" 
sorted_hashtag_count_D1_RDD.saveAsTextFile(output_path1)

# Step 2 Compute and save hashtag counts for April 20th tweets.

In [12]:
tweets_D2_RDD = sc.textFile("/storage/home/emc6390/work/Lab4/sampled_4_20_BMBtweets.csv")
tweets_D2_RDD

/storage/home/emc6390/work/Lab4/sampled_4_20_BMBtweets.csv MapPartitionsRDD[17] at textFile at NativeMethodAccessorImpl.java:0

## Exercise 2 (15%)
Complete the code below to compute the total count of each hashtag in the input tweets, sort the hashtags by count (in descending order), and save them in an output directory:
- (a) Use flatMap to "flatten" the list of tokens from each tweet (produced by the split function) into one very large RDD of tokens.
- (b) Filter the tokens to keep only hashtags (tokens that start with "#").
- (c) Count the total number of occurrences of each hashtag, in a way similar to Lab 2.
- (d) Sort the hashtag counts in descending order.
- (e) Save the sorted hashtag counts in an output directory.

In [13]:
tokens_D2_RDD = tweets_D2_RDD.flatMap(lambda line: line.strip().split(" "))

In [14]:
hashtag_D2_RDD = tokens_D2_RDD.filter(lambda x: x.startswith("#"))

In [15]:
hashtag_2_D2_RDD = hashtag_D2_RDD.map(lambda x: (x, 1))

In [16]:
hashtag_count_D2_RDD = hashtag_2_D2_RDD.reduceByKey(lambda x, y: x+y , 6 )
hashtag_count_D2_RDD.persist()

PythonRDD[22] at RDD at PythonRDD.scala:53

In [17]:
sorted_hashtag_count_D2_RDD = hashtag_count_D2_RDD.sortBy(lambda pair: pair[1], ascending=False)

### Note: You need to complete the path with your output directory. 
### Note: You also need to change the output directory names (e.g., replace "_local" with "_cluster") before you convert this notebook into a .py file for submitting it to the ICDS cluster.

In [18]:
output_path2 = "/storage/home/emc6390/work/Lab4/sorted_BMB_hashtag_count_4_20_local.txt" 
sorted_hashtag_count_D2_RDD.saveAsTextFile(output_path2)

# Step 3 Combine the hashtag counts of the two days, compute their difference, and save the sorted hashtag count differences.

# Exercise 3 (10%) 
## Complete the code below to join the two hashtag-count key-value pair RDDs on their keys (i.e., hashtags), and compute the difference of the counts between the two days (count of day 2 - count of day 1).
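``fullOuterJoin`` keeps every hashtag that appears on either day and pairs its two counts as ``(day 1 count, day 2 count)``, filling in ``None`` on the side where the hashtag is absent. The plain-Python sketch below mimics that behavior on dictionaries built from a few counts printed in this notebook; ``full_outer_join`` is a hypothetical helper, not a Spark function.

```python
# A small subset of counts copied from the take() outputs in this notebook.
d1 = {"#Bostonstrong": 53, "#prayforboston": 2243, "#PrayForFlorida": 1}   # April 19
d2 = {"#Bostonstrong": 120, "#prayforboston": 739, "#holytits": 1}         # April 20

def full_outer_join(left, right):
    """Mimic RDD.fullOuterJoin on dicts: keep every key from either side, None where a side lacks it."""
    keys = list(left) + [k for k in right if k not in left]
    return [(k, (left.get(k), right.get(k))) for k in keys]

joined = full_outer_join(d1, d2)
for pair in joined:
    print(pair)
```

By contrast, an inner ``join`` would keep only ``#Bostonstrong`` and ``#prayforboston`` here, silently dropping every hashtag that was used on only one of the two days.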

In [19]:
hashtag_count_D1_RDD.take(3)

[('#Bostonstrong', 53), ('#RT', 45), ('#prayforboston', 2243)]

In [20]:
hashtag_count_D2_RDD.take(3)

[('#gtfo', 3), ('#Manhunt', 320), ('#inners', 15)]

In [21]:
joined_hashtag_count_RDD = hashtag_count_D1_RDD.fullOuterJoin(hashtag_count_D2_RDD)

In [22]:
joined_hashtag_count_RDD.take(5)

[('#Bostonstrong', (53, 120)),
 ('#RT', (45, 97)),
 ('#prayforboston', (2243, 739)),
 ('#boston!', (17, 12)),
 ('#boston', (5873, 3010))]

# As discussed in class, ``fullOuterJoin`` produces ``None`` for a hashtag that does not appear on one of the two days, so we need to convert these missing values into 0 before we calculate the difference of the counts.
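To see why the conversion is needed: Python cannot subtract ``None`` from an integer, so a pair such as ``('#holytits', (None, 1))`` would make the difference computation raise a ``TypeError``. The sketch below reproduces that failure and then applies a hypothetical ``none_to_zero`` helper, which follows the same idea as the notebook's ``tran_none`` function.

```python
# Counts shaped like the joined pair ('#holytits', (None, 1)): (day 1, day 2).
day1_count, day2_count = None, 1

try:
    day2_count - day1_count          # int - None is not defined
    subtraction_failed = False
except TypeError:
    subtraction_failed = True

def none_to_zero(x):
    """Hypothetical helper: treat a missing count (None) as 0, keep anything else."""
    return 0 if x is None else x

diff = none_to_zero(day2_count) - none_to_zero(day1_count)
print(subtraction_failed, diff)  # True 1
```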

In [23]:
none1_RDD = joined_hashtag_count_RDD.filter(lambda pair: pair[1][0] is None)

In [24]:
none1_RDD.take(5)

[('#holytits', (None, 1)),
 ('#allmyprayers', (None, 1)),
 ('#NOCOMMERCIALS', (None, 1)),
 ('#fuckChechnya', (None, 1)),
 ('#ikidyounot', (None, 3))]

In [25]:
none1_RDD.count()

6487

In [26]:
none2_RDD = joined_hashtag_count_RDD.filter(lambda pair: pair[1][1] is None)

In [27]:
none2_RDD.take(5)

[('#PrayForFlorida', (1, None)),
 ('#Campaign', (1, None)),
 ('#candleLightVigil', (1, None)),
 ('#1..', (1, None)),
 ('#prayfortexas.', (2, None))]

In [28]:
none2_RDD.count()

8174

# The following function replaces a missing value (None) with zero.

In [29]:
def tran_none(x):
    # Replace a missing count (None) with 0; return any other value unchanged.
    if x is None:
        return 0
    else:
        return x

# Exercise 4 (10%) 
## Complete the code below to transform missing values of hashtag counts into 0.

In [30]:
tran_joined_hashtag_count_RDD = joined_hashtag_count_RDD.map(lambda x: (x[0], (tran_none(x[1][0]), tran_none(x[1][1]))))

## Check that the result no longer contains any missing values. If you still find missing values in ``tran_joined_hashtag_count_RDD``, double-check your answer to Exercise 4.
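The checks that follow scan the RDD once per position. A single predicate, ``None in pair[1]``, covers both positions at once; the plain-Python sketch below applies it to a few pairs shaped like ``tran_joined_hashtag_count_RDD`` elements (values taken from the outputs above after the Exercise 4 transformation). The helper name ``count_missing`` is hypothetical.

```python
# Pairs shaped like tran_joined_hashtag_count_RDD elements: (hashtag, (day1, day2)).
pairs = [
    ("#Bostonstrong", (53, 120)),
    ("#holytits", (0, 1)),          # was (None, 1) before tran_none
    ("#PrayForFlorida", (1, 0)),    # was (1, None) before tran_none
]

def count_missing(pairs):
    """Count pairs in which either day's count is still None (both positions, one pass)."""
    return sum(1 for _, counts in pairs if None in counts)

print(count_missing(pairs))                 # 0
print(count_missing([("#x", (None, 3))]))   # 1
```

In Spark, the same predicate could be passed to one filter call, for example ``tran_joined_hashtag_count_RDD.filter(lambda pair: None in pair[1]).count()``, which should also return 0.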

In [31]:
none3_RDD = tran_joined_hashtag_count_RDD.filter(lambda pair: pair[1][0] is None)

In [32]:
none3_RDD.count()

0

In [33]:
none4_RDD = tran_joined_hashtag_count_RDD.filter(lambda pair: pair[1][1] is None)

In [34]:
none4_RDD.count()

0

# Exercise 5 (10%)
Complete the code below to calculate the difference of hashtag counts between the two days (i.e., hashtag count of day 2 - hashtag count of day 1).
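A quick arithmetic check of Exercise 5 in plain Python, using the five joined pairs printed by ``joined_hashtag_count_RDD.take(5)`` in this notebook. The list comprehension has the same shape as the ``map`` lambda (day 2 minus day 1), and ``sorted(..., reverse=True)`` plays the role of ``sortBy(..., ascending=False)``. It is a sketch for checking the logic, not a replacement for the RDD code.

```python
# Joined counts copied from the joined_hashtag_count_RDD.take(5) output: (day1, day2).
joined = [
    ("#Bostonstrong", (53, 120)),
    ("#RT", (45, 97)),
    ("#prayforboston", (2243, 739)),
    ("#boston!", (17, 12)),
    ("#boston", (5873, 3010)),
]

# Same shape as the Exercise 5 lambda: (hashtag, day2 - day1).
diffs = [(tag, counts[1] - counts[0]) for tag, counts in joined]

# Equivalent of sortBy(lambda pair: pair[1], ascending=False).
ranked = sorted(diffs, key=lambda pair: pair[1], reverse=True)
print(ranked)
```

The computed differences (67, 52, -1504, -5, -2863) match the ``hashtag_count_diff_RDD.take(5)`` output. A positive value means the hashtag was used more on April 20 than on April 19; a negative value means it was used less.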

In [35]:
hashtag_count_diff_RDD = tran_joined_hashtag_count_RDD.map(lambda x: (x[0],x[1][1]-x[1][0]))

In [36]:
hashtag_count_diff_RDD.take(5)

[('#Bostonstrong', 67),
 ('#RT', 52),
 ('#prayforboston', -1504),
 ('#boston!', -5),
 ('#boston', -2863)]

In [37]:
sorted_hashtag_count_diff_RDD = hashtag_count_diff_RDD.sortBy(lambda pair: pair[1], ascending=False)

In [38]:
sorted_hashtag_count_diff_RDD.take(5)

[('#BostonStrong', 1771),
 ('#bostonstrong', 644),
 ('#USA', 466),
 ('#RedSox', 323),
 ('#tweetfromthebeat', 277)]

In [39]:
output_path3 = "/storage/home/emc6390/work/Lab4/sorted_BMB_hashtag_count_diff_local.txt" 
sorted_hashtag_count_diff_RDD.saveAsTextFile(output_path3)
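Despite the ``.txt`` suffix, ``saveAsTextFile`` creates a directory, not a single file: it holds one ``part-NNNNN`` file per partition (plus a ``_SUCCESS`` marker), and each line is the ``str()`` of one RDD element, such as ``('#BostonStrong', 1771)``. The hypothetical helper ``read_pairs`` below sketches how those lines could be parsed back into tuples with Python's ``ast.literal_eval``; the demo uses a temporary directory that imitates this layout rather than the real output.

```python
import ast
import tempfile
from pathlib import Path

def read_pairs(output_dir):
    """Hypothetical helper: parse every part-* file in a saveAsTextFile directory into tuples."""
    pairs = []
    for part in sorted(Path(output_dir).glob("part-*")):   # skips _SUCCESS
        with open(part, encoding="utf-8") as f:
            for line in f:
                line = line.strip()
                if line:
                    # Each line is str() of a (hashtag, difference) tuple.
                    pairs.append(ast.literal_eval(line))
    return pairs

# Demo on a temporary directory imitating the saveAsTextFile layout.
with tempfile.TemporaryDirectory() as demo_dir:
    Path(demo_dir, "part-00000").write_text("('#BostonStrong', 1771)\n('#bostonstrong', 644)\n", encoding="utf-8")
    Path(demo_dir, "part-00001").write_text("('#boston', -2863)\n", encoding="utf-8")
    Path(demo_dir, "_SUCCESS").write_text("", encoding="utf-8")
    demo_pairs = read_pairs(demo_dir)

print(demo_pairs)
```

Because ``sortBy`` range-partitions its result, the part files should follow the sort order, so the first part file would hold the largest increases and the last part file the largest decreases.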

In [40]:
sc.stop()