# **Best and Worst Average Ratings of the Movielens Data Set Using Spark 2.0.0**
Alyssa April Dellow

P125528

# **Organisation of Notebook**

This Notebook consists of four major sections as follows:

1. **Section 1** - Installation of the PySpark package and mounting of Google Drive.
2. **Section 2** - A breakdown of the steps taken to analyse the MovieLens data set according to the checklist provided. In this section, comments are included to explain each line of code for better comprehension.
3. **Section 3** - A final combination of all the code to give an overall view of the process.
4. **Section 4** - A simple explanation of how efficient and optimal Spark transformations and actions are used in this Notebook. An error handling technique utilised is also briefly mentioned.


# **Section 1**

## **PySpark Package**
[Apache Spark](https://spark.apache.org/) is defined as a "multi-language engine for executing data engineering, data science and machine learning on single-node machines or clusters". It is a distributed computing system for big data processing and analytics. [Spark 2.0.0](https://spark.apache.org/releases/spark-release-2-0-0.html) was released on 26 July 2016 with major updates to API usability, SQL 2003 support, performance, structured streaming, R UDF support and operations.

<p align="justify">Before the MovieLens data set can be analysed using Spark 2.0.0 in Google Colab, the PySpark package needs to be installed. PySpark is included in the official Spark releases and is also available from the Python Package Index (PyPI), which is a repository of software for the Python programming language. Installing the PySpark package provides the tools and APIs required to interact with Spark through familiar Python code, which makes our tasks much more convenient. Once PySpark has been imported, Spark's data handling capabilities can be used to complete our MovieLens analysis.

In [None]:
# Install the PySpark package from PyPI. No version is pinned here, so pip
# installs the latest available release.
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [None]:
# Mount Google Drive in Google Colab.
# This enables files saved in Google Drive to be accessed directly from this
# Notebook.
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


# **Section 2**

<p align="justify">This section goes through the code step by step and marks where each point in the checklist is accomplished.

**Import all the necessary modules**

In [None]:
# The entry point to programming Spark with the Dataset and DataFrame API.
from pyspark.sql import SparkSession

# Import the Row class, which builds a row object from named fields.
from pyspark.sql import Row

# col: Returns a Column based on the given column name.
# avg: Aggregate function that returns the average of the values in a group.
# min: Aggregate function that returns the minimum value of the expression in
# a group. Note that this import shadows Python's built-in min() in this
# Notebook.
from pyspark.sql.functions import col, avg, min

**Successfully loaded the MovieLens dataset into Spark**

In [None]:
# Define the parseInput function
def parseInput(line):
  # Separate the input into individual fields based on whitespace.
  fields = line.split()
  # Build a Row object from the movie id, rating and timestamp fields of a
  # u.data line. The columns are named movieID, rating and timestamp.
  # fields[0] (the user id) is not needed for this analysis.
  return Row(movieID = int(fields[1]), rating = float(fields[2]),
             timestamp = float(fields[3]))
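
To make the parsing step concrete, the sketch below applies the same field selection to a single line without starting Spark. It assumes the standard MovieLens 100K layout of u.data (user id, movie id, rating and timestamp separated by whitespace). `RatingRow` and `parse_line` are hypothetical names, with `RatingRow` standing in for `pyspark.sql.Row`, and the sample line is illustrative.

```python
from collections import namedtuple

# Hypothetical stand-in for pyspark.sql.Row so the sketch runs without Spark.
RatingRow = namedtuple("RatingRow", ["movieID", "rating", "timestamp"])


def parse_line(line):
    """Split one u.data line on whitespace and keep fields 1 to 3."""
    fields = line.split()
    # fields[0] is assumed to be the user id, which the analysis does not use.
    return RatingRow(movieID=int(fields[1]),
                     rating=float(fields[2]),
                     timestamp=float(fields[3]))


# Illustrative line: user id, movie id, rating, timestamp.
sample = "196\t242\t3\t881250949"
row = parse_line(sample)
print(row)
```

Because `split()` is called without an argument, the same function would also accept lines separated by spaces rather than tabs.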

In [None]:
# Create a SparkSession using the supplied name "Best&WorstMovieRatings".
# This allows us to communicate with Spark and execute various actions on the
# data using the Spark APIs.
# getOrCreate() will retrieve an existing SparkSession or create a new one
# if it does not exist.
spark = SparkSession.builder.appName("Best&WorstMovieRatings").getOrCreate()

# Read-in the u.item file into Google Colab.
# Select only the movie id and movie title columns, then rename them.
item_df = spark.read.csv("/content/drive/MyDrive/DM_Assignment3/u.item",
                           header=False, sep="|")
item_df = item_df.select(col("_c0").alias("movieID"),
                             col("_c1").alias("movieName"))

# Print the first 10 rows of item_df to the console.
# The argument "truncate = False" ensures that the whole string is printed
# to the console.
item_df.show(n = 10, truncate = False)

# Read the raw data called u.data.
# Reads the text file from the path provided and returns a Resilient
# Distributed Dataset (RDD).
lines = spark.sparkContext.\
        textFile("/content/drive/MyDrive/DM_Assignment3/u.data")

# Convert the data into an RDD of Row objects with movie id, rating and
# timestamp fields.
u_data = lines.map(parseInput)

# Convert the RDD of Row objects into a DataFrame and cache it
data_df = spark.createDataFrame(u_data).cache()

+-------+----------------------------------------------------+
|movieID|movieName                                           |
+-------+----------------------------------------------------+
|1      |Toy Story (1995)                                    |
|2      |GoldenEye (1995)                                    |
|3      |Four Rooms (1995)                                   |
|4      |Get Shorty (1995)                                   |
|5      |Copycat (1995)                                      |
|6      |Shanghai Triad (Yao a yao yao dao waipo qiao) (1995)|
|7      |Twelve Monkeys (1995)                               |
|8      |Babe (1995)                                         |
|9      |Dead Man Walking (1995)                             |
|10     |Richard III (1995)                                  |
+-------+----------------------------------------------------+
only showing top 10 rows



In [None]:
# Print the first 10 rows of data_df to the console
data_df.show(n = 10)

+-------+------+------------+
|movieID|rating|   timestamp|
+-------+------+------------+
|    242|   3.0|8.81250949E8|
|    302|   3.0|8.91717742E8|
|    377|   1.0|8.78887116E8|
|     51|   2.0|8.80606923E8|
|    346|   1.0|8.86397596E8|
|    474|   4.0|8.84182806E8|
|    265|   2.0|8.81171488E8|
|    465|   5.0|8.91628467E8|
|    451|   3.0|8.86324817E8|
|     86|   3.0|8.83603013E8|
+-------+------+------------+
only showing top 10 rows
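
The timestamp column is displayed in scientific notation because it was parsed as a float. Assuming the values are Unix timestamps in seconds, a value can be turned into a readable date with the Python standard library. The helper name `to_utc` is hypothetical and is not part of the Notebook's code.

```python
from datetime import datetime, timezone


def to_utc(timestamp):
    """Convert a Unix timestamp in seconds to a timezone-aware UTC datetime."""
    return datetime.fromtimestamp(timestamp, tz=timezone.utc)


# First timestamp shown in the table above (8.81250949E8).
first_rating_time = to_utc(8.81250949E8)
print(first_rating_time.isoformat())
```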



**Filtered out the movies with 100 or fewer ratings**

In [None]:
# Perform filtering to keep only movies with more than 100 ratings by first
# grouping the data in data_df by their movieID and then counting the
# number of rows for each group.
more_100_ratings = data_df.groupBy("movieID").count().filter("count > 100")

# Print the first 10 rows of more_100_ratings to the console
more_100_ratings.show(n = 10)

# The result is the subset of movies with a substantial number of ratings,
# which can serve as a rough indicator of popularity.

+-------+-----+
|movieID|count|
+-------+-----+
|    474|  194|
|     29|  114|
|     65|  115|
|    191|  276|
|    418|  129|
|    222|  365|
|    293|  147|
|    270|  136|
|    367|  170|
|    705|  137|
+-------+-----+
only showing top 10 rows
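
The group, count and filter logic can be checked on a handful of records in plain Python. This is only a reference sketch: the movie IDs are made up, the function name is hypothetical, and the threshold is lowered to 2 so that the tiny sample shows both outcomes of the strict "more than" comparison.

```python
from collections import Counter


def movies_with_more_than(movie_ids, threshold):
    """Return {movieID: count} for movies rated more than `threshold` times."""
    counts = Counter(movie_ids)
    return {movie: n for movie, n in counts.items() if n > threshold}


# Made-up sample: movie 1 has three ratings, movie 2 has two, movie 3 has one.
sample_ids = [1, 2, 1, 3, 1, 2]
popular = movies_with_more_than(sample_ids, threshold=2)
print(popular)
```

Movie 2 is excluded even though it has exactly 2 ratings, which mirrors how `count > 100` excludes movies with exactly 100 ratings.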



**Included only the oldest timestamp**

**Calculated the average rating for each movie**

In [None]:
# Carries out an inner join between more_100_ratings and data_df based on
# the movieID column. For keys that don't match, the rows are dropped. Hence,
# only movies with more than 100 ratings are in filtered_movies.
filtered_movies = more_100_ratings.join(data_df,
                                           more_100_ratings.\
                                           movieID == data_df.movieID)

# Drop the duplicate movieID column and the count column, which is no longer
# needed after filtering.
filtered_movies = filtered_movies.drop(more_100_ratings.movieID, "count")

# Print the first 10 rows of filtered_movies to the console
filtered_movies.show(n = 10)

+-------+------+------------+
|movieID|rating|   timestamp|
+-------+------+------------+
|    242|   3.0|8.81250949E8|
|    302|   3.0|8.91717742E8|
|    346|   1.0|8.86397596E8|
|    474|   4.0|8.84182806E8|
|    265|   2.0|8.81171488E8|
|    451|   3.0|8.86324817E8|
|     86|   3.0|8.83603013E8|
|    257|   2.0|8.79372434E8|
|    222|   5.0| 8.7604234E8|
|     29|   3.0|8.88104457E8|
+-------+------+------------+
only showing top 10 rows



In [None]:
# Carries out an inner join between item_df and filtered_movies based on
# the movieID column.
filtered_movies = item_df.join(filtered_movies,
                                    item_df.movieID == filtered_movies.\
                                    movieID)

# Drops one of the movieID columns since they are redundant
filtered_movies = filtered_movies.drop(item_df.movieID)

# Print the first 10 rows of filtered_movies to the console
filtered_movies.show(n = 10, truncate = False)

+---------------------------------------------------------------------------+-------+------+------------+
|movieName                                                                  |movieID|rating|timestamp   |
+---------------------------------------------------------------------------+-------+------+------------+
|Kolya (1996)                                                               |242    |3.0   |8.81250949E8|
|L.A. Confidential (1997)                                                   |302    |3.0   |8.91717742E8|
|Jackie Brown (1997)                                                        |346    |1.0   |8.86397596E8|
|Dr. Strangelove or: How I Learned to Stop Worrying and Love the Bomb (1963)|474    |4.0   |8.84182806E8|
|Hunt for Red October, The (1990)                                           |265    |2.0   |8.81171488E8|
|Grease (1978)                                                              |451    |3.0   |8.86324817E8|
|Remains of the Day, The (1993)               

In [None]:
# Group the data in data_df by their movieID and perform an
# aggregation operation on each group. Find the minimum
# value of timestamp within each group, which represents the oldest timestamp.
# Then rename the column as oldest_timestamp.
oldest_timestamp = data_df.groupBy("movieID").\
                   agg(min("timestamp").alias('oldest_timestamp'))

# Print the first 10 rows of oldest_timestamp to the console.
oldest_timestamp.show(n = 10)

+-------+----------------+
|movieID|oldest_timestamp|
+-------+----------------+
|    474|    8.74777623E8|
|     29|    8.74796373E8|
|     26|    8.74957053E8|
|    964|    8.76465335E8|
|     65|    8.74787467E8|
|    191|    8.74730396E8|
|   1224|    8.75638842E8|
|    558|    8.74787526E8|
|   1010|    8.74786784E8|
|    418|    8.74730553E8|
+-------+----------------+
only showing top 10 rows
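
The per-movie minimum, and the effect of joining it back onto the ratings by movie ID, can be illustrated in plain Python. The records and the function name below are made up for illustration. The point of the sketch is that a join keyed on movie ID alone keeps every rating row and simply attaches that movie's oldest timestamp to it.

```python
def attach_oldest_timestamp(ratings):
    """ratings: list of (movieID, rating, timestamp) tuples.

    Returns (movieID, rating, oldest_timestamp) tuples, one per input row.
    """
    oldest = {}
    for movie, _, ts in ratings:
        # Keep the smallest timestamp seen so far for each movie.
        oldest[movie] = min(ts, oldest.get(movie, ts))
    # Equivalent of joining on movieID: every rating row is kept.
    return [(movie, rating, oldest[movie]) for movie, rating, _ in ratings]


sample = [(1, 4.0, 300.0), (1, 2.0, 100.0), (2, 5.0, 200.0)]
joined = attach_oldest_timestamp(sample)
print(joined)
```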



In [None]:
# Carries out an inner join between oldest_timestamp and filtered_movies
# based on the movieID column. Because the join matches on movieID only,
# every rating row is kept and gains its movie's oldest_timestamp value.
filtered_movies = oldest_timestamp.\
                     join(filtered_movies, oldest_timestamp.\
                          movieID == filtered_movies.movieID)

# Drops one of the movieID columns and the per-rating timestamp column, so
# that only oldest_timestamp remains as time information.
filtered_movies = filtered_movies.drop(oldest_timestamp.movieID,
                     filtered_movies.timestamp)

# Print the first 10 rows of filtered_movies to the console.
filtered_movies.show(n = 10, truncate = False)

+----------------+---------------------------------------------------------------------------+-------+------+
|oldest_timestamp|movieName                                                                  |movieID|rating|
+----------------+---------------------------------------------------------------------------+-------+------+
|8.74951617E8    |Kolya (1996)                                                               |242    |3.0   |
|8.75408934E8    |L.A. Confidential (1997)                                                   |302    |3.0   |
|8.83099368E8    |Jackie Brown (1997)                                                        |346    |1.0   |
|8.74777623E8    |Dr. Strangelove or: How I Learned to Stop Worrying and Love the Bomb (1963)|474    |4.0   |
|8.74792483E8    |Hunt for Red October, The (1990)                                           |265    |2.0   |
|8.74787256E8    |Grease (1978)                                                              |451    |3.0   |
|8.7478206

In [None]:
# Group the data in filtered_movies by the movieID, movieName and
# oldest_timestamp columns, then compute the mean of rating within each
# group. Since movieName and oldest_timestamp are constant for a given
# movieID, each group corresponds to one movie and the mean is that movie's
# average rating. The resulting column is named average_rating.
avg_ratings_df = filtered_movies.groupBy(filtered_movies.movieID,
                                            item_df.movieName,
                                            oldest_timestamp.oldest_timestamp).\
                                            agg(avg("rating").\
                                            alias("average_rating"))

# Print the first 10 rows of avg_ratings_df to the console
avg_ratings_df.show(n = 10, truncate = False)

+-------+-------------------------------+----------------+------------------+
|movieID|movieName                      |oldest_timestamp|average_rating    |
+-------+-------------------------------+----------------+------------------+
|68     |Crow, The (1994)               |8.74778479E8    |3.417910447761194 |
|276    |Leaving Las Vegas (1995)       |8.74775262E8    |3.697986577181208 |
|164    |Abyss, The (1989)              |8.74792663E8    |3.589403973509934 |
|31     |Crimson Tide (1995)            |8.74781779E8    |3.6298701298701297|
|288    |Scream (1996)                  |8.74724905E8    |3.4414225941422596|
|255    |My Best Friend's Wedding (1997)|8.7472471E8     |3.36046511627907  |
|705    |Singin' in the Rain (1952)     |8.74785526E8    |3.9927007299270074|
|180    |Apocalypse Now (1979)          |8.74777528E8    |4.04524886877828  |
|628    |Sleepers (1996)                |8.74775185E8    |3.514792899408284 |
|403    |Batman (1989)                  |8.74828826E8    |3.4278

**Sorted the movies based on the average rating (in descending order)**

In [None]:
# Orders avg_ratings_df in descending order based on the average_rating values.
# This allows us to view movies with the highest average ratings first.
avg_ratings_df = avg_ratings_df.orderBy("average_rating", ascending = False)

# Print the first 10 rows of avg_ratings_df to the console
avg_ratings_df.show(n = 10, truncate = False)

+-------+--------------------------------+----------------+------------------+
|movieID|movieName                       |oldest_timestamp|average_rating    |
+-------+--------------------------------+----------------+------------------+
|408    |Close Shave, A (1995)           |8.74784538E8    |4.491071428571429 |
|318    |Schindler's List (1993)         |8.74777948E8    |4.466442953020135 |
|169    |Wrong Trousers, The (1993)      |8.7477789E8     |4.466101694915254 |
|483    |Casablanca (1942)               |8.74786695E8    |4.45679012345679  |
|64     |Shawshank Redemption, The (1994)|8.74777701E8    |4.445229681978798 |
|603    |Rear Window (1954)              |8.74785448E8    |4.3875598086124405|
|12     |Usual Suspects, The (1995)      |8.74777491E8    |4.385767790262173 |
|50     |Star Wars (1977)                |8.7472975E8     |4.3584905660377355|
|178    |12 Angry Men (1957)             |8.74831383E8    |4.344             |
|134    |Citizen Kane (1941)             |8.74777623

**Selected the top 25 movies with the highest average rating**

**Selected the top 25 movies with the lowest average rating**

In [None]:
# Since avg_ratings_df is already in descending order of average_rating, we
# take only its first 25 rows. This gives us the 25 best movies based on
# average ratings.
best_25_average_ratings = avg_ratings_df.limit(25)

# To obtain the 25 movies with the lowest average ratings, sort
# avg_ratings_df in ascending order of average_rating so that the lowest
# averages come first, then limit the result to 25 rows.
worst_25_average_ratings = avg_ratings_df.orderBy("average_rating").limit(25)
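
As a cross-check of the selection logic, the same idea can be sketched in plain Python: pick the N highest and N lowest averages, then order each selection by oldest timestamp as is done for the final display. The records are made up, N is reduced to 2, and the function name is hypothetical.

```python
import heapq


def best_and_worst(movies, n):
    """movies: list of (name, oldest_timestamp, average_rating) tuples.

    Returns the n highest-rated and n lowest-rated movies, each list ordered
    by oldest_timestamp in ascending order.
    """
    best = heapq.nlargest(n, movies, key=lambda m: m[2])
    worst = heapq.nsmallest(n, movies, key=lambda m: m[2])

    def by_time(movie):
        return movie[1]

    return sorted(best, key=by_time), sorted(worst, key=by_time)


sample = [("A", 30.0, 4.5), ("B", 10.0, 2.0), ("C", 20.0, 4.0), ("D", 40.0, 3.0)]
best, worst = best_and_worst(sample, n=2)
print(best)
print(worst)
```

The selection by rating happens first and the ordering by timestamp second, so reordering never changes which movies are in each list.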

In [None]:
# Print the first 25 rows of best_25_average_ratings to the console. This
# represents the top 25 movies with the best average ratings.
best_25_average_ratings.show(n = 25, truncate = False)

+-------+---------------------------------------------------------------------------+----------------+------------------+
|movieID|movieName                                                                  |oldest_timestamp|average_rating    |
+-------+---------------------------------------------------------------------------+----------------+------------------+
|408    |Close Shave, A (1995)                                                      |8.74784538E8    |4.491071428571429 |
|318    |Schindler's List (1993)                                                    |8.74777948E8    |4.466442953020135 |
|169    |Wrong Trousers, The (1993)                                                 |8.7477789E8     |4.466101694915254 |
|483    |Casablanca (1942)                                                          |8.74786695E8    |4.45679012345679  |
|64     |Shawshank Redemption, The (1994)                                           |8.74777701E8    |4.445229681978798 |
|603    |Rear Window (19

In [None]:
# Print the first 25 rows of worst_25_average_ratings to the console. This
# represents the top 25 movies with the worst average ratings.
worst_25_average_ratings.show(n = 25, truncate = False)

+-------+--------------------------------------+----------------+------------------+
|movieID|movieName                             |oldest_timestamp|average_rating    |
+-------+--------------------------------------+----------------+------------------+
|122    |Cable Guy, The (1996)                 |8.7478715E8     |2.339622641509434 |
|243    |Jungle2Jungle (1997)                  |8.74951039E8    |2.4393939393939394|
|325    |Crash (1996)                          |8.74786419E8    |2.546875          |
|260    |Event Horizon (1997)                  |8.74786439E8    |2.574803149606299 |
|358    |Spawn (1997)                          |8.74786419E8    |2.6153846153846154|
|29     |Batman Forever (1995)                 |8.74796373E8    |2.6666666666666665|
|231    |Batman Returns (1992)                 |8.74778424E8    |2.683098591549296 |
|259    |George of the Jungle (1997)           |8.7482796E8     |2.685185185185185 |
|926    |Down Periscope (1996)                 |8.751301E8      |

**Ordered the output results on the console by oldest timestamp**

**Displayed the results on the console**

In [None]:
# The top 25 movies with the best average ratings are then ordered in
# ascending order of oldest_timestamp.
best_avg_rate_tmstmp = best_25_average_ratings.orderBy("oldest_timestamp")

# Rename the average_rating column as best_average_ratings
best_avg_rate_tmstmp = best_avg_rate_tmstmp.\
                          withColumnRenamed("average_rating",
                                            "best_average_ratings")

# The top 25 movies with the worst average ratings are ordered in
# ascending order of oldest_timestamp.
worst_avg_rate_tmstmp = worst_25_average_ratings.orderBy("oldest_timestamp")

# Rename the average_rating column as worst_average_ratings
worst_avg_rate_tmstmp = worst_avg_rate_tmstmp.\
                        withColumnRenamed("average_rating",
                                          "worst_average_ratings")

In [None]:
# Give an indication of what the table below is about
print("Top 25 Movies with the Best Average Ratings Sorted by " +
"Oldest Timestamp")
# Print the first 25 rows of best_avg_rate_tmstmp to the console.
best_avg_rate_tmstmp.show(n = 25, truncate = False)

Top 25 Movies with the Best Average Ratings Sorted by Oldest Timestamp
+-------+---------------------------------------------------------------------------+----------------+--------------------+
|movieID|movieName                                                                  |oldest_timestamp|best_average_ratings|
+-------+---------------------------------------------------------------------------+----------------+--------------------+
|357    |One Flew Over the Cuckoo's Nest (1975)                                     |8.74725485E8    |4.291666666666667   |
|50     |Star Wars (1977)                                                           |8.7472975E8     |4.3584905660377355  |
|172    |Empire Strikes Back, The (1980)                                            |8.74729901E8    |4.204359673024523   |
|174    |Raiders of the Lost Ark (1981)                                             |8.74729995E8    |4.252380952380952   |
|12     |Usual Suspects, The (1995)                          

In [None]:
# Give an indication of what the table below is about
print("Top 25 Movies with the Worst Average Ratings Sorted by " +
"Oldest Timestamp")
# Print the first 25 rows of worst_avg_rate_tmstmp to the console.
worst_avg_rate_tmstmp.show(n = 25, truncate = False)

Top 25 Movies with the Worst Average Ratings Sorted by Oldest Timestamp
+-------+--------------------------------------+----------------+---------------------+
|movieID|movieName                             |oldest_timestamp|worst_average_ratings|
+-------+--------------------------------------+----------------+---------------------+
|38     |Net, The (1995)                       |8.74730553E8    |3.0083333333333333   |
|323    |Dante's Peak (1997)                   |8.74774449E8    |2.933333333333333    |
|235    |Mars Attacks! (1996)                  |8.74774956E8    |2.847926267281106    |
|546    |Broken Arrow (1996)                   |8.74775914E8    |3.031496062992126    |
|53     |Natural Born Killers (1994)           |8.74778274E8    |2.953125             |
|231    |Batman Returns (1992)                 |8.74778424E8    |2.683098591549296    |
|252    |Lost World: Jurassic Park, The (1997) |8.74780832E8    |2.9430379746835444   |
|325    |Crash (1996)                          |

**Saved the output of the results**

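<p align="justify">The code in this cell is commented out because the same write operations are run in Section 3. Spark's default save mode raises an error when the output path already exists, so running the writes twice would fail and would save the same results redundantly.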

In [None]:
# Write best_avg_rate_tmstmp in CSV format to the path specified. Spark
# creates a directory called "best_average_ratings.csv" that holds the
# output part files.

#best_avg_rate_tmstmp.write.\
#csv("/content/drive/MyDrive/DM_Assignment3/best_average_ratings.csv",
#    header=True)

# Write worst_avg_rate_tmstmp in CSV format to the path specified. Spark
# creates a directory called "worst_average_ratings.csv" that holds the
# output part files.

#worst_avg_rate_tmstmp.write.\
#csv("/content/drive/MyDrive/DM_Assignment3/worst_average_ratings.csv",
#    header=True)

**Stop the underlying SparkContext**

In [None]:
# Stop the underlying SparkContext and release the resources associated with it.
# SparkSession, and with it spark.stop(), was introduced in Spark 2.0.0.
spark.stop()
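
One way to make sure the session is stopped even when an earlier step fails is to wrap the work in `try`/`finally`. The sketch below shows the pattern with a hypothetical stand-in object so that it runs without Spark. `StandInSession`, `run_with_cleanup` and `failing_job` are illustrative names, not part of PySpark or of this Notebook.

```python
class StandInSession:
    """Hypothetical stand-in for a SparkSession; it only tracks stop() calls."""

    def __init__(self):
        self.stopped = False

    def stop(self):
        self.stopped = True


def run_with_cleanup(session, job):
    """Run job(session) and always call session.stop() afterwards."""
    try:
        return job(session)
    finally:
        # Runs whether the job succeeded or raised an exception.
        session.stop()


def failing_job(session):
    raise ValueError("simulated failure inside the analysis")


session = StandInSession()
try:
    run_with_cleanup(session, failing_job)
except ValueError as error:
    print("Job failed:", error)
print("Session stopped:", session.stopped)
```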

# **Section 3**

<p align="justify">This section provides an overall view of the code used to obtain the final results: the top 25 best and worst movies based on average ratings (among movies with more than 100 ratings), ordered by oldest timestamp. The printing of data frames is excluded from this section, except for the two final result tables.

Note: (`if __name__ == "__main__":`) is added in this section as an error handling technique that will be explained in Section 4.

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql.functions import col, avg, min

In [None]:
def parseInput(line):
  fields = line.split()
  return Row(movieID = int(fields[1]), rating = float(fields[2]),
             timestamp = float(fields[3]))

In [None]:
if __name__ == "__main__":

  spark = SparkSession.builder.appName("Best&WorstMovieRatings").getOrCreate()

  item_df = spark.read.csv("/content/drive/MyDrive/DM_Assignment3/u.item",
                           header=False, sep="|")
  item_df = item_df.select(col("_c0").alias("movieID"),
                           col("_c1").alias("movieName"))

  lines = spark.sparkContext.\
  textFile("/content/drive/MyDrive/DM_Assignment3/u.data")

  u_data = lines.map(parseInput)

  data_df = spark.createDataFrame(u_data).cache()

  more_100_ratings = data_df.groupBy("movieID").count().filter("count > 100")

  filtered_movies = more_100_ratings.join(data_df, more_100_ratings.\
                                          movieID == data_df.movieID)
  filtered_movies = filtered_movies.drop(more_100_ratings.movieID, "count")

  filtered_movies = item_df.join(filtered_movies, item_df.\
                                 movieID == filtered_movies.movieID)
  filtered_movies = filtered_movies.drop(item_df.movieID)

  oldest_timestamp = data_df.groupBy("movieID").agg(min("timestamp").\
                                                    alias('oldest_timestamp'))

  filtered_movies = oldest_timestamp.join(filtered_movies, oldest_timestamp.\
                                          movieID == filtered_movies.movieID)
  filtered_movies = filtered_movies.drop(oldest_timestamp.movieID,
                                         filtered_movies.timestamp)

  avg_ratings_df = filtered_movies.groupBy(filtered_movies.movieID,
                                           item_df.movieName,
                                           oldest_timestamp.oldest_timestamp).\
                                           agg(avg("rating").\
                                           alias("average_rating"))

  avg_ratings_df = avg_ratings_df.orderBy("average_rating", ascending = False)

  best_25_average_ratings = avg_ratings_df.limit(25)
  worst_25_average_ratings = avg_ratings_df.orderBy("average_rating").limit(25)

  best_avg_rate_tmstmp = best_25_average_ratings.orderBy("oldest_timestamp")
  best_avg_rate_tmstmp = best_avg_rate_tmstmp.\
                          withColumnRenamed("average_rating",
                                            "best_average_ratings")

  worst_avg_rate_tmstmp = worst_25_average_ratings.orderBy("oldest_timestamp")
  worst_avg_rate_tmstmp = worst_avg_rate_tmstmp.\
                        withColumnRenamed("average_rating",
                                          "worst_average_ratings")

  best_avg_rate_tmstmp.write.\
  csv("/content/drive/MyDrive/DM_Assignment3/best_average_ratings.csv",
      header=True)

  worst_avg_rate_tmstmp.write.\
  csv("/content/drive/MyDrive/DM_Assignment3/worst_average_ratings.csv",
      header=True)

  print("Top 25 Movies with the Best Average Ratings Sorted by " +
  "Oldest Timestamp")
  best_avg_rate_tmstmp.show(n = 25, truncate = False)

  print("Top 25 Movies with the Worst Average Ratings Sorted by " +
  "Oldest Timestamp")
  worst_avg_rate_tmstmp.show(n = 25, truncate = False)

  spark.stop()

Top 25 Movies with the Best Average Ratings Sorted by Oldest Timestamp
+-------+---------------------------------------------------------------------------+----------------+--------------------+
|movieID|movieName                                                                  |oldest_timestamp|best_average_ratings|
+-------+---------------------------------------------------------------------------+----------------+--------------------+
|357    |One Flew Over the Cuckoo's Nest (1975)                                     |8.74725485E8    |4.291666666666667   |
|50     |Star Wars (1977)                                                           |8.7472975E8     |4.3584905660377355  |
|172    |Empire Strikes Back, The (1980)                                            |8.74729901E8    |4.204359673024523   |
|174    |Raiders of the Lost Ark (1981)                                             |8.74729995E8    |4.252380952380952   |
|12     |Usual Suspects, The (1995)                          

# **Section 4**




**Used efficient and optimal Spark transformations and actions**

<p align="justify">The efficient Spark transformations and actions listed below are used so that the code processes the data in Spark with improved performance.

1. **RDD of Row objects and Data Frame**: The u.data file is first read into a Resilient Distributed Dataset (RDD), which is a fault-tolerant collection of records partitioned across the cluster. RDDs are compatible with various APIs and libraries, which makes them suitable for heavy data processing tasks. Each line is then mapped to a Row object, so the RDD becomes an RDD of Row objects with named fields. Because the field names travel with each Row, Spark can infer the schema when the RDD is converted into a data frame, and no separate schema definition is needed. Data frame queries pass through Spark's query optimiser, so building a data frame on top of the RDD of Row objects allows more efficient query execution.
2. **Caching**: The *cache* function is applied to data_df because it is reused several times: for counting the ratings per movie, for the join that keeps only the filtered movies and for finding the oldest timestamp. Keeping the data frame in memory prevents needless recomputation each time it is reused, which improves performance.
3. **Column pruning**: The *drop* function removes redundant and irrelevant columns so that they are not processed further. For example, the "count" column is dropped as soon as the filtering on it is done. Here *count* is called on grouped data, so it is a transformation that produces a "count" column rather than an action. Dropping that column once it has served its purpose avoids carrying unneeded data through the later joins.
4. **Filtering**: The *filter* transformation is applied at an early stage, directly to the data frame, which reduces the data to be processed and analysed in the subsequent steps.
5. **Selecting relevant columns**: When reading the u.item data set, the *select* function is used to keep only the columns relevant to our query (movie ID and movie title). This is more efficient than carrying all the available columns through the later joins.
6. **Group by**: The *groupBy* function groups the data by the specified columns so that aggregations such as counts, averages and minimums can be computed per group.
7. **Stopping the underlying SparkContext**: The *spark.stop* function is used to stop the SparkSession and release any resources held by Spark, ensuring appropriate resource deallocation and cleanup. If the SparkSession is not shut down, its cluster resources cannot be assigned to other tasks.
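
The rating count, the average rating and the oldest timestamp are all per-movie aggregates, so in principle they can be accumulated in a single pass over the ratings. The plain-Python sketch below illustrates that idea on made-up records. It is not the approach used in this Notebook, the function name is hypothetical, and whether a corresponding single aggregation would be faster in Spark is an assumption that would need to be measured.

```python
def summarise(ratings):
    """ratings: iterable of (movieID, rating, timestamp) tuples.

    Returns {movieID: (count, average_rating, oldest_timestamp)} using one
    pass over the data.
    """
    totals = {}
    for movie, rating, ts in ratings:
        count, rating_sum, oldest = totals.get(movie, (0, 0.0, ts))
        totals[movie] = (count + 1, rating_sum + rating, min(oldest, ts))
    return {movie: (count, rating_sum / count, oldest)
            for movie, (count, rating_sum, oldest) in totals.items()}


sample = [(1, 4.0, 300.0), (1, 2.0, 100.0), (2, 5.0, 200.0)]
summary = summarise(sample)
print(summary)
```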



**Used appropriate error handling techniques**

The error handling technique utilised in the code is the `if __name__ == "__main__":` guard, and we decide which code goes inside it. Python sets the special variable `__name__` to `"__main__"` when a file is run directly by the interpreter, so the code inside the guard is executed. When the same file is imported as a module, `__name__` is set to the name of the module instead, the condition is false and the guarded code is skipped. This avoids unwanted executions, such as starting the Spark job on import, and regulates how the script behaves depending on whether it is imported as a module or run directly.
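
The behaviour of the guard can be sketched without Spark. In the example below, `run_analysis` is a hypothetical placeholder for the pipeline in Section 3, and `describe_context` is an illustrative helper that makes the comparison on `__name__` explicit.

```python
def run_analysis():
    """Hypothetical placeholder for the Spark pipeline in Section 3."""
    return "analysis finished"


def describe_context(module_name):
    """Report how a file is being used, based on the value of __name__."""
    return "run directly" if module_name == "__main__" else "imported as a module"


if __name__ == "__main__":
    # Reached only when this file is executed directly, not when imported.
    print(describe_context(__name__))
    print(run_analysis())
```

If this file were saved under a name such as `ratings.py` (an assumed file name) and loaded with `import ratings`, the two functions would be defined but nothing would be printed.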
