
# PySpark MovieLens


## 0. Setup (Run this cell)

In [None]:
!pip install -q pyspark

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("PySpark_Student_Exercises_Movies")
    .master("local[*]")
    .getOrCreate()
)

spark

## 1. Download & Load MovieLens Dataset

In [None]:
!wget -q https://files.grouplens.org/datasets/movielens/ml-latest-small.zip
!unzip -q ml-latest-small.zip

ratings = spark.read.csv("ml-latest-small/ratings.csv", header=True, inferSchema=True)
movies  = spark.read.csv("ml-latest-small/movies.csv", header=True, inferSchema=True)

## Task 1 — Data Exploration

**1.1 Inspect the data**

*	Show first 10 rows of ratings
*	Print schema for both DataFrames
*	Count the number of rows in each




**1.2 Column selection**

Write PySpark code to:

* select only userId, movieId, rating
*	rename movieId → film_id
*	cast rating to integer or float explicitly

**1.3 Filtering**

Using ratings:

*	Find all ratings from userId = 1
*	Find all ratings greater than 4.5
*	Find all ratings on movieId in (1, 50, 100) (use isin())

**1.4 Sorting & limiting**

*	Show the top 20 highest ratings
*	Show the 10 lowest ratings made by user 600
*	Sort by rating desc and timestamp asc

**1.5 Derived columns**

Create columns:

*	rating_x2 = rating * 2
*	positive_rating = 1 if rating ≥ 4 else 0
*	log_rating = log10(rating + 1)

**1.6 Missing values**

Even though ml-latest-small has no nulls:

*	Add a fake null column
*	Demonstrate fill/replace/drop operations

**1.7 Distinct & deduplication**

*	Count unique users
*	Count unique movies rated
*	Drop duplicate ratings where (userId, movieId) might repeat (even though dataset is clean)

# Inspection of the data

In [None]:
ratings.show(10)
ratings.printSchema()
movies.printSchema()
print(ratings.count())  # 100836
print(movies.count())  # 9742

+------+-------+------+---------+---------+
|userId|movieId|rating|timestamp|fake_null|
+------+-------+------+---------+---------+
|     1|      1|     4|964982703|     NULL|
|     1|      3|     4|964981247|     NULL|
|     1|      6|     4|964982224|     NULL|
|     1|     47|     5|964983815|     NULL|
|     1|     50|     5|964982931|     NULL|
|     1|     70|     3|964982400|     NULL|
|     1|    101|     5|964980868|     NULL|
|     1|    110|     4|964982176|     NULL|
|     1|    151|     5|964984041|     NULL|
|     1|    157|     5|964984100|     NULL|
+------+-------+------+---------+---------+
only showing top 10 rows
root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: integer (nullable = true)
 |-- timestamp: integer (nullable = true)
 |-- fake_null: void (nullable = true)

root
 |-- movieId: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- genres: string (nullable = true)

100836
9742

# Column selection

In [None]:
ratings.select("userId","movieId","rating").show()

+------+-------+------+
|userId|movieId|rating|
+------+-------+------+
|     1|      1|     4|
|     1|      3|     4|
|     1|      6|     4|
|     1|     47|     5|
|     1|     50|     5|
|     1|     70|     3|
|     1|    101|     5|
|     1|    110|     4|
|     1|    151|     5|
|     1|    157|     5|
|     1|    163|     5|
|     1|    216|     5|
|     1|    223|     3|
|     1|    231|     5|
|     1|    235|     4|
|     1|    260|     5|
|     1|    296|     3|
|     1|    316|     3|
|     1|    333|     5|
|     1|    349|     4|
+------+-------+------+
only showing top 20 rows


In [None]:
ratings.withColumnRenamed("movieId","film_id").show()

+------+-------+------+---------+---------+
|userId|film_id|rating|timestamp|fake_null|
+------+-------+------+---------+---------+
|     1|      1|     4|964982703|     NULL|
|     1|      3|     4|964981247|     NULL|
|     1|      6|     4|964982224|     NULL|
|     1|     47|     5|964983815|     NULL|
|     1|     50|     5|964982931|     NULL|
|     1|     70|     3|964982400|     NULL|
|     1|    101|     5|964980868|     NULL|
|     1|    110|     4|964982176|     NULL|
|     1|    151|     5|964984041|     NULL|
|     1|    157|     5|964984100|     NULL|
|     1|    163|     5|964983650|     NULL|
|     1|    216|     5|964981208|     NULL|
|     1|    223|     3|964980985|     NULL|
|     1|    231|     5|964981179|     NULL|
|     1|    235|     4|964980908|     NULL|
|     1|    260|     5|964981680|     NULL|
|     1|    296|     3|964982967|     NULL|
|     1|    316|     3|964982310|     NULL|
|     1|    333|     5|964981179|     NULL|
|     1|    349|     4|964982563|     NULL|
+------+-------+------+---------+---------+
only showing top 20 rows

In [None]:
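# Cast rating to integer or float explicitly.
# Note: IntegerType truncates half-star ratings (4.5 -> 4, 0.5 -> 0), which is why some
# later outputs show ratings of 0; DoubleType() would keep the original values.
from pyspark.sql.types import IntegerType
ratings = ratings.withColumn("rating", ratings["rating"].cast(IntegerType()))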
ratings.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: integer (nullable = true)
 |-- timestamp: integer (nullable = true)
 |-- fake_null: void (nullable = true)



# Filtering

In [None]:

# Find all ratings from userId = 1
ratings.filter(ratings.userId == 1).show()

# Find all ratings greater than 4.5
# (after the IntegerType cast above, this only keeps ratings of 5)
ratings.filter(ratings.rating > 4.5).show()

# Find all ratings on movieId in (1, 50, 100)
ratings.filter(ratings.movieId.isin([1, 50, 100])).show()

+------+-------+------+---------+---------+
|userId|movieId|rating|timestamp|fake_null|
+------+-------+------+---------+---------+
|     1|      1|     4|964982703|     NULL|
|     1|      3|     4|964981247|     NULL|
|     1|      6|     4|964982224|     NULL|
|     1|     47|     5|964983815|     NULL|
|     1|     50|     5|964982931|     NULL|
|     1|     70|     3|964982400|     NULL|
|     1|    101|     5|964980868|     NULL|
|     1|    110|     4|964982176|     NULL|
|     1|    151|     5|964984041|     NULL|
|     1|    157|     5|964984100|     NULL|
|     1|    163|     5|964983650|     NULL|
|     1|    216|     5|964981208|     NULL|
|     1|    223|     3|964980985|     NULL|
|     1|    231|     5|964981179|     NULL|
|     1|    235|     4|964980908|     NULL|
|     1|    260|     5|964981680|     NULL|
|     1|    296|     3|964982967|     NULL|
|     1|    316|     3|964982310|     NULL|
|     1|    333|     5|964981179|     NULL|
|     1|    349|     4|964982563|     NULL|
+------+-------+------+---------+---------+
only showing top 20 rows

# Sorting & limiting

In [None]:
from pyspark.sql import functions as F
from pyspark.sql.functions import col

# Show the top 20 highest ratings.
# Keep the DataFrame in the variable: .show() only prints and returns None.
top20ratings = ratings.orderBy(ratings.rating.desc()).limit(20)
top20ratings.show()

+------+-------+------+---------+---------+
|userId|movieId|rating|timestamp|fake_null|
+------+-------+------+---------+---------+
|     1|    940|     5|964982176|     NULL|
|     1|   1210|     5|964980499|     NULL|
|     1|    954|     5|964983219|     NULL|
|     1|     50|     5|964982931|     NULL|
|     1|   1023|     5|964982681|     NULL|
|     1|    151|     5|964984041|     NULL|
|     1|   1024|     5|964982876|     NULL|
|     1|    163|     5|964983650|     NULL|
|     1|   1025|     5|964982791|     NULL|
|     1|    231|     5|964981179|     NULL|
|     1|   1029|     5|964982855|     NULL|
|     1|    333|     5|964981179|     NULL|
|     1|   1031|     5|964982653|     NULL|
|     1|    362|     5|964982588|     NULL|
|     1|   1032|     5|964982791|     NULL|
|     1|    527|     5|964984002|     NULL|
|     1|   1049|     5|964982400|     NULL|
|     1|    596|     5|964982838|     NULL|
|     1|    919|     5|964982475|     NULL|
|     1|     47|     5|964983815|     NULL|
+------+-------+------+---------+---------+

In [None]:
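# Show the 10 lowest ratings made by user 600.
# These ratings appear as 0, not null: the IntegerType cast truncated 0.5 ratings to 0.
# When several ratings tie, which ones are returned is not guaranteed; add a tie-breaker
# (e.g. timestamp) to orderBy for a stable result.
ratings.filter(ratings.userId == 600).orderBy(ratings.rating.asc()).limit(10).show()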

+------+-------+------+----------+---------+
|userId|movieId|rating| timestamp|fake_null|
+------+-------+------+----------+---------+
|   600|    762|     0|1237707803|     NULL|
|   600|   8644|     0|1237761053|     NULL|
|   600|   1186|     0|1237742590|     NULL|
|   600|    342|     0|1237713564|     NULL|
|   600|   1676|     0|1237850849|     NULL|
|   600|   5879|     0|1237709164|     NULL|
|   600|   5945|     0|1237709051|     NULL|
|   600|    317|     0|1237759435|     NULL|
|   600|   7255|     0|1237761339|     NULL|
|   600|   4744|     0|1237714406|     NULL|
+------+-------+------+----------+---------+



In [None]:
# Sort by rating desc and timestamp asc
ratings.orderBy(ratings.rating.desc(),ratings.timestamp.asc()).show()

+------+-------+------+---------+---------+
|userId|movieId|rating|timestamp|fake_null|
+------+-------+------+---------+---------+
|   429|    150|     5|828124615|     NULL|
|   429|    161|     5|828124615|     NULL|
|   429|    588|     5|828124615|     NULL|
|   429|    590|     5|828124615|     NULL|
|   429|    592|     5|828124615|     NULL|
|   429|    595|     5|828124615|     NULL|
|   429|    168|     5|828124616|     NULL|
|   429|    185|     5|828124616|     NULL|
|   429|    207|     5|828124616|     NULL|
|   429|    252|     5|828124616|     NULL|
|   429|    261|     5|828124616|     NULL|
|   429|    270|     5|828124616|     NULL|
|   429|    289|     5|828124616|     NULL|
|   429|    292|     5|828124616|     NULL|
|   429|    317|     5|828124616|     NULL|
|   429|    339|     5|828124616|     NULL|
|   429|    380|     5|828124616|     NULL|
|   107|      2|     5|829322340|     NULL|
|   107|     11|     5|829322340|     NULL|
|   107|     62|     5|829322340|     NULL|
+------+-------+------+---------+---------+
only showing top 20 rows

# Derived columns

In [None]:
# rating_x2 = rating * 2
ratings.withColumn("rating_x2",ratings.rating*2).show()
# positive_rating = 1 if rating ≥ 4 else 0
ratings.withColumn("positive_rating",F.when(ratings.rating>=4,1).otherwise(0)).show()
#log_rating = log10(rating + 1)
ratings.withColumn("log_rating",F.log10(ratings.rating+1)).show()


+------+-------+------+---------+---------+---------+
|userId|movieId|rating|timestamp|fake_null|rating_x2|
+------+-------+------+---------+---------+---------+
|     1|      1|     4|964982703|     NULL|        8|
|     1|      3|     4|964981247|     NULL|        8|
|     1|      6|     4|964982224|     NULL|        8|
|     1|     47|     5|964983815|     NULL|       10|
|     1|     50|     5|964982931|     NULL|       10|
|     1|     70|     3|964982400|     NULL|        6|
|     1|    101|     5|964980868|     NULL|       10|
|     1|    110|     4|964982176|     NULL|        8|
|     1|    151|     5|964984041|     NULL|       10|
|     1|    157|     5|964984100|     NULL|       10|
|     1|    163|     5|964983650|     NULL|       10|
|     1|    216|     5|964981208|     NULL|       10|
|     1|    223|     3|964980985|     NULL|        6|
|     1|    231|     5|964981179|     NULL|       10|
|     1|    235|     4|964980908|     NULL|        8|
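|     1|    260|     5|964981680|     NULL|       10|
|     1|    296|     3|964982967|     NULL|        6|
|     1|    316|     3|964982310|     NULL|        6|
|     1|    333|     5|964981179|     NULL|       10|
|     1|    349|     4|964982563|     NULL|        8|
+------+-------+------+---------+---------+---------+
only showing top 20 rows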

# Missing values

In [None]:
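# Even though ml-latest-small has no nulls, add a fake null column.
# Cast it to int: an untyped F.lit(None) gives a `void` column (see the schemas above),
# and na.fill(0) only fills numeric columns, so it would leave that column untouched.
ratings = ratings.withColumn("fake_null", F.lit(None).cast("int"))
ratings.show()

# Demonstrate fill/replace/drop operations
# fill: replace the nulls in fake_null with 0
ratings.na.fill(0, subset=["fake_null"]).show()
# replace: swap a specific value (here the 0 placeholder) for another one
ratings.na.fill(0, subset=["fake_null"]).na.replace(0, -1, subset=["fake_null"]).show()
# drop: every row has a null in fake_null, so this returns an empty DataFrame
ratings.na.drop().show()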

+------+-------+------+---------+---------+
|userId|movieId|rating|timestamp|fake_null|
+------+-------+------+---------+---------+
|     1|      1|     4|964982703|     NULL|
|     1|      3|     4|964981247|     NULL|
|     1|      6|     4|964982224|     NULL|
|     1|     47|     5|964983815|     NULL|
|     1|     50|     5|964982931|     NULL|
|     1|     70|     3|964982400|     NULL|
|     1|    101|     5|964980868|     NULL|
|     1|    110|     4|964982176|     NULL|
|     1|    151|     5|964984041|     NULL|
|     1|    157|     5|964984100|     NULL|
|     1|    163|     5|964983650|     NULL|
|     1|    216|     5|964981208|     NULL|
|     1|    223|     3|964980985|     NULL|
|     1|    231|     5|964981179|     NULL|
|     1|    235|     4|964980908|     NULL|
|     1|    260|     5|964981680|     NULL|
|     1|    296|     3|964982967|     NULL|
|     1|    316|     3|964982310|     NULL|
|     1|    333|     5|964981179|     NULL|
|     1|    349|     4|964982563|     NULL|
+------+-------+------+---------+---------+
only showing top 20 rows

# Distinct & deduplication

In [None]:
# Count unique users
ratings.select("userId").distinct().count()

610

In [None]:
# Count unique movies rated
ratings.select("movieId").distinct().count()

9724

In [None]:
# Drop duplicate ratings where (userId, movieId) might repeat (even though dataset is clean)
ratings.dropDuplicates(["userId","movieId"]).show()

+------+-------+------+----------+---------+
|userId|movieId|rating| timestamp|fake_null|
+------+-------+------+----------+---------+
|     1|   1208|     4| 964983250|     NULL|
|     1|   1348|     4| 964983393|     NULL|
|     4|    457|     5| 945079259|     NULL|
|     4|    599|     2| 945078587|     NULL|
|     6|    274|     4| 845555151|     NULL|
|     6|    327|     1| 845554062|     NULL|
|     6|    520|     3| 845553844|     NULL|
|     7|   1101|     4|1106635502|     NULL|
|     8|    235|     3| 839464076|     NULL|
|     9|   5481|     5|1044656836|     NULL|
|    10| 109853|     4|1455357656|     NULL|
|    18|   1721|     4|1455748443|     NULL|
|    18|   1892|     4|1458679104|     NULL|
|    18|   3896|     4|1504894830|     NULL|
|    18|   4963|     4|1455209694|     NULL|
|    19|   1259|     3| 965705725|     NULL|
|    19|   2548|     2| 965705383|     NULL|
|    19|   2762|     4| 965710565|     NULL|
|    19|   3704|     2| 965703756|     NULL|
|    21| 1

## Task 2 — Aggregations & GroupBy

**2.1 Simple aggregations**

*	Compute min, max, avg rating
*	Count total number of ratings
*	Count ratings per movieId

**2.2 GroupBy**

Compute:

*	average rating per movie
*	number of ratings per movie
*	average rating per user
*	number of ratings per user

**2.3 Top items**

*	Top 20 most-rated movies
*	Top 20 best-rated movies (min 50 ratings)

→ You must filter with a join or window

**2.4 Window functions**

Using Window partitioned by movieId:

*	rank users by timestamp (earliest → latest rating)
*	create a lag column: previous rating by same user
*	compute average rating per movie with window


In [None]:
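# Write your solution here
from pyspark.sql import functions as F
# Import only avg and count by name; min/max go through the F alias so the
# Python built-ins min() and max() are not shadowed
from pyspark.sql.functions import avg, count

df = ratings  # Use the existing ratings DataFrame

# Min, max and average rating, plus the total number of ratings
df.agg(
    F.min("rating").alias("min_rating"),
    F.max("rating").alias("max_rating"),
    avg("rating").alias("avg_rating"),
    count("rating").alias("total_ratings")
).show()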

# Count ratings per movie
df.groupBy("movieId").count().show()

+----------+----------+-----------------+-------------+
|min_rating|max_rating|       avg_rating|total_ratings|
+----------+----------+-----------------+-------------+
|         0|         5|3.350827085564679|       100836|
+----------+----------+-----------------+-------------+

+-------+-----+
|movieId|count|
+-------+-----+
|   1580|  165|
|   2366|   25|
|   3175|   75|
|   1088|   42|
|  32460|    4|
|  44022|   23|
|  96488|    4|
|   1238|    9|
|   1342|   11|
|   1591|   26|
|   1645|   51|
|   4519|    9|
|   2142|   10|
|    471|   40|
|   3997|   12|
|    833|    6|
|   3918|    9|
|   7982|    4|
|   1959|   15|
|  68135|   10|
+-------+-----+
only showing top 20 rows


In [None]:
from pyspark.sql.functions import avg

df.groupBy("movieId").agg(
    avg("rating").alias("avg_rating"),
    count("rating").alias("num_ratings")
).show()

+-------+------------------+-----------+
|movieId|        avg_rating|num_ratings|
+-------+------------------+-----------+
|   1580|3.3393939393939394|        165|
|   2366|              3.56|         25|
|   3175| 3.466666666666667|         75|
|   1088|3.1666666666666665|         42|
|  32460|               4.0|          4|
|  44022|2.9565217391304346|         23|
|  96488|               4.0|          4|
|   1238|               4.0|          9|
|   1342|2.3636363636363638|         11|
|   1591|               2.5|         26|
|   1645|3.2745098039215685|         51|
|   4519|3.2222222222222223|          9|
|   2142|               2.6|         10|
|    471|              3.45|         40|
|   3997|1.6666666666666667|         12|
|    833|1.8333333333333333|          6|
|   3918|3.2222222222222223|          9|
|   7982|              2.75|          4|
|   1959| 3.533333333333333|         15|
|  68135|               3.2|         10|
+-------+------------------+-----------+
only showing top 20 rows

In [None]:
df.groupBy("userId").agg(
    avg("rating").alias("avg_rating_user"),
    count("rating").alias("num_ratings_user")
).show()

+------+------------------+----------------+
|userId|   avg_rating_user|num_ratings_user|
+------+------------------+----------------+
|   148|3.6041666666666665|              48|
|   463| 3.606060606060606|              33|
|   471| 3.642857142857143|              28|
|   496|3.2413793103448274|              29|
|   243| 4.138888888888889|              36|
|   392|               3.2|              25|
|   540|3.8095238095238093|              42|
|    31|              3.92|              50|
|   516|3.6923076923076925|              26|
|    85|3.7058823529411766|              34|
|   137| 3.801418439716312|             141|
|   251| 4.826086956521739|              23|
|   451|3.7941176470588234|              34|
|   580|  3.31651376146789|             436|
|    65| 3.764705882352941|              34|
|   458|4.1525423728813555|              59|
|    53|               5.0|              20|
|   255|2.5681818181818183|              44|
|   481| 2.806451612903226|              31|
|   588|  

In [None]:
df.groupBy("movieId").count().orderBy("count", ascending=False).show(20)

+-------+-----+
|movieId|count|
+-------+-----+
|    356|  329|
|    318|  317|
|    296|  307|
|    593|  279|
|   2571|  278|
|    260|  251|
|    480|  238|
|    110|  237|
|    589|  224|
|    527|  220|
|   2959|  218|
|      1|  215|
|   1196|  211|
|     50|  204|
|   2858|  204|
|     47|  203|
|    780|  202|
|    150|  201|
|   1198|  200|
|   4993|  198|
+-------+-----+
only showing top 20 rows


In [None]:
from pyspark.sql.functions import col

top_movies = (
    df.groupBy("movieId")
      .agg(avg("rating").alias("avg_rating"), count("rating").alias("num_ratings"))
      .filter(col("num_ratings") >= 50)
      .orderBy(col("avg_rating").desc())
)

top_movies.show(20)

+-------+------------------+-----------+
|movieId|        avg_rating|num_ratings|
+-------+------------------+-----------+
|    318|4.3280757097791795|        317|
|    858| 4.182291666666667|        192|
|   1221| 4.147286821705427|        129|
|   1276| 4.140350877192983|         57|
|    750| 4.134020618556701|         97|
|    260| 4.131474103585657|        251|
|    912|              4.12|        100|
|    904| 4.119047619047619|         84|
|   1208|  4.11214953271028|        107|
|   2959| 4.105504587155964|        218|
|   1196| 4.104265402843602|        211|
|   1213| 4.103174603174603|        126|
|     50| 4.102941176470588|        204|
|    527|               4.1|        220|
|    296|  4.09771986970684|        307|
|   1198|             4.095|        200|
|   1089| 4.091603053435114|        131|
|   1197| 4.091549295774648|        142|
|    908| 4.087719298245614|         57|
|   1193| 4.075187969924812|        133|
+-------+------------------+-----------+
only showing top 20 rows

In [None]:
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, lag, mean

# Define a window partitioned by movieId and ordered by timestamp
window_movie = Window.partitionBy("movieId").orderBy("timestamp")

# Rank each movie's ratings from earliest to latest
# (row_number gives distinct numbers even when timestamps tie; rank() would let ties share a rank)
df = df.withColumn("rank_by_timestamp", row_number().over(window_movie))

# Lag: previous rating for the same movie
df = df.withColumn("prev_rating", lag("rating").over(window_movie))

# Per-movie average computed with a window (optional; a groupBy is often enough)
df = df.withColumn("avg_rating_movie_window", mean("rating").over(Window.partitionBy("movieId")))

df.show(5)

+------+-------+------+---------+---------+-----------------+-----------+-----------------------+
|userId|movieId|rating|timestamp|fake_null|rank_by_timestamp|prev_rating|avg_rating_movie_window|
+------+-------+------+---------+---------+-----------------+-----------+-----------------------+
|   107|      1|     4|829322340|     NULL|                1|       NULL|      3.813953488372093|
|   191|      1|     4|829759809|     NULL|                2|          4|      3.813953488372093|
|    54|      1|     3|830247330|     NULL|                3|          4|      3.813953488372093|
|   468|      1|     4|831400444|     NULL|                4|          3|      3.813953488372093|
|   353|      1|     5|831939685|     NULL|                5|          4|      3.813953488372093|
+------+-------+------+---------+---------+-----------------+-----------+-----------------------+
only showing top 5 rows


## Task 3 — Spark SQL

In [None]:
ratings.createOrReplaceTempView("ratings_table")
movies.createOrReplaceTempView("movies_table")

**3.1 Simple SQL Queries**

*	Show first 20 rows
*	Count total ratings
*	Count distinct users
*	Average rating overall

**3.2 SQL Grouping**
*	Average rating by movie
*	Ratings count by movie
*	Best movies with at least 100 ratings
*	Worst movies with at least 100 ratings
*	Most active users

**3.3 SQL CASE WHEN**

Create rating buckets:

*	≥ 4.0 → “high”
*	3.0–3.9 → “medium”
*	< 3.0 → “low”

**3.4 SQL Window Functions**

*	Top 10 movies by rating for each genre
*	Earliest rating per user (row_number)
*	Rolling average rating per movie

In [None]:
print("First 20 rows from ratings_table:")
spark.sql("SELECT * FROM ratings_table LIMIT 20").show()

print("\nTotal number of ratings:")
spark.sql("SELECT COUNT(*) FROM ratings_table").show()

print("\nNumber of distinct users:")
spark.sql("SELECT COUNT(DISTINCT userId) FROM ratings_table").show()

print("\nOverall average rating:")
spark.sql("SELECT AVG(rating) FROM ratings_table").show()

First 20 rows from ratings_table:
+------+-------+------+---------+---------+
|userId|movieId|rating|timestamp|fake_null|
+------+-------+------+---------+---------+
|     1|      1|     4|964982703|     NULL|
|     1|      3|     4|964981247|     NULL|
|     1|      6|     4|964982224|     NULL|
|     1|     47|     5|964983815|     NULL|
|     1|     50|     5|964982931|     NULL|
|     1|     70|     3|964982400|     NULL|
|     1|    101|     5|964980868|     NULL|
|     1|    110|     4|964982176|     NULL|
|     1|    151|     5|964984041|     NULL|
|     1|    157|     5|964984100|     NULL|
|     1|    163|     5|964983650|     NULL|
|     1|    216|     5|964981208|     NULL|
|     1|    223|     3|964980985|     NULL|
|     1|    231|     5|964981179|     NULL|
|     1|    235|     4|964980908|     NULL|
|     1|    260|     5|964981680|     NULL|
|     1|    296|     3|964982967|     NULL|
|     1|    316|     3|964982310|     NULL|
|     1|    333|     5|964981179|     NULL|
|     1|    349|     4|964982563|     NULL|
+------+-------+------+---------+---------+

In [None]:
print("Top 20 best movies (with at least 100 ratings):")
spark.sql("""
    SELECT m.title, AVG(r.rating) AS avg_rating, COUNT(r.rating) AS num_ratings
    FROM ratings_table r
    JOIN movies_table m ON r.movieId = m.movieId
    GROUP BY m.movieId, m.title
    HAVING COUNT(r.rating) >= 100
    ORDER BY avg_rating DESC
    LIMIT 20
""").show()

Top 20 best movies (with at least 100 ratings):
+--------------------+------------------+-----------+
|               title|        avg_rating|num_ratings|
+--------------------+------------------+-----------+
|Shawshank Redempt...|4.3280757097791795|        317|
|Godfather, The (1...| 4.182291666666667|        192|
|Godfather: Part I...| 4.147286821705427|        129|
|Star Wars: Episod...| 4.131474103585657|        251|
|   Casablanca (1942)|              4.12|        100|
|Apocalypse Now (1...|  4.11214953271028|        107|
|   Fight Club (1999)| 4.105504587155964|        218|
|Star Wars: Episod...| 4.104265402843602|        211|
|   Goodfellas (1990)| 4.103174603174603|        126|
|Usual Suspects, T...| 4.102941176470588|        204|
|Schindler's List ...|               4.1|        220|
| Pulp Fiction (1994)|  4.09771986970684|        307|
|Raiders of the Lo...|             4.095|        200|
|Reservoir Dogs (1...| 4.091603053435114|        131|
|Princess Bride, T...| 4.091549295

In [None]:
print("Top 20 worst movies (with at least 100 ratings):")
spark.sql("""
    SELECT m.title, AVG(r.rating) AS avg_rating, COUNT(r.rating) AS num_ratings
    FROM ratings_table r
    JOIN movies_table m ON r.movieId = m.movieId
    GROUP BY m.movieId, m.title
    HAVING COUNT(r.rating) >= 100
    ORDER BY avg_rating ASC
    LIMIT 20
""").show()

Top 20 worst movies (with at least 100 ratings):
+--------------------+------------------+-----------+
|               title|        avg_rating|num_ratings|
+--------------------+------------------+-----------+
|   Waterworld (1995)|2.8260869565217392|        115|
|Batman Forever (1...| 2.854014598540146|        137|
|   Home Alone (1990)|2.9051724137931036|        116|
|Ace Ventura: Pet ...|2.9316770186335406|        161|
|Dumb & Dumber (Du...|2.9323308270676693|        133|
|Star Wars: Episod...|2.9714285714285715|        140|
|     Net, The (1995)|2.9732142857142856|        112|
|  Cliffhanger (1993)|  2.99009900990099|        101|
|Austin Powers: Th...|3.0330578512396693|        121|
|    Mask, The (1994)|3.0955414012738856|        157|
| American Pie (1999)| 3.203883495145631|        103|
|     Clueless (1995)|3.2211538461538463|        104|
|      Twister (1996)|3.2439024390243905|        123|
|Mrs. Doubtfire (1...|3.2708333333333335|        144|
|      Titanic (1997)|3.271428571

## Task 4 — Joins & Genre Analytics

**4.1 Basic join**

Join ratings → movies on movieId.

*	Show 20 joined rows
*	Show userId, title, rating
*	Count how many ratings each genre has

**4.2 Parse genres**

genres looks like "Action|Adventure|Sci-Fi"

*	split genres into an array
*	explode the genres
*	count ratings per genre
*	compute average rating per genre

**4.3 Join + Aggregation**

Compute:

*	average rating per genre
*	average rating per movie title
*	number of ratings per movie
*	number of ratings per genre per year (bonus: extract year from title)

**4.4 Left Anti Join**

Find:

*	movies in movies.csv with no ratings
*	number of such movies

In [None]:
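# exploded_genres_df is not defined earlier in this section; assumed reconstruction: one row per (rating, genre)
exploded_genres_df = ratings.join(movies, on="movieId").withColumn("genre", F.explode(F.split("genres", r"\|")))
exploded_genres_df.createOrReplaceTempView("exploded_genres_table")
print("Temporary view 'exploded_genres_table' created.")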

Temporary view 'exploded_genres_table' created.


In [None]:
print("Ratings with category buckets:")
spark.sql("""
    SELECT
        *,
        CASE
            WHEN rating >= 4.0 THEN 'high'
            WHEN rating >= 3.0 AND rating < 4.0 THEN 'medium'
            ELSE 'low'
        END AS rating_bucket
    FROM ratings_table
    LIMIT 20
""").show()

Ratings with category buckets:
+------+-------+------+---------+---------+-------------+
|userId|movieId|rating|timestamp|fake_null|rating_bucket|
+------+-------+------+---------+---------+-------------+
|     1|      1|     4|964982703|     NULL|         high|
|     1|      3|     4|964981247|     NULL|         high|
|     1|      6|     4|964982224|     NULL|         high|
|     1|     47|     5|964983815|     NULL|         high|
|     1|     50|     5|964982931|     NULL|         high|
|     1|     70|     3|964982400|     NULL|       medium|
|     1|    101|     5|964980868|     NULL|         high|
|     1|    110|     4|964982176|     NULL|         high|
|     1|    151|     5|964984041|     NULL|         high|
|     1|    157|     5|964984100|     NULL|         high|
|     1|    163|     5|964983650|     NULL|         high|
|     1|    216|     5|964981208|     NULL|         high|
|     1|    223|     3|964980985|     NULL|       medium|
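|     1|    231|     5|964981179|     NULL|         high|
|     1|    235|     4|964980908|     NULL|         high|
|     1|    260|     5|964981680|     NULL|         high|
|     1|    296|     3|964982967|     NULL|       medium|
|     1|    316|     3|964982310|     NULL|       medium|
|     1|    333|     5|964981179|     NULL|         high|
|     1|    349|     4|964982563|     NULL|         high|
+------+-------+------+---------+---------+-------------+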

In [None]:
print("Top 10 movies by rating within each genre:")
spark.sql("""
    WITH MovieGenreAvgRating AS (
        SELECT
            genre,
            title,
            movieId,
            AVG(rating) AS avg_rating
        FROM exploded_genres_table
        GROUP BY genre, title, movieId
    ),
    RankedMovies AS (
        SELECT
            genre,
            title,
            avg_rating,
            ROW_NUMBER() OVER (PARTITION BY genre ORDER BY avg_rating DESC) as rn
        FROM MovieGenreAvgRating
    )
    SELECT
        genre,
        title,
        avg_rating
    FROM RankedMovies
    WHERE rn <= 10
    ORDER BY genre, rn
""").show(truncate=False)

Top 10 movies by rating within each genre:
+------------------+--------------------------------------------------------------------+----------+
|genre             |title                                                               |avg_rating|
+------------------+--------------------------------------------------------------------+----------+
|(no genres listed)|The Adventures of Sherlock Holmes and Doctor Watson                 |5.0       |
|(no genres listed)|Black Mirror                                                        |5.0       |
|(no genres listed)|Death Note: Desu nôto (2006–2007)                                   |5.0       |
|(no genres listed)|Whiplash (2013)                                                     |4.5       |
|(no genres listed)|The Godfather Trilogy: 1972-1990 (1992)                             |4.5       |
|(no genres listed)|Cosmos                                                              |4.5       |
|(no genres listed)|Lemonade (2016)             

In [None]:
print("Earliest rating made by each user:")
spark.sql("""
    WITH UserRankedRatings AS (
        SELECT
            userId,
            movieId,
            rating,
            timestamp,
            ROW_NUMBER() OVER (PARTITION BY userId ORDER BY timestamp ASC) as rn
        FROM ratings_table
    )
    SELECT
        userId,
        movieId,
        rating,
        timestamp
    FROM UserRankedRatings
    WHERE rn = 1
    ORDER BY userId
""").show(truncate=False)

Earliest rating made by each user:
+------+-------+------+----------+
|userId|movieId|rating|timestamp |
+------+-------+------+----------+
|1     |804    |4     |964980499 |
|2     |318    |3     |1445714835|
|3     |1275   |3     |1306463323|
|4     |171    |3     |945078428 |
|5     |590    |5     |847434747 |
|6     |590    |5     |845553109 |
|7     |1784   |0     |1106635416|
|8     |150    |4     |839463422 |
|9     |41     |3     |1044656650|
|10    |92259  |5     |1455301553|
|11    |1917   |4     |901200037 |
|12    |543    |3     |1247263318|
|13    |1639   |4     |987456818 |
|14    |592    |2     |835440976 |
|15    |172    |1     |1299424762|
|16    |2427   |1     |1377476573|
|17    |910    |3     |1305696226|
|18    |318    |5     |1455049328|
|19    |1882   |2     |965701107 |
|20    |2804   |5     |1054036159|
+------+-------+------+----------+
only showing top 20 rows


In [None]:
print("Rolling average rating for each movie:")
spark.sql("""
    SELECT
        movieId,
        timestamp,
        rating,
        AVG(rating) OVER (PARTITION BY movieId ORDER BY timestamp ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS rolling_avg_rating
    FROM ratings_table
    ORDER BY movieId, timestamp
""").show(20, truncate=False)

Rolling average rating for each movie:
+-------+---------+------+------------------+
|movieId|timestamp|rating|rolling_avg_rating|
+-------+---------+------+------------------+
|1      |829322340|4     |4.0               |
|1      |829759809|4     |4.0               |
|1      |830247330|3     |3.6666666666666665|
|1      |831400444|4     |3.75              |
|1      |831939685|5     |4.0               |
|1      |832058959|5     |4.166666666666667 |
|1      |832079851|3     |4.0               |
|1      |832105242|5     |4.125             |
|1      |832589610|3     |4.0               |
|1      |832841103|3     |3.9               |
|1      |833529571|4     |3.909090909090909 |
|1      |834398280|3     |3.8333333333333335|
|1      |834691642|4     |3.8461538461538463|
|1      |834787906|5     |3.9285714285714284|
|1      |834987643|5     |4.0               |
|1      |835021447|4     |4.0               |
|1      |835532155|5     |4.0588235294117645|
|1      |835643027|3     |4.0            
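The frame `ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW` turns `rolling_avg_rating` into a cumulative (expanding) mean, not a fixed-width moving average. Running a plain-Python running mean over the first ten ratings of movie 1 from the output above reproduces the `rolling_avg_rating` column. The `rating` column was cast to integer earlier in the notebook, so half-star ratings (e.g. 3.5) appear truncated (3), and these averages inherit that.

```python
def cumulative_mean(values):
    """Running mean over all values seen so far (the UNBOUNDED PRECEDING .. CURRENT ROW frame)."""
    means = []
    total = 0.0
    for i, v in enumerate(values, start=1):
        total += v
        means.append(total / i)
    return means


# First ten ratings of movieId 1, in timestamp order, copied from the output above
movie1_ratings = [4, 4, 3, 4, 5, 5, 3, 5, 3, 3]
print(cumulative_mean(movie1_ratings))
```

A fixed-width window would use a bounded frame instead, for example `ROWS BETWEEN 2 PRECEDING AND CURRENT ROW` for a three-rating moving average.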

In [None]:
print("Ratings with category buckets:")
spark.sql("""
    SELECT
        *,
        CASE
            WHEN rating >= 4.0 THEN 'high'
            WHEN rating >= 3.0 AND rating < 4.0 THEN 'medium'
            ELSE 'low'
        END AS rating_bucket
    FROM ratings_table
    LIMIT 20
""").show()

Ratings with category buckets:
+------+-------+------+---------+---------+-------------+
|userId|movieId|rating|timestamp|fake_null|rating_bucket|
+------+-------+------+---------+---------+-------------+
|     1|      1|     4|964982703|     NULL|         high|
|     1|      3|     4|964981247|     NULL|         high|
|     1|      6|     4|964982224|     NULL|         high|
|     1|     47|     5|964983815|     NULL|         high|
|     1|     50|     5|964982931|     NULL|         high|
|     1|     70|     3|964982400|     NULL|       medium|
|     1|    101|     5|964980868|     NULL|         high|
|     1|    110|     4|964982176|     NULL|         high|
|     1|    151|     5|964984041|     NULL|         high|
|     1|    157|     5|964984100|     NULL|         high|
|     1|    163|     5|964983650|     NULL|         high|
|     1|    216|     5|964981208|     NULL|         high|
|     1|    223|     3|964980985|     NULL|       medium|
|     1|    231|     5|964981179|     NUL
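`CASE` evaluates its `WHEN` branches from top to bottom and returns the first match, so once the higher buckets have been ruled out each branch only needs to test a lower bound. A plain-Python equivalent (a sketch, not Spark code) makes that first-match-wins logic explicit:

```python
def rating_bucket(rating):
    """Mirror the CASE expression: the first condition that matches wins."""
    if rating >= 4.0:
        return "high"
    if rating >= 3.0:  # only reached when rating < 4.0
        return "medium"
    return "low"


for r in [5, 4, 3.5, 3, 2.5, 0]:
    print(r, rating_bucket(r))
```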

In [None]:
print("First 20 joined rows (userId, title, rating):")
spark.sql("""
    SELECT
        r.userId,
        m.title,
        r.rating
    FROM ratings_table r
    INNER JOIN movies_table m ON r.movieId = m.movieId
    LIMIT 20
""").show(truncate=False)

First 20 joined rows (userId, title, rating):
+------+-----------------------------------------+------+
|userId|title                                    |rating|
+------+-----------------------------------------+------+
|1     |Toy Story (1995)                         |4     |
|1     |Grumpier Old Men (1995)                  |4     |
|1     |Heat (1995)                              |4     |
|1     |Seven (a.k.a. Se7en) (1995)              |5     |
|1     |Usual Suspects, The (1995)               |5     |
|1     |From Dusk Till Dawn (1996)               |3     |
|1     |Bottle Rocket (1996)                     |5     |
|1     |Braveheart (1995)                        |4     |
|1     |Rob Roy (1995)                           |5     |
|1     |Canadian Bacon (1995)                    |5     |
|1     |Desperado (1995)                         |5     |
|1     |Billy Madison (1995)                     |5     |
|1     |Clerks (1994)                            |3     |
|1     |Dumb & Dumber (Dum
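An inner join keeps only rows whose `movieId` appears on both sides. Because `movies` is small, Spark will likely plan this as a broadcast hash join (by default it broadcasts a join side estimated below `spark.sql.autoBroadcastJoinThreshold`, 10 MB). Running `explain()` on the join shows which strategy it actually picked. The plain-Python sketch below illustrates the hash-join idea: build a dictionary from the small table, then probe it with each rating. The sample rows are made up, apart from the two titles taken from the output above.

```python
def hash_inner_join(ratings_rows, movies_rows):
    """Inner-join (userId, movieId, rating) rows with (movieId, title) rows on movieId."""
    # Build phase: index the smaller table by the join key
    title_by_movie = {movie_id: title for movie_id, title in movies_rows}
    # Probe phase: emit a joined row only when the key exists on both sides
    return [
        (user_id, title_by_movie[movie_id], rating)
        for user_id, movie_id, rating in ratings_rows
        if movie_id in title_by_movie
    ]


movies_sample = [(1, "Toy Story (1995)"), (3, "Grumpier Old Men (1995)")]
ratings_sample = [(1, 1, 4), (1, 3, 4), (1, 999, 2)]  # movieId 999 has no match and is dropped
print(hash_inner_join(ratings_sample, movies_sample))
```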

In [None]:
print("Ratings count by genre:")
spark.sql("""
    SELECT
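        -- genres is the raw pipe-separated string (e.g. 'Comedy|Romance'),
        -- so this groups by genre combination, not by individual genre
        m.genres,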
        COUNT(r.rating) AS rating_count
    FROM ratings_table r
    INNER JOIN movies_table m ON r.movieId = m.movieId
    GROUP BY m.genres
    ORDER BY rating_count DESC
""").show(truncate=False)

Ratings count by genre:
+--------------------------------+------------+
|genres                          |rating_count|
+--------------------------------+------------+
|Comedy                          |7196        |
|Drama                           |6291        |
|Comedy|Romance                  |3967        |
|Comedy|Drama|Romance            |3000        |
|Comedy|Drama                    |2851        |
|Drama|Romance                   |2838        |
|Action|Adventure|Sci-Fi         |2361        |
|Crime|Drama                     |2315        |
|Action|Crime|Thriller           |1554        |
|Action|Adventure|Thriller       |1455        |
|Action|Adventure|Sci-Fi|Thriller|1446        |
|Drama|Thriller                  |1365        |
|Action|Sci-Fi|Thriller          |1195        |
|Comedy|Crime                    |1171        |
|Crime|Drama|Thriller            |1119        |
|Drama|War                       |1044        |
|Action|Drama|War                |1034        |
|Action|Crime|Dr

In [None]:
from pyspark.sql.functions import count, avg, col

genre_stats_df = exploded_genres_df.groupBy("genre").agg(
    count("rating").alias("rating_count"),
    avg("rating").alias("average_rating")
)

print("Ratings count and average rating per genre:")
genre_stats_df.orderBy(col("rating_count").desc()).show(truncate=False)

Ratings count and average rating per genre:
+------------------+------------+------------------+
|genre             |rating_count|average_rating    |
+------------------+------------+------------------+
|Drama             |41928       |3.5066065636328947|
|Comedy            |39053       |3.231889995646941 |
|Action            |30635       |3.294173331157173 |
|Thriller          |26452       |3.3439815514894904|
|Adventure         |24161       |3.355779975994371 |
|Romance           |18124       |3.3595232840432576|
|Sci-Fi            |17243       |3.2978020066113785|
|Crime             |16681       |3.5098015706492416|
|Fantasy           |11834       |3.331079939158357 |
|Children          |9208        |3.2656385751520416|
|Mystery           |7674        |3.4744592129267655|
|Horror            |7291        |3.101769304622137 |
|Animation         |6988        |3.467229536348025 |
|War               |4859        |3.6703025313850586|
|IMAX              |4145        |3.4277442702050664|
|M
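Unlike the SQL query that groups by the raw `genres` string, `exploded_genres_df` (built earlier in the notebook, presumably by splitting `genres` on `|` and exploding) has one row per (rating, genre) pair, so a rating of a `Comedy|Drama|Romance` movie counts once for each of its three genres. That explains why the per-genre counts here (41928 for Drama) are much larger than the per-combination counts above (6291 for pure `Drama`). A plain-Python sketch of the split-and-count step on three hypothetical ratings:

```python
from collections import Counter


def ratings_per_genre(rated_genre_strings):
    """Count ratings per individual genre, given one pipe-separated genres string per rating."""
    counts = Counter()
    for genres in rated_genre_strings:
        # Same idea as explode(split(genres, "\\|")) in Spark: one entry per genre.
        # (Spark's split takes a regex, hence the escaped pipe; str.split here does not.)
        for genre in genres.split("|"):
            counts[genre] += 1
    return counts


# Three hypothetical ratings, each on a movie with the given genres string
sample = ["Comedy", "Comedy|Romance", "Comedy|Drama|Romance"]
print(ratings_per_genre(sample))
```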

## Task 5 — MLlib Classification

**Goal: Build a binary classifier:**

Predict whether a user will give a rating ≥ 4.0.

**5.1 Create label**

Add a column: label = 1 if rating ≥ 4.0 else 0

In [None]:
from pyspark.sql.functions import when

# Add a 'label' column: 1 if rating >= 4.0, else 0
ratings = ratings.withColumn("label", when(ratings["rating"] >= 4.0, 1).otherwise(0))

# Display the first few rows with the new 'label' column to verify
ratings.show(5)

+------+-------+------+---------+---------+-----+
|userId|movieId|rating|timestamp|fake_null|label|
+------+-------+------+---------+---------+-----+
|     1|      1|     4|964982703|     NULL|    1|
|     1|      3|     4|964981247|     NULL|    1|
|     1|      6|     4|964982224|     NULL|    1|
|     1|     47|     5|964983815|     NULL|    1|
|     1|     50|     5|964982931|     NULL|    1|
+------+-------+------+---------+---------+-----+
only showing top 5 rows


**5.2 Feature engineering**

Create features:

	•	rating (as-is)
	•	timestamp
	•	normalized timestamp
	•	(optional) number of ratings by that user (using join or window)

Use VectorAssembler to pack them.

**5.3 Split data**

70% train, 30% test.

**5.4 Train model**

Train a Logistic Regression model.

	•	print coefficients
	•	print intercept
	•	print ROC AUC
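ROC AUC can be read as the probability that a randomly chosen positive example gets a higher score than a randomly chosen negative one, with ties counting one half. The sketch below computes that pairwise definition directly in plain Python. It is quadratic in the number of rows, so it is only meant for building intuition on small inputs. `BinaryClassificationEvaluator` estimates the same area from the ROC curve and may downsample the curve on large data (see its `numBins` parameter), so its value can differ slightly.

```python
def pairwise_auc(scores, labels):
    """ROC AUC as P(score of a positive > score of a negative), ties counted as 0.5."""
    positives = [s for s, y in zip(scores, labels) if y == 1]
    negatives = [s for s, y in zip(scores, labels) if y == 0]
    if not positives or not negatives:
        raise ValueError("need at least one positive and one negative example")
    wins = 0.0
    for p in positives:
        for n in negatives:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(positives) * len(negatives))


print(pairwise_auc([0.1, 0.4, 0.35, 0.8], [0, 0, 1, 1]))  # 0.75: 3 of 4 pairs ordered correctly
```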

**5.5 Evaluate**

Compute:

	•	accuracy
	•	precision
	•	recall
	•	confusion matrix (TP, FP, TN, FN)
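All four quantities follow from the confusion-matrix counts. Note that `weightedPrecision` and `weightedRecall` in `MulticlassClassificationEvaluator` average the per-class scores for labels 0 and 1, weighted by how often each label actually occurs; they are not the binary precision and recall of the positive class. A plain-Python sketch of both variants, with arbitrary example counts:

```python
def binary_metrics(tp, fp, tn, fn):
    """Accuracy plus precision/recall for the positive class (label 1)."""
    total = tp + fp + tn + fn
    return {
        "accuracy": (tp + tn) / total,
        "precision": tp / (tp + fp) if tp + fp else 0.0,
        "recall": tp / (tp + fn) if tp + fn else 0.0,
    }


def weighted_metrics(tp, fp, tn, fn):
    """Per-class precision/recall averaged with weights = true size of each class."""
    total = tp + fp + tn + fn
    n_pos, n_neg = tp + fn, tn + fp  # true class sizes
    prec_pos = tp / (tp + fp) if tp + fp else 0.0
    prec_neg = tn / (tn + fn) if tn + fn else 0.0
    rec_pos = tp / n_pos if n_pos else 0.0
    rec_neg = tn / n_neg if n_neg else 0.0
    return {
        "weightedPrecision": (prec_pos * n_pos + prec_neg * n_neg) / total,
        "weightedRecall": (rec_pos * n_pos + rec_neg * n_neg) / total,
    }


print(binary_metrics(tp=8, fp=2, tn=5, fn=1))
print(weighted_metrics(tp=8, fp=2, tn=5, fn=1))
```

Weighted recall always equals accuracy: each class's recall times its size is just its number of correct predictions, so the weighted sum reduces to (TP + TN) / total.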

**5.6 Task: Improve model**

Try:

	•	Adding a log-transformed timestamp
	•	Using a DecisionTreeClassifier
	•	Using a RandomForestClassifier
	•	Comparing metrics

In [None]:
from pyspark.ml.feature import VectorAssembler
# Alias Spark's min/max so they do not shadow Python's built-in min() and max()
from pyspark.sql.functions import min as spark_min, max as spark_max, col

# Calculate min and max timestamp for normalization
min_max_timestamp = ratings.agg(
    spark_min("timestamp").alias("min_timestamp"),
    spark_max("timestamp").alias("max_timestamp"),
).collect()[0]
min_timestamp = min_max_timestamp["min_timestamp"]
max_timestamp = min_max_timestamp["max_timestamp"]

print(f"Minimum timestamp: {min_timestamp}")
print(f"Maximum timestamp: {max_timestamp}")

Minimum timestamp: 828124615
Maximum timestamp: 1537799250
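Min-max normalization maps each timestamp t to (t - min) / (max - min), so the earliest rating becomes 0.0 and the latest 1.0. Plugging in the two values printed above and the first timestamp of user 1 reproduces the `normalized_timestamp` value shown in the next cell's output:

```python
MIN_TS = 828124615   # printed minimum timestamp
MAX_TS = 1537799250  # printed maximum timestamp


def min_max_normalize(t, lo=MIN_TS, hi=MAX_TS):
    """Scale t linearly so that lo maps to 0.0 and hi maps to 1.0."""
    return (t - lo) / (hi - lo)


print(min_max_normalize(964982703))  # ≈ 0.192846
```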


In [None]:
ratings = ratings.withColumn(
    "normalized_timestamp",
    (col("timestamp") - min_timestamp) / (max_timestamp - min_timestamp)
)

ratings.show(5)

+------+-------+------+---------+---------+-----+--------------------+----------------+--------------------+------------------+
|userId|movieId|rating|timestamp|fake_null|label|normalized_timestamp|num_user_ratings|            features|     log_timestamp|
+------+-------+------+---------+---------+-----+--------------------+----------------+--------------------+------------------+
|     1|      1|     4|964982703|     NULL|    1| 0.19284624425107147|             232|[4.0,0.1928462442...|20.687620734790286|
|     1|      3|     4|964981247|     NULL|    1| 0.19284419260665842|             232|[4.0,0.1928441926...|20.687619225953814|
|     1|      6|     4|964982224|     NULL|    1|  0.1928455692938779|             232|[4.0,0.1928455692...|20.687620238408208|
|     1|     47|     5|964983815|     NULL|    1| 0.19284781116631003|             232|[5.0,0.1928478111...|20.687621887141884|
|     1|     50|     5|964982931|     NULL|    1| 0.19284656552505924|             232|[5.0,0.1928465655

In [None]:
from pyspark.sql.functions import count

# Number of ratings per user
num_ratings_per_user_df = ratings.groupBy("userId").agg(count("rating").alias("num_user_ratings"))

# Drop any existing 'num_user_ratings' column first; otherwise re-running this cell
# would leave two columns with the same (ambiguous) name after the join
if "num_user_ratings" in ratings.columns:
    ratings = ratings.drop("num_user_ratings")

ratings = ratings.join(num_ratings_per_user_df, on="userId", how="left")

ratings.show(5)

+------+-------+------+---------+---------+-----+--------------------+--------------------+------------------+----------------+
|userId|movieId|rating|timestamp|fake_null|label|normalized_timestamp|            features|     log_timestamp|num_user_ratings|
+------+-------+------+---------+---------+-----+--------------------+--------------------+------------------+----------------+
|     1|      1|     4|964982703|     NULL|    1| 0.19284624425107147|[4.0,0.1928462442...|20.687620734790286|             232|
|     1|      3|     4|964981247|     NULL|    1| 0.19284419260665842|[4.0,0.1928441926...|20.687619225953814|             232|
|     1|      6|     4|964982224|     NULL|    1|  0.1928455692938779|[4.0,0.1928455692...|20.687620238408208|             232|
|     1|     47|     5|964983815|     NULL|    1| 0.19284781116631003|[5.0,0.1928478111...|20.687621887141884|             232|
|     1|     50|     5|964982931|     NULL|    1| 0.19284656552505924|[5.0,0.1928465655...| 20.687620971
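The task allows either a join or a window for `num_user_ratings`. The cell above aggregates and joins back; a window over `userId`, for example `count("rating").over(Window.partitionBy("userId"))`, attaches the same count to every row without a separate join. The plain-Python sketch below shows the aggregate-then-attach idea on made-up rows:

```python
from collections import Counter


def add_user_rating_counts(rows):
    """Append each user's total number of ratings to every (userId, movieId, rating) row."""
    counts = Counter(user_id for user_id, _, _ in rows)  # like groupBy("userId").count()
    return [row + (counts[row[0]],) for row in rows]     # like joining the counts back on userId


sample = [(1, 1, 4), (1, 3, 4), (2, 318, 3)]
print(add_user_rating_counts(sample))
```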

In [None]:
assembler = VectorAssembler(
    inputCols=["rating", "normalized_timestamp", "num_user_ratings"],
    outputCol="features"
)

# Drop the 'features' column if it already exists to avoid IllegalArgumentException
if "features" in ratings.columns:
    ratings = ratings.drop("features")

ratings = assembler.transform(ratings)

ratings.select("userId", "rating", "normalized_timestamp", "num_user_ratings", "features", "label").show(5, truncate=False)

+------+------+--------------------+----------------+-------------------------------+-----+
|userId|rating|normalized_timestamp|num_user_ratings|features                       |label|
+------+------+--------------------+----------------+-------------------------------+-----+
|1     |4     |0.19284624425107147 |232             |[4.0,0.19284624425107147,232.0]|1    |
|1     |4     |0.19284419260665842 |232             |[4.0,0.19284419260665842,232.0]|1    |
|1     |4     |0.1928455692938779  |232             |[4.0,0.1928455692938779,232.0] |1    |
|1     |5     |0.19284781116631003 |232             |[5.0,0.19284781116631003,232.0]|1    |
|1     |5     |0.19284656552505924 |232             |[5.0,0.19284656552505924,232.0]|1    |
+------+------+--------------------+----------------+-------------------------------+-----+
only showing top 5 rows


In [None]:
train_df, test_df = ratings.randomSplit([0.7, 0.3], seed=42)

print(f"Training DataFrame count: {train_df.count()}")
print(f"Test DataFrame count: {test_df.count()}")

# Display the first few rows of the training DataFrame to verify
train_df.show(5, truncate=False)

Training DataFrame count: 70549
Test DataFrame count: 30287
+------+-------+------+---------+---------+-----+--------------------+------------------+----------------+-------------------------------+
|userId|movieId|rating|timestamp|fake_null|label|normalized_timestamp|log_timestamp     |num_user_ratings|features                       |
+------+-------+------+---------+---------+-----+--------------------+------------------+----------------+-------------------------------+
|1     |1      |4     |964982703|NULL     |1    |0.19284624425107147 |20.687620734790286|232             |[4.0,0.19284624425107147,232.0]|
|1     |3      |4     |964981247|NULL     |1    |0.19284419260665842 |20.687619225953814|232             |[4.0,0.19284419260665842,232.0]|
|1     |47     |5     |964983815|NULL     |1    |0.19284781116631003 |20.687621887141884|232             |[5.0,0.19284781116631003,232.0]|
|1     |50     |5     |964982931|NULL     |1    |0.19284656552505924 |20.68762097106392 |232             |
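`randomSplit([0.7, 0.3], seed=42)` samples each row independently instead of cutting the data at exactly 70%, which is why the counts above are 70549 and 30287 (about 69.96% and 30.04% of 100836) rather than an exact 70/30 split. The sketch below is a simplified plain-Python model of that behaviour: one uniform draw per row, compared against the cumulative normalized weights. Spark's real implementation also depends on partitioning and its own random generator, so the same seed does not select the same rows as this sketch.

```python
import math
import random


def random_split(rows, weights, seed):
    """Assign each row to a split with one uniform draw against cumulative normalized weights."""
    total = math.fsum(weights)
    bounds = []
    acc = 0.0
    for w in weights:
        acc += w / total
        bounds.append(acc)
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        u = rng.random()
        for i, b in enumerate(bounds):
            if u < b or i == len(bounds) - 1:  # last split also catches rounding leftovers
                splits[i].append(row)
                break
    return splits


train, test = random_split(range(100836), [0.7, 0.3], seed=42)
print(len(train), len(test), len(train) / 100836)
```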

In [None]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator

# 1. Instantiate a LogisticRegression model
lr = LogisticRegression(featuresCol='features', labelCol='label', maxIter=10)

# 2. Train the model on the training set
lr_model = lr.fit(train_df)

# 3. Print the coefficients (in inputCols order) and the intercept
print("Coefficients: " + str(lr_model.coefficients))
print("Intercept: " + str(lr_model.intercept))

# 4. Make predictions on test_df
predictions = lr_model.transform(test_df)

# 5. Evaluate ROC AUC from the rawPrediction column
b_evaluator = BinaryClassificationEvaluator(labelCol="label", rawPredictionCol="rawPrediction", metricName="areaUnderROC")
roc_auc = b_evaluator.evaluate(predictions)
print(f"\nROC AUC on test data: {roc_auc}")

# 6. Evaluate accuracy, precision and recall.
#    weightedPrecision / weightedRecall average the per-class scores for labels 0 and 1,
#    weighted by class frequency; they are not the positive-class precision/recall.
accuracy_evaluator = MulticlassClassificationEvaluator(labelCol="label", metricName="accuracy")
accuracy = accuracy_evaluator.evaluate(predictions)
print(f"Accuracy on test data: {accuracy}")

precision_evaluator = MulticlassClassificationEvaluator(labelCol="label", metricName="weightedPrecision")
precision = precision_evaluator.evaluate(predictions)
print(f"Precision on test data: {precision}")

recall_evaluator = MulticlassClassificationEvaluator(labelCol="label", metricName="weightedRecall")
recall = recall_evaluator.evaluate(predictions)
print(f"Recall on test data: {recall}")

# 7. Compute the confusion matrix
tp = predictions.filter((predictions.label == 1) & (predictions.prediction == 1)).count()
fp = predictions.filter((predictions.label == 0) & (predictions.prediction == 1)).count()
tn = predictions.filter((predictions.label == 0) & (predictions.prediction == 0)).count()
fn = predictions.filter((predictions.label == 1) & (predictions.prediction == 0)).count()

print("\nConfusion Matrix:")
print(f"True Positives (TP): {tp}")
print(f"False Positives (FP): {fp}")
print(f"True Negatives (TN): {tn}")
print(f"False Negatives (FN): {fn}")

Coefficients: [13.50448020975164,-0.6891060239814604,1.4458381208706308e-05]
Intercept: -46.70677595589607

ROC AUC on test data: 0.9999995085738272
Accuracy on test data: 1.0
Precision on test data: 1.0
Recall on test data: 1.0

Confusion Matrix:
True Positives (TP): 14511
False Positives (FP): 0
True Negatives (TN): 15776
False Negatives (FN): 0
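These perfect scores come from the setup, not from a strong model: `label` is defined as `rating >= 4.0` and `rating` is itself one of the features, so the classifier only has to learn a threshold on `rating`. The printed coefficients show exactly that. Ignoring the two small terms, the decision boundary sits where 13.504 × rating - 46.707 = 0, i.e. rating ≈ 3.46, which falls between the integer ratings 3 and 4 in this integer-cast column. The plain-Python arithmetic below uses the printed coefficients; a more informative experiment would drop `rating` from the assembler's `inputCols`.

```python
# Coefficients and intercept printed above, in inputCols order:
# [rating, normalized_timestamp, num_user_ratings]
coef_rating, coef_norm_ts, coef_num_ratings = 13.50448020975164, -0.6891060239814604, 1.4458381208706308e-05
intercept = -46.70677595589607


def rating_threshold(normalized_timestamp, num_user_ratings):
    """Rating at which the model's linear score crosses 0, i.e. predicted probability 0.5."""
    other = coef_norm_ts * normalized_timestamp + coef_num_ratings * num_user_ratings
    return -(intercept + other) / coef_rating


# User 1's first rating: normalized_timestamp ≈ 0.1928, num_user_ratings = 232
print(rating_threshold(0.19284624425107147, 232))
# Extremes of the timestamp term
print(rating_threshold(0.0, 0), rating_threshold(1.0, 0))
```

Every threshold lands between 3 and 4, so every integer rating is classified correctly.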


In [None]:
print("Schema of the ratings DataFrame after adding log_timestamp:")
ratings.printSchema()

print("\nFirst 5 rows of the ratings DataFrame with log_timestamp:")
ratings.show(5, truncate=False)

Schema of the ratings DataFrame after adding log_timestamp:
root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: integer (nullable = true)
 |-- timestamp: integer (nullable = true)
 |-- fake_null: void (nullable = true)
 |-- label: integer (nullable = false)
 |-- normalized_timestamp: double (nullable = true)
 |-- log_timestamp: double (nullable = true)
 |-- num_user_ratings: long (nullable = true)
 |-- features: vector (nullable = true)


First 5 rows of the ratings DataFrame with log_timestamp:
+------+-------+------+---------+---------+-----+--------------------+------------------+----------------+-------------------------------+
|userId|movieId|rating|timestamp|fake_null|label|normalized_timestamp|log_timestamp     |num_user_ratings|features                       |
+------+-------+------+---------+---------+-----+--------------------+------------------+----------------+-------------------------------+
|1     |1      |4     |964982703|NULL

In [None]:
from pyspark.sql.functions import log, col
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import DecisionTreeClassifier, RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator

# 1. Add (or overwrite) `log_timestamp`: the natural log of the Unix timestamp
ratings_with_log_timestamp = ratings.withColumn("log_timestamp", log(col("timestamp")))

# 2. Create a VectorAssembler with the enhanced feature set
new_assembler = VectorAssembler(
    inputCols=["rating", "normalized_timestamp", "num_user_ratings", "log_timestamp"],
    outputCol="features"
)

# 3. Rebuild the 'features' column; drop the existing one first,
#    otherwise VectorAssembler raises IllegalArgumentException
ratings_new_features = new_assembler.transform(ratings_with_log_timestamp.drop("features"))

# Display the first few rows with log_timestamp and the updated features to verify
print("DataFrame with new log_timestamp and updated features:")
ratings_new_features.select("userId", "rating", "timestamp", "log_timestamp", "normalized_timestamp", "num_user_ratings", "features", "label").show(5, truncate=False)

# 4. Split into new training and test sets (same weights and seed as before)
train_df_new_features, test_df_new_features = ratings_new_features.randomSplit([0.7, 0.3], seed=42)

print(f"\nTraining DataFrame (new features) count: {train_df_new_features.count()}")
print(f"Test DataFrame (new features) count: {test_df_new_features.count()}")


def evaluate_classifier(preds, name):
    """Print ROC AUC, accuracy, weighted precision/recall and the confusion matrix.

    Returns (roc_auc, accuracy, precision, recall, tp, fp, tn, fn).
    """
    roc_auc = BinaryClassificationEvaluator(
        labelCol="label", rawPredictionCol="rawPrediction", metricName="areaUnderROC"
    ).evaluate(preds)
    print(f"ROC AUC ({name}) on test data: {roc_auc}")

    def multiclass(metric):
        return MulticlassClassificationEvaluator(labelCol="label", metricName=metric).evaluate(preds)

    accuracy = multiclass("accuracy")
    print(f"Accuracy ({name}) on test data: {accuracy}")
    precision = multiclass("weightedPrecision")
    print(f"Precision ({name}) on test data: {precision}")
    recall = multiclass("weightedRecall")
    print(f"Recall ({name}) on test data: {recall}")

    # Confusion matrix counts
    tp = preds.filter((col("label") == 1) & (col("prediction") == 1)).count()
    fp = preds.filter((col("label") == 0) & (col("prediction") == 1)).count()
    tn = preds.filter((col("label") == 0) & (col("prediction") == 0)).count()
    fn = preds.filter((col("label") == 1) & (col("prediction") == 0)).count()

    print(f"\nConfusion Matrix ({name}):")
    print(f"True Positives (TP): {tp}")
    print(f"False Positives (FP): {fp}")
    print(f"True Negatives (TN): {tn}")
    print(f"False Negatives (FN): {fn}")
    return roc_auc, accuracy, precision, recall, tp, fp, tn, fn


# 5. DecisionTreeClassifier: train, predict, evaluate
print("\n--- DecisionTreeClassifier ---")
dt = DecisionTreeClassifier(featuresCol='features', labelCol='label', seed=42)
dt_model = dt.fit(train_df_new_features)
dt_predictions = dt_model.transform(test_df_new_features)
(dt_roc_auc, dt_accuracy, dt_precision, dt_recall,
 tp_dt, fp_dt, tn_dt, fn_dt) = evaluate_classifier(dt_predictions, "Decision Tree")

# 6. RandomForestClassifier: train, predict, evaluate
print("\n--- RandomForestClassifier ---")
rf = RandomForestClassifier(featuresCol='features', labelCol='label', seed=42)
rf_model = rf.fit(train_df_new_features)
rf_predictions = rf_model.transform(test_df_new_features)
(rf_roc_auc, rf_accuracy, rf_precision, rf_recall,
 tp_rf, fp_rf, tn_rf, fn_rf) = evaluate_classifier(rf_predictions, "Random Forest")

DataFrame with new log_timestamp and updated features:
+------+------+---------+------------------+--------------------+----------------+--------------------------------------------------+-----+
|userId|rating|timestamp|log_timestamp     |normalized_timestamp|num_user_ratings|features                                          |label|
+------+------+---------+------------------+--------------------+----------------+--------------------------------------------------+-----+
|1     |4     |964982703|20.687620734790286|0.19284624425107147 |232             |[4.0,0.19284624425107147,232.0,20.687620734790286]|1    |
|1     |4     |964981247|20.687619225953814|0.19284419260665842 |232             |[4.0,0.19284419260665842,232.0,20.687619225953814]|1    |
|1     |4     |964982224|20.687620238408208|0.1928455692938779  |232             |[4.0,0.1928455692938779,232.0,20.687620238408208] |1    |
|1     |5     |964983815|20.687621887141884|0.19284781116631003 |232             |[5.0,0.1928478111663100

## Task 6 — Performance & Execution

**6.1 Check partitions**

Get the number of partitions for ratings and movies.

**6.2 Repartition and coalesce**

	•	explain the difference
	•	show the effect on shuffles
	•	check number of partitions with rdd.getNumPartitions()
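In short, `repartition(n)` always performs a full shuffle and can raise or lower the partition count (with no columns given, rows are spread round-robin), while `coalesce(n)` merges existing partitions without a shuffle and can only lower it. This is why `coalesce(2)` on the single-partition `ratings` DataFrame in the cells below still reports 1 partition. The toy plain-Python model below treats partitions as lists; it is a simplification for intuition, not how Spark actually moves data.

```python
def repartition(partitions, n):
    """Toy model of repartition(n): every row is redistributed round-robin into n new partitions."""
    out = [[] for _ in range(n)]
    i = 0
    for part in partitions:
        for row in part:
            out[i % n].append(row)
            i += 1
    return out


def coalesce(partitions, n):
    """Toy model of coalesce(n): merge neighbouring partitions; never increases the count."""
    k = n if n < len(partitions) else len(partitions)
    out = [[] for _ in range(k)]
    for idx, part in enumerate(partitions):
        # Contiguous groups of input partitions map to the same output partition (no row-level shuffle)
        out[idx * k // len(partitions)].extend(part)
    return out


single = [list(range(10))]
print(len(repartition(single, 4)))        # 4
print(len(coalesce(single, 2)))           # 1
print(coalesce([[1], [2], [3], [4]], 2))  # [[1, 2], [3, 4]]
```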

**6.3 Cache**

Cache ratings:

	•	compute count
	•	compute average rating per movie twice
	•	measure the difference using Python's time module

**6.4 explain(True)**

Run explain(True) on:

	•	a join
	•	a groupBy
	•	a window function

And identify shuffle boundaries.
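In a physical plan, each shuffle appears as an `Exchange` operator, for example `Exchange hashpartitioning(movieId#18, 200)` before a `groupBy` aggregation or a sort-merge join, or `Exchange RoundRobinPartitioning(4)` after `repartition(4)`; each one marks a stage boundary. A broadcast join shows a `BroadcastExchange` instead. The helper below is a small plain-Python sketch that pulls those lines out of a plan's text. The short plan string is hand-written in the style of `explain()` output for illustration only, not output from this notebook. In classic (non-Connect) PySpark, `explain()` prints through Python's `print`, so `contextlib.redirect_stdout` can usually capture the real text.

```python
def exchange_lines(plan_text):
    """Split the Exchange operators in a physical plan into shuffles and broadcasts."""
    shuffles, broadcasts = [], []
    for line in plan_text.splitlines():
        if "BroadcastExchange" in line:
            broadcasts.append(line.strip())
        elif "Exchange" in line:
            shuffles.append(line.strip())
    return shuffles, broadcasts


# Hand-written plan fragment in the style of explain() output (illustrative only)
example_plan = """HashAggregate(keys=[movieId#18], functions=[avg(rating#432)])
+- Exchange hashpartitioning(movieId#18, 200)
   +- HashAggregate(keys=[movieId#18], functions=[partial_avg(rating#432)])"""

shuffles, broadcasts = exchange_lines(example_plan)
print(shuffles)
print(broadcasts)
```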

In [None]:
print(f"Number of partitions for ratings DataFrame: {ratings.rdd.getNumPartitions()}")
print(f"Number of partitions for movies DataFrame: {movies.rdd.getNumPartitions()}")

Number of partitions for ratings DataFrame: 1
Number of partitions for movies DataFrame: 1


In [None]:
print("\n--- Repartition Demonstration ---")
# Repartition the ratings DataFrame to 4 partitions
ratings_repartitioned = ratings.repartition(4)
print(f"Number of partitions after repartition(4): {ratings_repartitioned.rdd.getNumPartitions()}")

# Explain the repartition operation to show shuffle
print("\nExplain plan for repartition(4):")
ratings_repartitioned.explain(True)


--- Repartition Demonstration ---
Number of partitions after repartition(4): 4

Explain plan for repartition(4):
== Parsed Logical Plan ==
Repartition 4, true
+- Project [userId#17, movieId#18, rating#432, timestamp#20, fake_null#755, label#2023, normalized_timestamp#3404, ln(cast(timestamp#20 as double)) AS log_timestamp#4178, num_user_ratings#3540L, features#3668]
   +- Project [userId#17, movieId#18, rating#432, timestamp#20, fake_null#755, label#2023, normalized_timestamp#3404, ln(cast(timestamp#20 as double)) AS log_timestamp#4136, num_user_ratings#3540L, features#3668]
      +- Project [userId#17, movieId#18, rating#432, timestamp#20, fake_null#755, label#2023, normalized_timestamp#3404, log_timestamp#2612, num_user_ratings#3540L, UDF(struct(rating_double_VectorAssembler_95680feacbab, cast(rating#432 as double), normalized_timestamp, normalized_timestamp#3404, num_user_ratings_double_VectorAssembler_95680feacbab, cast(num_user_ratings#3540L as double))) AS features#3668]
       

In [None]:
print("\n--- Coalesce Demonstration ---")
# Coalesce the ratings DataFrame to 2 partitions
# Note: coalesce avoids a full shuffle but can only reduce the partition count, never increase it
ratings_coalesced = ratings.coalesce(2)
print(f"Number of partitions after coalesce(2): {ratings_coalesced.rdd.getNumPartitions()}")

# Explain the coalesce operation to show shuffle (or lack thereof)
print("\nExplain plan for coalesce(2):")
ratings_coalesced.explain(True)


--- Coalesce Demonstration ---
Number of partitions after coalesce(2): 1

Explain plan for coalesce(2):
== Parsed Logical Plan ==
Repartition 2, false
+- Project [userId#17, movieId#18, rating#432, timestamp#20, fake_null#755, label#2023, normalized_timestamp#3404, ln(cast(timestamp#20 as double)) AS log_timestamp#4178, num_user_ratings#3540L, features#3668]
   +- Project [userId#17, movieId#18, rating#432, timestamp#20, fake_null#755, label#2023, normalized_timestamp#3404, ln(cast(timestamp#20 as double)) AS log_timestamp#4136, num_user_ratings#3540L, features#3668]
      +- Project [userId#17, movieId#18, rating#432, timestamp#20, fake_null#755, label#2023, normalized_timestamp#3404, log_timestamp#2612, num_user_ratings#3540L, UDF(struct(rating_double_VectorAssembler_95680feacbab, cast(rating#432 as double), normalized_timestamp, normalized_timestamp#3404, num_user_ratings_double_VectorAssembler_95680feacbab, cast(num_user_ratings#3540L as double))) AS features#3668]
         +- Pro

In [None]:
print("\n--- Caching Demonstration ---")
# Cache the ratings DataFrame
ratings.cache()

# cache() is lazy: an action such as count() is needed to actually fill the cache
print(f"Ratings DataFrame count (after caching): {ratings.count()}")


--- Caching Demonstration ---
Ratings DataFrame count (after caching): 100836


In [None]:
import time
from pyspark.sql.functions import avg

print("\n--- Timing Average Rating Calculation ---")

# First computation: the cache was already filled by count() above, so this run also reads from it
start_time = time.time()
avg_ratings_1 = ratings.groupBy("movieId").agg(avg("rating").alias("average_rating")).collect()
end_time = time.time()
time_taken_1 = end_time - start_time
print(f"Time for first average rating calculation: {time_taken_1:.4f} seconds")

# Second computation: also served from the cache, so part of any speed-up may come from warm-up rather than caching
start_time = time.time()
avg_ratings_2 = ratings.groupBy("movieId").agg(avg("rating").alias("average_rating")).collect()
end_time = time.time()
time_taken_2 = end_time - start_time
print(f"Time for second average rating calculation (with cache): {time_taken_2:.4f} seconds")

# Unpersist the DataFrame from cache
ratings.unpersist()
print("Ratings DataFrame unpersisted from cache.")


--- Timing Average Rating Calculation ---
Time for first average rating calculation: 0.9145 seconds
Time for second average rating calculation (with cache): 0.3668 seconds
Ratings DataFrame unpersisted from cache.
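A single wall-clock measurement is noisy, and here both runs read from the already-filled cache, so the gap between them is not a clean measure of what caching saves. A more robust pattern is to repeat each measurement, keep the fastest run, and compare an uncached run against a cached one. The small plain-Python helper below handles the repetition part. It uses `time.perf_counter`, a monotonic high-resolution clock better suited to measuring intervals than `time.time`. It deliberately avoids the built-in `min()`, because earlier cells in this notebook may have shadowed it with Spark's `min`.

```python
import time


def best_time(fn, repeat=3):
    """Run fn() `repeat` times and return (fastest elapsed seconds, result of the last run)."""
    if repeat < 1:
        raise ValueError("repeat must be >= 1")
    best = float("inf")
    result = None
    for _ in range(repeat):
        start = time.perf_counter()
        result = fn()
        elapsed = time.perf_counter() - start
        if elapsed < best:
            best = elapsed
    return best, result


elapsed, value = best_time(lambda: [i * i for i in range(100_000)])
print(f"{elapsed:.4f} s, {len(value)} items")
```

In the notebook, one could call for example `best_time(lambda: ratings.groupBy("movieId").agg(avg("rating")).collect())` once before `ratings.cache()` and once after the cache has been filled.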


In [None]:
print("\n--- Explain plan for a Join Operation ---")
joined_df = ratings.join(movies, on="movieId", how="inner")
joined_df.explain(True)


--- Explain plan for a Join Operation ---
== Parsed Logical Plan ==
'Join UsingJoin(Inner, [movieId])
:- Project [userId#17, movieId#18, rating#432, timestamp#20, fake_null#755, label#2023, normalized_timestamp#3404, ln(cast(timestamp#20 as double)) AS log_timestamp#4178, num_user_ratings#3540L, features#3668]
:  +- Project [userId#17, movieId#18, rating#432, timestamp#20, fake_null#755, label#2023, normalized_timestamp#3404, ln(cast(timestamp#20 as double)) AS log_timestamp#4136, num_user_ratings#3540L, features#3668]
:     +- Project [userId#17, movieId#18, rating#432, timestamp#20, fake_null#755, label#2023, normalized_timestamp#3404, log_timestamp#2612, num_user_ratings#3540L, UDF(struct(rating_double_VectorAssembler_95680feacbab, cast(rating#432 as double), normalized_timestamp, normalized_timestamp#3404, num_user_ratings_double_VectorAssembler_95680feacbab, cast(num_user_ratings#3540L as double))) AS features#3668]
:        +- Project [userId#17, movieId#18, rating#432, timestam

In [None]:
from pyspark.sql.functions import avg

print("\n--- Explain plan for a GroupBy Operation (Average Rating per Movie) ---")
avg_rating_per_movie_df = ratings.groupBy("movieId").agg(avg("rating").alias("average_rating"))
avg_rating_per_movie_df.explain(True)


--- Explain plan for a GroupBy Operation (Average Rating per Movie) ---
== Parsed Logical Plan ==
'Aggregate ['movieId], ['movieId, 'avg('rating) AS average_rating#5848]
+- Project [userId#17, movieId#18, rating#432, timestamp#20, fake_null#755, label#2023, normalized_timestamp#3404, ln(cast(timestamp#20 as double)) AS log_timestamp#4178, num_user_ratings#3540L, features#3668]
   +- Project [userId#17, movieId#18, rating#432, timestamp#20, fake_null#755, label#2023, normalized_timestamp#3404, ln(cast(timestamp#20 as double)) AS log_timestamp#4136, num_user_ratings#3540L, features#3668]
      +- Project [userId#17, movieId#18, rating#432, timestamp#20, fake_null#755, label#2023, normalized_timestamp#3404, log_timestamp#2612, num_user_ratings#3540L, UDF(struct(rating_double_VectorAssembler_95680feacbab, cast(rating#432 as double), normalized_timestamp, normalized_timestamp#3404, num_user_ratings_double_VectorAssembler_95680feacbab, cast(num_user_ratings#3540L as double))) AS features#36

In [None]:
from pyspark.sql.window import Window
from pyspark.sql.functions import avg, col

print("\n--- Explain plan for a Window Function (Rolling Average Rating per Movie) ---")
# Define a window partitioned by movieId and ordered by timestamp
window_spec = Window.partitionBy("movieId").orderBy("timestamp")

# Compute the rolling average rating per movie using the window function
rolling_avg_df = ratings.withColumn(
    "rolling_avg_rating",
    avg(col("rating")).over(window_spec.rowsBetween(Window.unboundedPreceding, Window.currentRow))
)

rolling_avg_df.explain(True)


--- Explain plan for a Window Function (Rolling Average Rating per Movie) ---
== Parsed Logical Plan ==
'Project [unresolvedstarwithcolumns(rolling_avg_rating, 'avg('rating) windowspecdefinition('movieId, 'timestamp ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())), None)]
+- Project [userId#17, movieId#18, rating#432, timestamp#20, fake_null#755, label#2023, normalized_timestamp#3404, ln(cast(timestamp#20 as double)) AS log_timestamp#4178, num_user_ratings#3540L, features#3668]
   +- Project [userId#17, movieId#18, rating#432, timestamp#20, fake_null#755, label#2023, normalized_timestamp#3404, ln(cast(timestamp#20 as double)) AS log_timestamp#4136, num_user_ratings#3540L, features#3668]
      +- Project [userId#17, movieId#18, rating#432, timestamp#20, fake_null#755, label#2023, normalized_timestamp#3404, log_timestamp#2612, num_user_ratings#3540L, UDF(struct(rating_double_VectorAssembler_95680feacbab, cast(rating#432 as double), normalized_timest