HOMEWORK 6 - Movie Rating Analysis using Apache Spark (Pipeline)

Preparing Space

In [31]:
# Install and configure Spark environment
!apt-get update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz
!tar xf spark-3.1.1-bin-hadoop3.2.tgz
!pip install -q findspark

import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop3.2"

!ls

import findspark
findspark.init()

from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
spark

0% [Working]            Hit:1 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease
0% [Waiting for headers] [Waiting for headers] [Connected to r2u.stat.illinois.                                                                               Hit:2 http://archive.ubuntu.com/ubuntu jammy InRelease
Get:3 http://security.ubuntu.com/ubuntu jammy-security InRelease [129 kB]
Get:4 http://archive.ubuntu.com/ubuntu jammy-updates InRelease [128 kB]
Hit:5 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  InRelease
Hit:6 https://r2u.stat.illinois.edu/ubuntu jammy InRelease
Hit:7 http://archive.ubuntu.com/ubuntu jammy-backports InRelease
Hit:8 https://ppa.launchpadcontent.net/deadsnakes/ppa/ubuntu jammy InRelease
Hit:9 https://ppa.launchpadcontent.net/graphics-drivers/ppa/ubuntu jammy InRelease
Hit:10 https://ppa.launchpadcontent.net/ubuntugis/ppa/ubuntu jammy InRelease
Fetched 257 kB in 1s (218 kB/s)
Reading package lists... Done
W: Skipping acquire o

Upload datasets ratings and movies from the 32M dataset

In [32]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("MovieRatingsAnalysis").getOrCreate()

movies = spark.read.csv("movies.csv", header=True, inferSchema=True)
ratings = spark.read.csv("ratings.csv", header=True, inferSchema=True)

start_time_join = time.time()
data = ratings.join(movies, on="movieId")
spark_time_join = time.time() - start_time_join

print(f"Spark join execution time: {spark_time_join:.2f} seconds")

data.show(5)

Spark join execution time: 0.01 seconds
+-------+------+------+---------+--------------------+--------------------+
|movieId|userId|rating|timestamp|               title|              genres|
+-------+------+------+---------+--------------------+--------------------+
|     17|     1|   4.0|944249077|Sense and Sensibi...|       Drama|Romance|
|     25|     1|   1.0|944250228|Leaving Las Vegas...|       Drama|Romance|
|     29|     1|   2.0|943230976|City of Lost Chil...|Adventure|Drama|F...|
|     30|     1|   5.0|944249077|Shanghai Triad (Y...|         Crime|Drama|
|     32|     1|   5.0|943228858|Twelve Monkeys (a...|Mystery|Sci-Fi|Th...|
+-------+------+------+---------+--------------------+--------------------+
only showing top 5 rows



What we would do is calculate average rating and number of ratings for each movie, and then classify it between different categories

In [42]:
import time
from pyspark.sql import SparkSession
from pyspark.sql.functions import avg, count, when

start_time = time.time()

movie_stats = data.groupBy("movieId", "title") \
    .agg(avg("rating").alias("avg_rating"), count("rating").alias("num_ratings"))

movie_stats = movie_stats.withColumn(
    "category",
    when(movie_stats.avg_rating >= 8, "Excellent")
    .when(movie_stats.avg_rating >= 7, "Good")
    .when(movie_stats.avg_rating >= 5, "Regular")
    .otherwise("Bad")
)

category_counts = movie_stats.groupBy("category").agg(count("*").alias("total_movies"))
category_counts.show()

spark_time = time.time() - start_time

movie_stats.show(10)
print(f"Spark execution time: {spark_time:.2f} seconds")

+--------+------------+
|category|total_movies|
+--------+------------+
|     Bad|       82987|
| Regular|        1445|
+--------+------------+

+-------+--------------------+------------------+-----------+--------+
|movieId|               title|        avg_rating|num_ratings|category|
+-------+--------------------+------------------+-----------+--------+
|    442|Demolition Man (1...|3.0941247002398082|      20850|     Bad|
| 183837|       The Favourite|3.8472875509564126|       3189|     Bad|
|   2657|Rocky Horror Pict...| 3.363947776628749|      15472|     Bad|
|   4085|Beverly Hills Cop...|3.5820329590199798|      13714|     Bad|
|  45447|Da Vinci Code, Th...|3.1727983008419933|      13183|     Bad|
|   6548|  Bad Boys II (2003)| 3.150354370570368|       5926|     Bad|
|  38886|Squid and the Wha...|3.6726312201772324|       2934|     Bad|
|    493|Menace II Society...| 3.618550213944123|       3973|     Bad|
|   2076|  Blue Velvet (1986)|3.8593432496139153|      12303|     Bad|
|  

In [34]:
movie_stats.write.mode("overwrite").csv("output/popular_movies_by_rating", header=True)

ONE SINGLE


In [35]:
import pandas as pd
import time

start_time_join = time.time()
ratings_pd = pd.read_csv("ratings.csv")
movies_pd = pd.read_csv("movies.csv")

data_pd = pd.merge(ratings_pd, movies_pd, on="movieId")

pandas_time_join = time.time() - start_time_join

print(f"Pandas join execution time: {pandas_time_join:.2f} seconds")

Pandas join execution time: 33.18 seconds


In [43]:
start_time2 = time.time()

grouped = data_pd.groupby(["movieId", "title"])["rating"].agg(["mean", "count"]).reset_index()
grouped.columns = ["movieId", "title", "avg_rating", "num_ratings"]

def classify(r):
    if r >= 8:
        return "Excellent"
    elif r >= 7:
        return "Good"
    elif r >= 5:
        return "Regular"
    else:
        return "Bad"

grouped["category"] = grouped["avg_rating"].apply(classify)

category_counts_pd = grouped["category"].value_counts().sort_index()
print(category_counts_pd)

pandas_time2 = time.time() - start_time2

print(grouped[["title", "avg_rating", "num_ratings", "category"]].head(10))
print(f"Pandas execution time: {pandas_time2:.2f} seconds")


category
Bad        82987
Regular     1445
Name: count, dtype: int64
                                title  avg_rating  num_ratings category
0                    Toy Story (1995)    3.897438        68997      Bad
1                      Jumanji (1995)    3.275758        28904      Bad
2             Grumpier Old Men (1995)    3.139447        13134      Bad
3            Waiting to Exhale (1995)    2.845331         2806      Bad
4  Father of the Bride Part II (1995)    3.059602        13154      Bad
5                         Heat (1995)    3.868277        29490      Bad
6                      Sabrina (1995)    3.363968        13585      Bad
7                 Tom and Huck (1995)    3.115563         1510      Bad
8                 Sudden Death (1995)    2.987723         4154      Bad
9                    GoldenEye (1995)    3.427850        32474      Bad
Pandas execution time: 9.93 seconds


Comparison

In [44]:
print(f"Spark join execution time: {spark_time_join:.2f} seconds")
print(f"Pandas join execution time: {pandas_time_join:.2f} seconds")
print(f"Spark execution time: {spark_time:.2f} seconds")
print(f"Pandas execution time: {pandas_time2:.2f} seconds")

Spark join execution time: 0.01 seconds
Pandas join execution time: 33.18 seconds
Spark execution time: 62.17 seconds
Pandas execution time: 9.93 seconds
