# MGTA 466: Analytics Assignment 4 - Word Count on Amazon EMR

---

#### Tasks:

- Work with **BookReviews_1M** dataset to set up the word count exercise as you did in PA2. 
    - This is only to get you started on AWS and you won't need to report anything on this dataset except the run-time of your code.
- Find the top 50 words containing the letter 'z' (lowercase) and their counts based on word count for the **BookReviews_4M** dataset
- Calculate average and standard deviation of execution times over 3 runs for these three settings:
    1. BookReviews_1M - 1 master + 1 worker node 
    2. BookReviews_4M - 1 master + 1 worker node
    3. BookReviews_4M - 1 master + 3 worker nodes
##### **NOTE** - The worker nodes are also called core nodes when initializing them on AWS

#### Submission on Gradescope:
You need to submit the following files under "PA4". Instructions for generating the CSV files are given in their respective sections.
* The current notebook with outputs using **BookReviews_4M.json** dataset - **PA4_Starter.ipynb**
IMPORTANT: Make sure that all expected outputs are present and the output of each cell matches the expected output. **Do not display your outputs under new cells**
* csv file containing the 50 most frequently occurring words containing the letter 'z' and their counts for the **BookReviews_4M** dataset + column header - **50_words.csv** (Instructions given below)
* csv file containing execution times for the below dataset and EMR cluster configuration - **exec_times.csv** (Instructions given below)
    1. BookReviews_1M - 1 master + 1 worker node 
    2. BookReviews_4M - 1 master + 1 worker node
    3. BookReviews_4M - 1 master + 3 worker nodes
* Screenshot of terminated clusters, cluster creation time, cluster elapsed time, and your username.
  
#### IMPORTANT submission guidelines enforced by autograder. Please read carefully:
  * Make sure that all the cells in this notebook are executed before submission
  * Some cells are marked **DO NOT DELETE**. These cells cannot be deleted and the output of these cells will be used for autograding
  * You can add cells or delete other cells (deleting is NOT recommended), but the **Expected Output** for each task MUST be the output of the cell marked as such
  * DO NOT print anything other than the *exact* expected output. Do not include any sentences describing the output. This is strictly enforced by the autograder which checks for an *exact* match of the expected output. For example, if you are expected to print the PySpark version:
      * '10.9.8' - <span style="color:#093">CORRECT</span>
      * 'The PySpark version is 10.9.8' - <span style="color:#FF0000">INCORRECT</span>
  * You can add cells for printing debugging information anywhere, but do not print anything else in **Expected Output** cells other than the expected output for the task
  * For any task that expects `n` rows of a dataframe, **ALWAYS pass `truncate=False` as a parameter to the `DataFrame.show` function**
  * **NOTE - Any points deducted for not following submission guidelines will NOT be considered for regrade requests**

---

Remember: when in doubt, read the documentation first. It's always helpful to search for the class that you're trying to work with, e.g. pyspark.sql.DataFrame.

PySpark API Documentation: https://spark.apache.org/docs/latest/api/python/index.html


### 1. Upload the 1M dataset to S3

To make the datasets available to the EMR cluster, we need to upload the data files to Amazon S3. Follow these steps to do so:

1. In the Amazon console, open the **Services** menu on the top left and select **S3**
2. Create a bucket if you don't have one yet. Use the default settings, but your bucket name must be unique. 
3. Create a folder in your bucket, e.g. `data`, using the default settings. (Don't upload the data file to the root of the bucket; we'll also use this bucket for later assignments, so it's good to keep everything organized.)
4. Enter the folder and upload the **.txt** file. Do NOT upload the zip, as Spark won't know what to do with it. 

---

You can use this dataset now.


This exercise is only meant to show you how to create your own S3 buckets and read data from them. Your actual task is to read data (BookReviews_4M.json) from a different S3 bucket and work on that dataset.

### 2. Setting up the EMR cluster and creating a PySpark notebook

We have already uploaded the 4M reviews data to the S3 bucket `s3://mgta466-w25` under the `data` folder. Follow the steps in the Session 4 Demo to create an EMR cluster, followed by a workspace in which to work on this notebook. 
After launching the workspace (that is attached to your EMR cluster) in JupyterLab:

1. Upload this notebook (**PA4_Starter.ipynb**) to the workspace. **IMPORTANT**: Click on *kernel* on the top right and select **PySpark**.
2. The `BookReviews_4M.json` data is at `s3://mgta466-w25/data/BookReviews_4M.json`. In the following sections, use this URI as the data file path.

### 3. Start Spark Session and Read Data - 2 points

Note that you don't need to manually start the spark session. AWS does it for you in the background, so that the spark session is started as soon as you import pyspark. The spark session is automatically available in the global variable `spark`

Remember that the kernel for running this Notebook is **PySpark** and not Python 3.

#### **Expected Output** - Spark version

In [1]:
# DO NOT DELETE THIS CELL
# YOUR CODE HERE
import pyspark
print(pyspark.__version__)

Starting Spark application


| ID | YARN Application ID | Kind | State | Spark UI | Driver log | User | Current session? |
| :-: | :-: | :-: | :-: | :-: | :-: | :-: | :-: |
| 0 | application_1741757123775_0001 | pyspark | idle | Link | Link | | ✔ |


SparkSession available as 'spark'.


'3.5.4+amzn.0'

In [2]:
# Record the starting time of execution for timing this notebook

import time
start_time = time.time()

In [3]:
#REPLACE WITH YOUR S3 PATH FOR BookReviews_1M.txt WHEN REQUIRED
dataFileName = "s3://mgta466-w25/data/BookReviews_4M.json"

In [4]:
# Read data from S3. The data is present in the S3 path provided above
# YOUR CODE HERE
df = spark.read.json(dataFileName)
print(df)

DataFrame[asin: string, image: array<string>, overall: double, reviewText: string, reviewTime: string, reviewerID: string, reviewerName: string, style: struct<Color::string,Format::string,Package Quantity::string,Style::string>, summary: string, unixReviewTime: bigint, verified: boolean, vote: string]

#### **Expected Output** - Number of rows of the dataframe

In [5]:
# DO NOT DELETE THIS CELL
# YOUR CODE HERE
rowCount = df.count()

# Print the number of rows
print(rowCount)

4027011

### 4. Examine the data - 1 point

Your task: Examine the contents of the dataframe that you've just read from file.
##### Note: For step 2 below keep only the reviewText column in the DF and rename reviewText to value

Expected output: 

1. Schema of the raw dataframe (with all columns)
2. First 25 rows of the dataframe, showing only the full `reviewText` sentences in a column renamed to `value`. Use the `truncate` parameter of `DataFrame.show` to display whole sentences without truncation

**Note**: If you're attempting to run the notebook with "BookReviews_1M.txt", please note that it lacks the "reviewText" column. Consequently, you'll need to make appropriate changes to Step 2 and then display the first 25 rows of the DataFrame.
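
The two inputs load differently: `spark.read.json` parses each record into named columns, while `spark.read.text` returns a single string column that is already named `value`. Below is a minimal sketch of how both cases could be handled, assuming the file extension reliably indicates the format. `read_raw` and `to_value_column` are hypothetical helper names, not part of the starter notebook, and `spark` is the session provided by the PySpark kernel.

```python
def read_raw(spark, path):
    """Read the raw dataset; the format is guessed from the extension (an assumption)."""
    if path.lower().endswith(".json"):
        return spark.read.json(path)   # one column per JSON field
    return spark.read.text(path)       # a single string column named `value`


def to_value_column(df):
    """Return a one-column DataFrame named `value` for either dataset."""
    if "reviewText" in df.columns:
        # JSON reviews: keep only reviewText and rename it to value
        return df.selectExpr("reviewText AS value")
    # Text input: the column is already called value
    return df.select("value")
```

With these helpers, the schema of `read_raw(spark, dataFileName)` would still show all raw columns, and `to_value_column(...)` would produce the frame used for the 25-row display.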

#### **Expected output** - Schema of the raw dataframe (with all columns)

In [6]:
# DO NOT DELETE THIS CELL
# YOUR CODE HERE
df.printSchema()

root
 |-- asin: string (nullable = true)
 |-- image: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- overall: double (nullable = true)
 |-- reviewText: string (nullable = true)
 |-- reviewTime: string (nullable = true)
 |-- reviewerID: string (nullable = true)
 |-- reviewerName: string (nullable = true)
 |-- style: struct (nullable = true)
 |    |-- Color:: string (nullable = true)
 |    |-- Format:: string (nullable = true)
 |    |-- Package Quantity:: string (nullable = true)
 |    |-- Style:: string (nullable = true)
 |-- summary: string (nullable = true)
 |-- unixReviewTime: long (nullable = true)
 |-- verified: boolean (nullable = true)
 |-- vote: string (nullable = true)

#### **Expected output** - First 25 rows of the dataframe showing whole sentence under a column named `value`
**REMINDER**: Always pass `truncate=False` to the `DataFrame.show()` method

In [7]:
# DO NOT DELETE THIS CELL
# YOUR CODE HERE
from pyspark.sql.functions import col

# Assuming your DataFrame is already loaded and is named 'df'
# Rename 'reviewText' column to 'value'
df = df.withColumnRenamed("reviewText", "value")

# Show the first 25 rows of the DataFrame with 'value' column, without truncating text
df.select("value").show(25, truncate=False)

+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|value                                                                                                                                                                                                                                                                                                                            

### 5. Clean the data - 2 points

Your task: Remove all punctuation and convert all characters to lower case.

Expected output: The first 25 rows of a dataframe, with a column containing the cleaned sentences. 

In [8]:
# Do not change this cell. 

# NOTE: Counterintuitively, column objects do NOT store any data; instead they store column expressions (transformations). 
#       The below function takes in a column object, and adds more expressions to it to make a more complex transformation. 
#       Once we have a column object representing the expressions we want, use DataFrame.select(column) to apply the expressions

from pyspark.sql.functions import regexp_replace, trim, col, lower
def removePunctuation(column):
    """Removes punctuation, changes to lower case, and strips leading and trailing spaces."""
    return trim(lower(regexp_replace(column, "[^A-Za-z0-9 ]", ""))).alias("sentence")

In [9]:
# Recommended: take a look at the contents of the column object returned from removePunctuation(). What's in there? 
# No answers or outputs required for this cell. 
print(removePunctuation(df.value))

Column<'trim(lower(regexp_replace(value, [^A-Za-z0-9 ], , 1))) AS sentence'>
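
To see what this column expression does to a single sentence, here is a plain-Python mirror of the same three steps (regex replace, lower case, trim). It is only a local illustration and not the Spark code itself; `remove_punctuation_py` is a hypothetical name and the sample sentence is made up.

```python
import re


def remove_punctuation_py(sentence):
    """Plain-Python mirror of removePunctuation for one string."""
    # Same character class as above: drop everything except letters, digits, spaces
    no_punct = re.sub(r"[^A-Za-z0-9 ]", "", sentence)
    # lower() and then trim() of leading and trailing spaces
    return no_punct.lower().strip(" ")


cleaned = remove_punctuation_py("  My daughter's FIRST book -- she LOVED it!  ")
print(repr(cleaned))
# 'my daughters first book  she loved it'
```

Note the double space left behind where `--` was removed. Splitting on a single space later turns such gaps into empty words, which is why empty rows have to be filtered out.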

#### **Expected output** - The first 25 rows of the cleaned dataframe, with a column containing the **entire** cleaned sentences, under a column named `sentence`

In [10]:
# DO NOT DELETE THIS CELL
# execute the column expressions generated by removePunctuation() to clean the sentences
# After that, use the show() function to print the first 25 rows of the dataframe
# Hint: you'll need the Column object returned by removePunctuation(). 

# YOUR CODE HERE
from pyspark.sql.functions import col

# Applying the removePunctuation function to the 'value' column
cleaned_df = df.withColumn("sentence", removePunctuation(col("value")))

# Showing the first 25 rows of the cleaned DataFrame without truncating text
cleaned_df.select("sentence").show(25, truncate=False)

+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|sentence                                                                                                                                                                                                                                                                                                                                              

### 6. Get dataframe containing unique words that contain the letter 'z' (lowercase) and their counts - 3 points

#### 6.1 Create a dataframe of words - 1 point

#### Tasks:
1. Split each sentence into words based on the delimiter space (' ').
2. Put each word of each sentence into its own row, and store the result in a new dataframe.
3. Print out the first 5 rows of the dataframe.

#### Expected output: 
1. Show first 5 rows of the output dataframe which would be:

| word |
| :-- |
| my |
| daughter |
| got |
| her |
| first |
    
only showing top 5 rows
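
The split and explode steps can be pictured with plain Python lists: splitting gives one list of words per sentence (an array column in Spark), and exploding flattens those lists into one element per row. This is a local sketch with made-up sentences, not the PySpark solution.

```python
sentences = ["my daughter got her first", "book  she loved it"]

# split(" "): one list of words per sentence
word_lists = [s.split(" ") for s in sentences]

# explode: one entry (row) per list element
words = [w for word_list in word_lists for w in word_list]
print(words)
# ['my', 'daughter', 'got', 'her', 'first', 'book', '', 'she', 'loved', 'it']

# The double space in the second sentence produced an empty word
non_empty = [w for w in words if w != ""]
print(len(words), len(non_empty))
# 10 9
```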

#### **Expected output** - First 5 rows of the word dataframe, under a column named `word`.

In [11]:
# DO NOT DELETE THIS CELL
# Assemble the 'split' and 'explode' column expressions, then apply them to the sentence column

from pyspark.sql.functions import split, explode

# YOUR CODE HERE

# Split each sentence into an array of words
words_df = cleaned_df.withColumn("words", split(col("sentence"), " "))

# Explode the words array into separate rows
exploded_df = words_df.withColumn("word", explode(col("words")))

# Select only the 'word' column and remove any rows where the word is empty
final_df = exploded_df.select("word").filter(col("word") != "")

# Show the first 5 rows of the DataFrame
final_df.show(5)



+--------+
|    word|
+--------+
|      my|
|daughter|
|     got|
|     her|
|   first|
+--------+
only showing top 5 rows

#### 6.2 Filter words that contain the letter 'z' and count them - 2 points

#### Tasks:

1. Remove all empty rows in the dataframe. These might have crept in because of the empty lines or words in the file.
2. Filter the dataframe to contain only words that have the letter 'z' in them.
3. Group rows in the previous dataframe by unique words, then count the rows in each group. Put your results into a new dataframe.

#### Expected output:
1. Show the first 25 rows of the dataframe, where each row contains only one word. The dataframe must not contain empty rows. 
2. Show 25 rows of the dataframe containing unique words and their counts

##### The output after removing empty rows and filtering the dataframe would be similar to:


| word |
| :-- |
| jazz |
| jazz |
| jazz |

... 22 more

##### The output after grouping unique words and counting the rows in each group would be similar to:


| word | count |
| :-- | :-- |
| arizona | 1 |
| huzzah | 18 |
| zoo | 15 |

... 22 more


**NOTE** - The above tables are for illustration only. Your output may differ and should contain all 25 rows for each of the tasks
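
As a small local illustration of the three tasks above (drop empty words, keep words containing a lowercase 'z', count each unique word), the same logic in plain Python could look like this. The word list is made up, and `collections.Counter` stands in for the group-and-count step.

```python
from collections import Counter

words = ["jazz", "", "arizona", "book", "jazz", "zoo", "jazz"]

# Tasks 1 and 2: drop empty strings, keep words that contain a lowercase 'z'
z_words = [w for w in words if w != "" and "z" in w]
print(z_words)
# ['jazz', 'arizona', 'jazz', 'zoo', 'jazz']

# Task 3: group identical words and count the members of each group
z_counts = Counter(z_words)
print(dict(z_counts))
# {'jazz': 3, 'arizona': 1, 'zoo': 1}
```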


#### **Expected output** - First 25 rows of the dataframe containing words with the letter 'z' in them, under a column named `word`.

In [12]:
# DO NOT DELETE THIS CELL
# YOUR CODE HERE
from pyspark.sql.functions import col

# Empty words were already removed when final_df was built in cell 11.
# Keep only the words that contain a lowercase 'z'.
words_with_z = final_df.filter(col("word").contains('z'))

# Show the first 25 rows of the DataFrame with words containing 'z'
words_with_z.show(25, truncate=False)




+---------------------------------------------------------+
|word                                                     |
+---------------------------------------------------------+
|amazons                                                  |
|globalized                                               |
|bazaar                                                   |
|frozen                                                   |
|recognize                                                |
|httpsimagesnasslimagesamazoncomimagesi51spdzt7umlss300jpg|
|alzheimers                                               |
|characterization                                         |
|amazed                                                   |
|characterizations                                        |
|arizona                                                  |
|arizona                                                  |
|size                                                     |
|arizona                                

#### **Expected output** - First 25 rows of the dataframe containing words with the letter 'z' in them and their counts, under `word` and `count` columns respectively

In [13]:
# DO NOT DELETE THIS CELL
# YOUR CODE HERE
# Group by 'word' and count each occurrence
word_counts = words_with_z.groupBy("word").count()
# Show the first 25 rows of the DataFrame with counts of unique words
word_counts.show(25, truncate=False)


+-----------------------------------------------------------------------------+-----+
|word                                                                         |count|
+-----------------------------------------------------------------------------+-----+
|bazaar                                                                       |256  |
|digitized                                                                    |127  |
|glamourizing                                                                 |9    |
|scheherzade                                                                  |1    |
|enzoenzo                                                                     |1    |
|lizardsthe                                                                   |3    |
|agonizingly                                                                  |249  |
|breezy                                                                       |1912 |
|fizzlei                                              

### 7. Sort the word count dataframe in descending order by count - 2 points

Your task: Sort the previous dataframe by the `count` column in descending order. Put your results into a new dataframe. 

Expected output: First 25 rows of the sorted word count dataframe. The first row would have the maximum count.
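
When several words share the same count, sorting by count alone may not fix their relative order in a distributed sort. A minimal plain-Python sketch of a deterministic ordering, sorting by count descending and then by word ascending, is shown below. The counts are toy values, and whether the autograder expects any tie-break is not stated in this assignment, so treat this purely as an illustration.

```python
# Toy counts, not taken from the dataset
counts = {"zero": 3, "amazing": 9, "dozen": 3, "amazon": 5}

# Primary key: count (descending); secondary key: word (ascending) to break ties
ranked = sorted(counts.items(), key=lambda kv: (-kv[1], kv[0]))
print(ranked)
# [('amazing', 9), ('amazon', 5), ('dozen', 3), ('zero', 3)]
```

In PySpark the same idea would correspond to passing a second sort column to `orderBy`.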

#### **Expected output** - First 25 rows of the sorted word count dataframe

In [14]:
# DO NOT DELETE THIS CELL
# YOUR CODE HERE
from pyspark.sql.functions import col

# Sort the word counts DataFrame by the 'count' column in descending order
sorted_word_counts = word_counts.orderBy(col("count").desc())

# Show the first 25 rows of the sorted DataFrame
sorted_word_counts.show(25, truncate=False)


+----------------+------+
|word            |count |
+----------------+------+
|amazing         |142528|
|amazon          |45475 |
|realize         |44217 |
|crazy           |29403 |
|realized        |25424 |
|size            |20188 |
|elizabeth       |17398 |
|organized       |13695 |
|recognize       |12812 |
|organization    |12307 |
|realizes        |11807 |
|amazed          |11584 |
|nazi            |10863 |
|koontz          |10793 |
|characterization|9894  |
|citizens        |9392  |
|magazine        |8937  |
|bizarre         |8724  |
|amazingly       |8311  |
|prize           |8288  |
|civilization    |7992  |
|zombie          |7892  |
|zero            |7850  |
|dozen           |7470  |
|cozy            |7207  |
+----------------+------+
only showing top 25 rows

### 8. Record the execution time

Your task: Print the execution time.

#### **Expected output** - The execution time. No particular value is expected.

In [15]:
# Print the time since execution start - You will need this value later.
print(time.time() - start_time)

295.8728334903717

### 9. Save the sorted word counts directly to S3 as a CSV file - 1 point

NOTE: Spark uses a distributed memory system, and stores working data in fragments known as "partitions". This is advantageous when a Spark cluster spans multiple machines, as each machine will only require part of the working data to do its own job. By default, Spark will save each of these data partitions into an individual file to avoid I/O collisions. We want only one output file, so we'll need to fuse all the data into a single partition first. 

Your task: 
1. Coalesce the previous dataframe to one partition. This makes sure that all our results will end up in the same CSV file. 
2. Save the 1-partition dataframe to S3 using the DataFrame.write.csv() method. Store the output in S3 at a location you will remember. The save path should look something like `s3://<your-bucket>/<your-folder>/<your-result-file>.csv`; change these parameters to point to your bucket and folder. Note that Spark treats this path as a folder and writes the data into a `part-...` file inside it.
3. Remember to save the csv file along with the header


### Note:

#### You only need to run Section 9 and Section 10 once, for the 4M dataset.
#### Section 11 requires you to run this Notebook several times; for those runs you can comment out the code in Section 9.

In [16]:
# Save results to S3
single_part_df = sorted_word_counts.coalesce(1)

# Define the path to your S3 bucket and folder
output_path = "s3://mrunown/code/data/sorted_word_counts.csv"

# Save the DataFrame as a single CSV file, including the header
single_part_df.write.csv(output_path, mode="overwrite", header=True)

In [17]:
# Stop Spark session

spark.stop()

### 10. Submission of `50_rows.csv` - Download the CSV file from S3 to your local machine and create the expected CSV output file - 2 points

1. Navigate to the S3 folder where you stored your output
2. Note the name of the output file; it should look something like `part-00000-xx.....xx.csv`. 
3. Click on this file to open its properties.
4. Click on 'Download'.
5. After downloading the file, you can rename it to anything, say `results.csv`. 
6. Submit a CSV containing the first 50 rows of the results file plus the header, i.e. the first 51 lines of the file.
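
One way to produce that file locally is to copy the first 51 lines of the downloaded CSV. The sketch below assumes every record sits on exactly one line, which holds here because the cleaned words contain only letters and digits. `head_csv` and the file names in the usage comment are hypothetical.

```python
from itertools import islice


def head_csv(src_path, dst_path, n_rows=50):
    """Copy the header line plus the first n_rows data lines to a new file."""
    with open(src_path, newline="", encoding="utf-8") as src, \
         open(dst_path, "w", newline="", encoding="utf-8") as dst:
        # n_rows + 1 lines: one header line followed by n_rows data lines
        dst.writelines(islice(src, n_rows + 1))


# Example usage (hypothetical file names):
# head_csv("results.csv", "50_words.csv")
```

Opening the result in a text editor and checking that it has 51 lines, with `word,count` as the first one, is a quick sanity check before submitting.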



### 11. Submission of `exec_times.csv` - Execution times for different datasets and settings - 2 points

You need to experiment with different numbers of master and worker nodes when running this whole Jupyter Notebook, and report the execution time of the Notebook as recorded in Section 8.

1. Create a cluster with the required number of master and worker(core) nodes.
2. Then go to the Kernel tab in JupyterLab, and do 'Restart and run all cells.'
3. You should note the time in the cell just before section 9 - this is the time that it took for all the code to run.
4. Then, start a new cluster with a different configuration of master and worker nodes and dataset as expected. Run the Notebook again, and note the execution times.
5. You must change the dataFileName to read the BookReviews_1M.txt from your S3 bucket. Also note that BookReviews_1M.txt and BookReviews_4M.json have different data formats and must be handled appropriately.
6. Create a csv file `exec_times.csv` and fill it in the following format:

| Dataset | #Master Nodes | #Core Nodes | Runtime_1 | Runtime_2 | Runtime_3 | Mean | Std |
| :-: | :-: | :-: | :-: | :-: | :-: | :-: | :-: |
| 1M | 1 | 1 | | | | | | 
| 4M | 1 | 1 | | | | | |
| 4M | 1 | 3 | | | | | |
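
The mean and standard deviation can be computed with Python's `statistics` module and written out with the `csv` module. This is a minimal sketch under stated assumptions: the runtimes are placeholders rather than measured values, the sample standard deviation (`statistics.stdev`, dividing by n - 1) is used because the assignment does not say which variant is expected, and rounding to two decimals is a choice made here. `summarize` is a hypothetical helper name.

```python
import csv
import statistics


def summarize(dataset, n_master, n_core, runtimes):
    """Build one row of exec_times.csv from three measured runtimes."""
    mean = statistics.mean(runtimes)
    std = statistics.stdev(runtimes)  # sample standard deviation (assumption)
    return [dataset, n_master, n_core, *runtimes, round(mean, 2), round(std, 2)]


header = ["Dataset", "#Master Nodes", "#Core Nodes",
          "Runtime_1", "Runtime_2", "Runtime_3", "Mean", "Std"]

# Placeholder runtimes in seconds; replace them with your own measurements
rows = [summarize("1M", 1, 1, [100.0, 110.0, 120.0])]

with open("exec_times.csv", "w", newline="") as f:
    writer = csv.writer(f)
    writer.writerow(header)
    writer.writerows(rows)

print(rows[0])
# ['1M', 1, 1, 100.0, 110.0, 120.0, 110.0, 10.0]
```

If the population standard deviation is wanted instead, `statistics.pstdev` is the corresponding function.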

### 12. Screenshots of terminated EMR clusters - `cluster_ss.png`

You need to attach a screenshot of your Amazon EMR 'Clusters' page which shows your terminated clusters, cluster creation time, cluster elapsed time, and your username.