<a href="https://colab.research.google.com/github/camille-310/depot-UA/blob/main/04_pyspark_movie_exercises.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# PySpark MovieLens


## 0. Setup (Run this cell)

In [3]:
!pip install -q pyspark

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("PySpark_Student_Exercises_Movies")
    .master("local[*]")
    .getOrCreate()
)

spark

## 1. Download & Load MovieLens Dataset

In [4]:
!wget -q https://files.grouplens.org/datasets/movielens/ml-latest-small.zip
!unzip -q ml-latest-small.zip

ratings = spark.read.csv("ml-latest-small/ratings.csv", header=True, inferSchema=True)
movies  = spark.read.csv("ml-latest-small/movies.csv", header=True, inferSchema=True)

replace ml-latest-small/links.csv? [y]es, [n]o, [A]ll, [N]one, [r]ename: y
replace ml-latest-small/tags.csv? [y]es, [n]o, [A]ll, [N]one, [r]ename: y
replace ml-latest-small/ratings.csv? [y]es, [n]o, [A]ll, [N]one, [r]ename: y
replace ml-latest-small/README.txt? [y]es, [n]o, [A]ll, [N]one, [r]ename: y
replace ml-latest-small/movies.csv? [y]es, [n]o, [A]ll, [N]one, [r]ename: y


## Task 1 — Data Exploration

**1.1 Inspect the data**

*	Show first 10 rows of ratings
*	Print schema for both DataFrames
*	Count the number of rows in each




**1.2 Column selection**

Write PySpark code to:

* select only userId, movieId, rating
*	rename movieId → film_id
*	cast rating to integer or float explicitly

**1.3 Filtering**

Using ratings:

*	Find all ratings from userId = 1
*	Find all ratings greater than 4.5
*	Find all ratings on movieId in (1, 50, 100)


(use isin())

**1.4 Sorting & limiting**

*	Show the top 20 highest ratings
*	Show the 10 lowest ratings made by user 600
*	Sort by rating desc and timestamp asc

**1.5 Derived columns**

Create columns:

*	rating_x2 = rating * 2
*	positive_rating = 1 if rating ≥ 4 else 0
*	log_rating = log10(rating + 1)

**1.6 Missing values**

Even if ML-latest-small has no nulls, :

*	Add a fake null column
*	Demonstrate fill/replace/drop operations

**1.7 Distinct & deduplication**

*	Count unique users
*	Count unique movies rated
*	Drop duplicate ratings where (userId, movieId) might repeat (even though dataset is clean)

**1.1 Inspect the data**

*	Show first 10 rows of ratings
*	Print schema for both DataFrames
*	Count the number of rows in each

In [None]:
# Write your solution here
ratings.show(10)

print("schema ratings :", ratings.schema)
print("schema movies :", movies.schema)

total_rows_ratings = ratings.count()
print("Number of rows ratings:", total_rows_ratings)

total_rows_movies = movies.count()
print("Number of rows movies:", total_rows_movies)

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|      1|   4.0|964982703|
|     1|      3|   4.0|964981247|
|     1|      6|   4.0|964982224|
|     1|     47|   5.0|964983815|
|     1|     50|   5.0|964982931|
|     1|     70|   3.0|964982400|
|     1|    101|   5.0|964980868|
|     1|    110|   4.0|964982176|
|     1|    151|   5.0|964984041|
|     1|    157|   5.0|964984100|
+------+-------+------+---------+
only showing top 10 rows

schema ratings : StructType([StructField('userId', IntegerType(), True), StructField('movieId', IntegerType(), True), StructField('rating', DoubleType(), True), StructField('timestamp', IntegerType(), True)])
schema movies : StructType([StructField('movieId', IntegerType(), True), StructField('title', StringType(), True), StructField('genres', StringType(), True)])
Number of rows ratings: 100836
Number of rows movies: 9742


**1.2 Column selection**

Write PySpark code to:

* select only userId, movieId, rating
*	rename movieId → film_id
*	cast rating to integer or float explicitly


In [None]:
from pyspark.sql.functions import col

ratings12 = ratings.select(col("userId"),
               col("movieId").alias("film_id"),
               col("rating").cast("float"))

ratings12.show(5)

+------+-------+------+
|userId|film_id|rating|
+------+-------+------+
|     1|      1|   4.0|
|     1|      3|   4.0|
|     1|      6|   4.0|
|     1|     47|   5.0|
|     1|     50|   5.0|
+------+-------+------+
only showing top 5 rows



**1.3 Filtering**

Using ratings:

*	Find all ratings from userId = 1
*	Find all ratings greater than 4.5
*	Find all ratings on movieId in (1, 50, 100)

(use isin())

In [None]:
ratings.filter(col("userId")=="1").show(5)
ratings.filter(col("rating")>4.5).show(5)
ratings.filter(col("movieId").isin("1", "50", "100")).show(5)

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|      1|   4.0|964982703|
|     1|      3|   4.0|964981247|
|     1|      6|   4.0|964982224|
|     1|     47|   5.0|964983815|
|     1|     50|   5.0|964982931|
+------+-------+------+---------+
only showing top 5 rows

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|     47|   5.0|964983815|
|     1|     50|   5.0|964982931|
|     1|    101|   5.0|964980868|
|     1|    151|   5.0|964984041|
|     1|    157|   5.0|964984100|
+------+-------+------+---------+
only showing top 5 rows

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|      1|   4.0|964982703|
|     1|     50|   5.0|964982931|
|     5|      1|   4.0|847434962|
|     5|     50|   4.0|847434881|
|     6|     50|   1.0|845553381|
+------+-------+------+---------+
only showing top 5 rows



****1.4 Sorting & limiting**

*	Show the top 20 highest ratings
*	Show the 10 lowest ratings made by user 600
*	Sort by rating desc and timestamp asc**

In [None]:
ratings.orderBy(col("rating").desc()).show(20)
ratings.filter(col("userId")=="600").orderBy(col("rating").asc()).show(10)
ratings.orderBy(col("rating").desc(), col("timestamp").asc()).show(5)

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|    954|   5.0|964983219|
|     1|   1220|   5.0|964981909|
|     1|   1023|   5.0|964982681|
|     1|     47|   5.0|964983815|
|     1|   1024|   5.0|964982876|
|     1|    101|   5.0|964980868|
|     1|   1025|   5.0|964982791|
|     1|    157|   5.0|964984100|
|     1|   1029|   5.0|964982855|
|     1|    216|   5.0|964981208|
|     1|   1031|   5.0|964982653|
|     1|    260|   5.0|964981680|
|     1|   1032|   5.0|964982791|
|     1|    457|   5.0|964981909|
|     1|   1049|   5.0|964982400|
|     1|    553|   5.0|964984153|
|     1|   1073|   5.0|964981680|
|     1|    608|   5.0|964982931|
|     1|   1080|   5.0|964981327|
|     1|    919|   5.0|964982475|
+------+-------+------+---------+
only showing top 20 rows

+------+-------+------+----------+
|userId|movieId|rating| timestamp|
+------+-------+------+----------+
|   600|    762|   0.5|1237707803|
|   600|   8665|  

**1.5 Derived columns**

Create columns:

*	rating_x2 = rating * 2
*	positive_rating = 1 if rating ≥ 4 else 0
*	log_rating = log10(rating + 1)

In [None]:
from pyspark.sql.functions import lit, expr, when

ratings15 = ratings.withColumn("rating_x2",col("rating")*2)\
      .withColumn("positive_rating",when(col("rating") >= 4, 1).otherwise(0))\
      .withColumn("log_rating",expr("log10(rating + 1)"))

ratings15.show(5)

+------+-------+------+---------+---------+---------------+------------------+
|userId|movieId|rating|timestamp|rating_x2|positive_rating|        log_rating|
+------+-------+------+---------+---------+---------------+------------------+
|     1|      1|   4.0|964982703|      8.0|              1|0.6989700043360189|
|     1|      3|   4.0|964981247|      8.0|              1|0.6989700043360189|
|     1|      6|   4.0|964982224|      8.0|              1|0.6989700043360189|
|     1|     47|   5.0|964983815|     10.0|              1|0.7781512503836436|
|     1|     50|   5.0|964982931|     10.0|              1|0.7781512503836436|
+------+-------+------+---------+---------+---------------+------------------+
only showing top 5 rows



**1.6 Missing values**

Even if ML-latest-small has no nulls, :

*	Add a fake null column
*	Demonstrate fill/replace/drop operations

In [None]:
from pyspark.sql.types import StringType

ratings16 = ratings.withColumn("fake",lit(None).cast(StringType()))
ratings16.show(5)
ratings16 = ratings16.na.fill({"fake": "Unknown"})
ratings16.show(5)
ratings16 = ratings16.replace("Unknown", "Pas renseigné", subset=["fake"])
ratings16.show(5)
ratings16 = ratings16.drop("fake")
ratings16.show(5)

+------+-------+------+---------+----+
|userId|movieId|rating|timestamp|fake|
+------+-------+------+---------+----+
|     1|      1|   4.0|964982703|NULL|
|     1|      3|   4.0|964981247|NULL|
|     1|      6|   4.0|964982224|NULL|
|     1|     47|   5.0|964983815|NULL|
|     1|     50|   5.0|964982931|NULL|
+------+-------+------+---------+----+
only showing top 5 rows

+------+-------+------+---------+-------+
|userId|movieId|rating|timestamp|   fake|
+------+-------+------+---------+-------+
|     1|      1|   4.0|964982703|Unknown|
|     1|      3|   4.0|964981247|Unknown|
|     1|      6|   4.0|964982224|Unknown|
|     1|     47|   5.0|964983815|Unknown|
|     1|     50|   5.0|964982931|Unknown|
+------+-------+------+---------+-------+
only showing top 5 rows

+------+-------+------+---------+-------------+
|userId|movieId|rating|timestamp|         fake|
+------+-------+------+---------+-------------+
|     1|      1|   4.0|964982703|Pas renseigné|
|     1|      3|   4.0|964981

**1.7 Distinct & deduplication**

*	Count unique users
*	Count unique movies rated
*	Drop duplicate ratings where (userId, movieId) might repeat (even though dataset is clean)

In [None]:
print("unique userID :", ratings.select("userId").distinct().count())
print("unique movies rated :", ratings.filter(ratings.movieId.isNotNull()).select("movieId").distinct().count())
ratings17 = ratings.dropDuplicates(["userId", "movieId"])
ratings17.show(10)

unique userID : 610
unique movies rated : 9724
+------+-------+------+----------+
|userId|movieId|rating| timestamp|
+------+-------+------+----------+
|     1|   1208|   4.0| 964983250|
|     1|   1348|   4.0| 964983393|
|     4|    457|   5.0| 945079259|
|     4|    599|   2.0| 945078587|
|     6|    274|   4.0| 845555151|
|     6|    327|   1.0| 845554062|
|     6|    520|   3.0| 845553844|
|     7|   1101|   4.0|1106635502|
|     8|    235|   3.0| 839464076|
|     9|   5481|   5.0|1044656836|
+------+-------+------+----------+
only showing top 10 rows



## Task 2 — Aggregations & GroupBy

**2.1 Simple aggregations**

*	Compute min, max, avg rating
*	Count total number of ratings
*	Count ratings per movieId

**2.2 GroupBy**

Compute:

*	average rating per movie
*	number of ratings per movie
*	average rating per user
*	number of ratings per user

**2.3 Top items**

*	Top 20 most-rated movies
*	Top 20 best-rated movies (min 50 ratings)

→ You must filter with a join or window

**2.4 Window functions**

Using Window partitioned by movieId:

*	rank users by timestamp (earliest → latest rating)
*	create a lag column: previous rating by same user
*	compute average rating per movie with window


In [None]:
# Write your solution here

**2.1 Simple aggregations**

*	Compute min, max, avg rating
*	Count total number of ratings
*	Count ratings per movieId

In [None]:
from pyspark.sql.functions import count, round, sum, avg, min, max, countDistinct, approx_count_distinct

ratings.select(min("rating").alias("min_rating"),
              max("rating").alias("max_rating"),
              round(avg("rating"),3).alias("avg_rating"),
              count("rating").alias("total_ratings")).show()

ratings.groupBy("movieId").agg(count("rating").alias("total_ratings")).show(10)

+----------+----------+----------+-------------+
|min_rating|max_rating|avg_rating|total_ratings|
+----------+----------+----------+-------------+
|       0.5|       5.0|     3.502|       100836|
+----------+----------+----------+-------------+

+-------+-------------+
|movieId|total_ratings|
+-------+-------------+
|   1580|          165|
|   2366|           25|
|   3175|           75|
|   1088|           42|
|  32460|            4|
|  44022|           23|
|  96488|            4|
|   1238|            9|
|   1342|           11|
|   1591|           26|
+-------+-------------+
only showing top 10 rows



**2.2 GroupBy**

Compute:

*	average rating per movie
*	number of ratings per movie
*	average rating per user
*	number of ratings per user


In [None]:
ratings.groupBy("movieId").agg(
    avg("rating").alias("avg_rating_movie"),
    count("rating").alias("number_rating_movie")).show(5)

ratings.groupBy("userId").agg(
    avg("rating").alias("avg_rating_user"),
    count("rating").alias("number_rating_user")).show(5)

+-------+-----------------+-------------------+
|movieId| avg_rating_movie|number_rating_movie|
+-------+-----------------+-------------------+
|   1580|3.487878787878788|                165|
|   2366|             3.64|                 25|
|   3175|             3.58|                 75|
|   1088|3.369047619047619|                 42|
|  32460|             4.25|                  4|
+-------+-----------------+-------------------+
only showing top 5 rows

+------+------------------+------------------+
|userId|   avg_rating_user|number_rating_user|
+------+------------------+------------------+
|   148|3.7395833333333335|                48|
|   463| 3.787878787878788|                33|
|   471|             3.875|                28|
|   496| 3.413793103448276|                29|
|   243| 4.138888888888889|                36|
+------+------------------+------------------+
only showing top 5 rows



**2.3 Top items**

*	Top 20 most-rated movies
*	Top 20 best-rated movies (min 50 ratings)

→ You must filter with a join or window


In [None]:
nbr_ratings = ratings.groupBy("movieId").agg(
    avg("rating").alias("avg_rating_movie"),
    count("rating").alias("number_rating_movie"))

print("Top 20 most-rated movies")
nbr_ratings.orderBy(col("number_rating_movie").desc()).show(20)

print("Top 20 best-rated movies (min 50 ratings)")
nbr_ratings.filter(col("number_rating_movie") > 49).orderBy(col("avg_rating_movie").desc()).show(20)

Top 20 most-rated movies
+-------+------------------+-------------------+
|movieId|  avg_rating_movie|number_rating_movie|
+-------+------------------+-------------------+
|    356| 4.164133738601824|                329|
|    318| 4.429022082018927|                317|
|    296| 4.197068403908795|                307|
|    593| 4.161290322580645|                279|
|   2571| 4.192446043165468|                278|
|    260| 4.231075697211155|                251|
|    480|              3.75|                238|
|    110| 4.031645569620253|                237|
|    589| 3.970982142857143|                224|
|    527|             4.225|                220|
|   2959| 4.272935779816514|                218|
|      1|3.9209302325581397|                215|
|   1196|4.2156398104265405|                211|
|     50| 4.237745098039215|                204|
|   2858| 4.056372549019608|                204|
|     47|3.9753694581280787|                203|
|    780|3.4455445544554455|                

**2.4 Window functions**

Using Window partitioned by movieId:

*	rank users by timestamp (earliest → latest rating)
*	create a lag column: previous rating by same user
*	compute average rating per movie with window

In [None]:
from pyspark.sql import Window
from pyspark.sql.functions import rank, lag

time_window = Window.partitionBy("movieId").orderBy(col("timestamp").desc())
time_rank = ratings.withColumn("time_rank",rank().over(time_window))
time_rank.show(10)

#si on veut partitionner par movieId comme demander, renvoie que des null car un user ne note qu'une seule fois le film
user_rating_time_window = Window.partitionBy("userId").orderBy("timestamp")
lag_previous_rating = ratings.withColumn("previous_rating",lag("rating").over(user_rating_time_window))
lag_previous_rating.show(10)

movie_window = Window.partitionBy("movieId")
avg_rating_movie = ratings.withColumn("avg_rating_movie",avg("rating").over(movie_window)).select("movieId", "avg_rating_movie").distinct()
avg_rating_movie.show(10)

+------+-------+------+----------+---------+
|userId|movieId|rating| timestamp|time_rank|
+------+-------+------+----------+---------+
|   596|      1|   4.0|1535709666|        1|
|   514|      1|   4.0|1533872400|        2|
|    98|      1|   4.5|1532457849|        3|
|   153|      1|   2.0|1525548642|        4|
|   567|      1|   3.5|1525286001|        5|
|   233|      1|   3.0|1524781249|        6|
|   601|      1|   4.0|1521467801|        7|
|    89|      1|   3.0|1520408314|        8|
|   382|      1|   4.5|1515162628|        9|
|    50|      1|   3.0|1514238116|       10|
+------+-------+------+----------+---------+
only showing top 10 rows

+------+-------+------+---------+---------------+
|userId|movieId|rating|timestamp|previous_rating|
+------+-------+------+---------+---------------+
|     1|    804|   4.0|964980499|           NULL|
|     1|   1210|   5.0|964980499|            4.0|
|     1|   2018|   5.0|964980523|            5.0|
|     1|   2628|   4.0|964980523|           

## Task 3 — Spark SQL

In [None]:
ratings.createOrReplaceTempView("ratings_table")
movies.createOrReplaceTempView("movies_table")

**3.1 Simple SQL Queries**

*	Show first 20 rows
*	Count total ratings
*	Count distinct users
*	Average rating overall

**3.2 SQL Grouping**
*	Average rating by movie
*	Ratings count by movie
*	Best movies with at least 100 ratings
*	Worst movies with at least 100 ratings
*	Most active users

**3.3 SQL CASE WHEN**

Create rating buckets:

*	≥ 4.0 → “high”
*	3.0–3.9 → “medium”
*	< 3.0 → “low”

**3.4 SQL Window Functions**

*	Top 10 movies by rating for each genre
*	Earliest rating per user (row_number)
*	Rolling average rating per movie

In [None]:
# Write your solution here

**3.1 Simple SQL Queries**

*	Show first 20 rows
*	Count total ratings
*	Count distinct users
*	Average rating overall

In [None]:
spark.sql("""
SELECT
  count(rating),
  count(distinct(userId)),
  avg(rating)
FROM ratings_table
LIMIT 20
""").show()

+-------------+----------------------+-----------------+
|count(rating)|count(DISTINCT userId)|      avg(rating)|
+-------------+----------------------+-----------------+
|       100836|                   610|3.501556983616962|
+-------------+----------------------+-----------------+



**3.2 SQL Grouping**

*	Average rating by movie
*	Ratings count by movie
*	Best movies with at least 100 ratings
*	Worst movies with at least 100 ratings
*	Most active users

In [None]:
print("top movies")
spark.sql("""
SELECT
  movieId,
  COUNT(rating) AS nb_rating,
  AVG(rating)  AS avg_rating
FROM ratings_table
GROUP BY movieId
HAVING COUNT(rating) > 100
ORDER BY avg_rating DESC
LIMIT 10
""").show()

print("worst movies")
spark.sql("""
SELECT
  movieId,
  COUNT(rating) AS nb_rating,
  AVG(rating)  AS avg_rating
FROM ratings_table
GROUP BY movieId
HAVING COUNT(rating) > 100
ORDER BY avg_rating ASC
LIMIT 10
""").show()

print("top active users")
spark.sql("""
SELECT
  userId,
  COUNT(rating) AS nb_rating,
  AVG(rating)  AS avg_rating
FROM ratings_table
GROUP BY userId
ORDER BY nb_rating DESC
LIMIT 10
""").show()

top movies
+-------+---------+-----------------+
|movieId|nb_rating|       avg_rating|
+-------+---------+-----------------+
|    318|      317|4.429022082018927|
|    858|      192|        4.2890625|
|   2959|      218|4.272935779816514|
|   1221|      129| 4.25968992248062|
|  48516|      107|4.252336448598131|
|   1213|      126|             4.25|
|  58559|      149|4.238255033557047|
|     50|      204|4.237745098039215|
|   1197|      142|4.232394366197183|
|    260|      251|4.231075697211155|
+-------+---------+-----------------+

worst movies
+-------+---------+------------------+
|movieId|nb_rating|        avg_rating|
+-------+---------+------------------+
|    208|      115|2.9130434782608696|
|    153|      137|2.9160583941605838|
|    586|      116|2.9956896551724137|
|    434|      101|3.0346534653465347|
|    185|      112|3.0401785714285716|
|    344|      161| 3.040372670807453|
|    231|      133|3.0601503759398496|
|   2628|      140| 3.107142857142857|
|    367|     

**3.3 SQL CASE WHEN**

Create rating buckets:

*	≥ 4.0 → “high”
*	3.0–3.9 → “medium”
*	< 3.0 → “low”

In [None]:
spark.sql("""
SELECT
  *,
  CASE
    WHEN rating > 4 THEN 'HIGH'
    WHEN rating BETWEEN 3 AND 3.99 THEN 'MEDIUM'
    ELSE 'LOW'
  END AS rating_bucket
FROM ratings_table
LIMIT 20
""").show()

+------+-------+------+---------+-------------+
|userId|movieId|rating|timestamp|rating_bucket|
+------+-------+------+---------+-------------+
|     1|      1|   4.0|964982703|          LOW|
|     1|      3|   4.0|964981247|          LOW|
|     1|      6|   4.0|964982224|          LOW|
|     1|     47|   5.0|964983815|         HIGH|
|     1|     50|   5.0|964982931|         HIGH|
|     1|     70|   3.0|964982400|       MEDIUM|
|     1|    101|   5.0|964980868|         HIGH|
|     1|    110|   4.0|964982176|          LOW|
|     1|    151|   5.0|964984041|         HIGH|
|     1|    157|   5.0|964984100|         HIGH|
|     1|    163|   5.0|964983650|         HIGH|
|     1|    216|   5.0|964981208|         HIGH|
|     1|    223|   3.0|964980985|       MEDIUM|
|     1|    231|   5.0|964981179|         HIGH|
|     1|    235|   4.0|964980908|          LOW|
|     1|    260|   5.0|964981680|         HIGH|
|     1|    296|   3.0|964982967|       MEDIUM|
|     1|    316|   3.0|964982310|       

**3.4 SQL Window Functions**

*	Top 10 movies by rating for each genre
*	Earliest rating per user (row_number)
*	Rolling average rating per movie

In [None]:
spark.sql("""
SELECT
  movieId,
  COUNT(rating) AS nb_rating,
  AVG(rating)  AS avg_rating
FROM ratings_table
GROUP BY movieId
HAVING COUNT(rating) > 100
ORDER BY avg_rating DESC
LIMIT 10
""").createOrReplaceTempView("movies_ranked")

spark.sql("""
SELECT
  Rank.movieId,
  Rank.nb_rating,
  Rank.avg_rating,
  Movies.title,
  Movies.genres,
  ROW_NUMBER() OVER (PARTITION BY Movies.genres ORDER BY Rank.avg_rating DESC) AS rn
FROM movies_ranked AS Rank
LEFT JOIN movies_table AS Movies
ON Movies.movieId = Rank.movieId
""").createOrReplaceTempView("ranked_by_genre")

spark.sql("""
SELECT *
FROM ranked_by_genre
WHERE rn = 1
ORDER BY genres, avg_rating DESC
""").show(10)

+-------+---------+-----------------+--------------------+--------------------+---+
|movieId|nb_rating|       avg_rating|               title|              genres| rn|
+-------+---------+-----------------+--------------------+--------------------+---+
|   1197|      142|4.232394366197183|Princess Bride, T...|Action|Adventure|...|  1|
|    260|      251|4.231075697211155|Star Wars: Episod...|Action|Adventure|...|  1|
|  58559|      149|4.238255033557047|Dark Knight, The ...|Action|Crime|Dram...|  1|
|   2959|      218|4.272935779816514|   Fight Club (1999)|Action|Crime|Dram...|  1|
|    318|      317|4.429022082018927|Shawshank Redempt...|         Crime|Drama|  1|
|  48516|      107|4.252336448598131|Departed, The (2006)|Crime|Drama|Thriller|  1|
|     50|      204|4.237745098039215|Usual Suspects, T...|Crime|Mystery|Thr...|  1|
+-------+---------+-----------------+--------------------+--------------------+---+



## Task 4 — Joins & Genre Analytics

**4.1 Basic join**

Join ratings → movies on movieId.

*	Show 20 joined rows
*	Show userId, title, rating
*	Count how many ratings each genre has

**4.2 Parse genres**

genres looks like "Action|Adventure|Sci-Fi"

*	split genres into an array
*	explode the genres
*	count ratings per genre
*	compute average rating per genre

**4.3 Join + Aggregation**

Compute:

*	average rating per genre
*	average rating per movie title
*	number of ratings per movie
*	number of ratings per genre per year (bonus: extract year from title)

**4.4 Left Anti Join**

Find:

*	movies in movies.csv with no ratings
*	number of such movies

**4.1 Basic join**

Join ratings → movies on movieId.

*	Show 20 joined rows
*	Show userId, title, rating
*	Count how many ratings each genre has

In [19]:
from pyspark.sql.functions import col, count, round, sum, avg, min, max, countDistinct, approx_count_distinct

In [9]:
joined = ratings.join(
    movies,
    ratings["movieId"] == movies["movieId"],
    how ="left")

joined.select("userId", "title", "rating").show(20)

joined.groupBy("genres").agg(count("rating").alias("count_ratings")).show(20)

+------+--------------------+------+
|userId|               title|rating|
+------+--------------------+------+
|     1|    Toy Story (1995)|   4.0|
|     1|Grumpier Old Men ...|   4.0|
|     1|         Heat (1995)|   4.0|
|     1|Seven (a.k.a. Se7...|   5.0|
|     1|Usual Suspects, T...|   5.0|
|     1|From Dusk Till Da...|   3.0|
|     1|Bottle Rocket (1996)|   5.0|
|     1|   Braveheart (1995)|   4.0|
|     1|      Rob Roy (1995)|   5.0|
|     1|Canadian Bacon (1...|   5.0|
|     1|    Desperado (1995)|   5.0|
|     1|Billy Madison (1995)|   5.0|
|     1|       Clerks (1994)|   3.0|
|     1|Dumb & Dumber (Du...|   5.0|
|     1|      Ed Wood (1994)|   4.0|
|     1|Star Wars: Episod...|   5.0|
|     1| Pulp Fiction (1994)|   3.0|
|     1|     Stargate (1994)|   3.0|
|     1|    Tommy Boy (1995)|   5.0|
|     1|Clear and Present...|   4.0|
+------+--------------------+------+
only showing top 20 rows
+--------------------+-------------+
|              genres|count_ratings|
+------------

**4.2 Parse genres**

genres looks like "Action|Adventure|Sci-Fi"

*	split genres into an array
*	explode the genres
*	count ratings per genre
*	compute average rating per genre

In [12]:
from pyspark.sql.functions import split, explode, round

movie_split = joined.withColumn("genres_split", split("genres", "\|")) \
        .withColumn("genre", explode("genres_split")) \
        .drop("genres_split")

genre_rats = movie_split.groupBy("genre").agg(
    count("rating").alias("count_ratings"),
    round(avg("rating"),2).alias("average_rating"))

genre_rats.show(20)

  movie_split = joined.withColumn("genres_split", split("genres", "\|")) \


+------------------+-------------+--------------+
|             genre|count_ratings|average_rating|
+------------------+-------------+--------------+
|             Crime|        16681|          3.66|
|           Romance|        18124|          3.51|
|          Thriller|        26452|          3.49|
|         Adventure|        24161|          3.51|
|             Drama|        41928|          3.66|
|               War|         4859|          3.81|
|       Documentary|         1219|           3.8|
|           Fantasy|        11834|          3.49|
|           Mystery|         7674|          3.63|
|           Musical|         4138|          3.56|
|         Animation|         6988|          3.63|
|         Film-Noir|          870|          3.92|
|(no genres listed)|           47|          3.49|
|              IMAX|         4145|          3.62|
|            Horror|         7291|          3.26|
|           Western|         1930|          3.58|
|            Comedy|        39053|          3.38|


**4.3 Join + Aggregation**

Compute:

*	average rating per genre
*	average rating per movie title
*	number of ratings per movie
*	number of ratings per genre per year (bonus: extract year from title)

In [21]:
from pyspark.sql.functions import regexp_extract

movie_split.groupBy("title").agg(
    round(avg("rating"),2).alias("average_rating"),
    count("rating").alias("count_ratings")).show(20)

year_split = movie_split.withColumn("year", regexp_extract("title", r"\((\d{4})\)", 1))

year_split.groupBy("year", "genre").agg(
    round(avg("rating"),2).alias("average_rating"),
    count("rating").alias("count_ratings")).orderBy(col("year").desc()).show(20)

+--------------------+--------------+-------------+
|               title|average_rating|count_ratings|
+--------------------+--------------+-------------+
|       Psycho (1960)|          4.04|          166|
|Men in Black (a.k...|          3.49|          495|
|Gulliver's Travel...|           3.0|            9|
|Heavenly Creature...|          3.93|           42|
|    Elizabeth (1998)|          3.67|           23|
|Before Night Fall...|           4.3|            5|
|O Brother, Where ...|          3.81|          282|
|Snow White and th...|          3.62|          385|
| Three Wishes (1995)|           3.0|            2|
|When We Were King...|           3.9|           10|
|   Annie Hall (1977)|          3.87|          116|
| If Lucy Fell (1996)|           2.5|            4|
|First Blood (Ramb...|          3.55|          120|
|Don't Tell Mom th...|          2.35|           13|
| Nut Job, The (2014)|          4.33|           12|
|22 Jump Street (2...|          3.68|           57|
|   Deadpool

**4.4 Left Anti Join**

Find:

*	movies in movies.csv with no ratings
*	number of such movies

In [25]:
no_ratings = movies.join(
    ratings,
    on="movieId",
    how="left_anti")

no_ratings.show(20)
print("nombre de films sans notes :",no_ratings.count())

+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|   1076|Innocents, The (1...|Drama|Horror|Thri...|
|   2939|      Niagara (1953)|      Drama|Thriller|
|   3338|For All Mankind (...|         Documentary|
|   3456|Color of Paradise...|               Drama|
|   4194|I Know Where I'm ...|   Drama|Romance|War|
|   5721|  Chosen, The (1981)|               Drama|
|   6668|Road Home, The (W...|       Drama|Romance|
|   6849|      Scrooge (1970)|Drama|Fantasy|Mus...|
|   7020|        Proof (1991)|Comedy|Drama|Romance|
|   7792|Parallax View, Th...|            Thriller|
|   8765|This Gun for Hire...|Crime|Film-Noir|T...|
|  25855|Roaring Twenties,...|Crime|Drama|Thriller|
|  26085|Mutiny on the Bou...|Adventure|Drama|R...|
|  30892|In the Realms of ...|Animation|Documen...|
|  32160|Twentieth Century...|              Comedy|
|  32371|Call Northside 77...|Crime|Drama|Film-...|
|  34482|Bro

In [None]:
# Write your solution here

## Task 5 — MLlib Classification

**5.2 Feature engineering**

Create features:

	•	rating (as-is)
	•	timestamp
	•	normalized timestamp
	•	(optional) number of ratings by that user (using join or window)

Use VectorAssembler to pack them.

**5.3 Split data**

70% train, 30% test.

**5.4 Train model**

Train a Logistic Regression model.

	•	print coefficients
	•	print intercept
	•	print ROC AUC

**5.5 Evaluate**

Compute :

	•	accuracy
	•	precision
	•	recall
	•	confusion matrix (TP, FP, TN, FN)

**5.6 Task: Improve model**

try:

	•	Adding a log-transformed timestamp
	•	Using a DecisionTreeClassifier
	•	Using a RandomForestClassifier
	•	Comparing metrics

**Goal: Build a binary classifier:**

Predict whether a user will give rating ≥ 4.0

5.1 Create label

Add a column:

In [5]:
from pyspark.sql.functions import when, col

In [None]:
label = 1 if rating >= 4 else 0

In [6]:
labels = ratings.withColumn(
    "label",
    when(col("rating") >= 4.0, 1).otherwise(0))

labels.show(10)

+------+-------+------+---------+-----+
|userId|movieId|rating|timestamp|label|
+------+-------+------+---------+-----+
|     1|      1|   4.0|964982703|    1|
|     1|      3|   4.0|964981247|    1|
|     1|      6|   4.0|964982224|    1|
|     1|     47|   5.0|964983815|    1|
|     1|     50|   5.0|964982931|    1|
|     1|     70|   3.0|964982400|    0|
|     1|    101|   5.0|964980868|    1|
|     1|    110|   4.0|964982176|    1|
|     1|    151|   5.0|964984041|    1|
|     1|    157|   5.0|964984100|    1|
+------+-------+------+---------+-----+
only showing top 10 rows


**5.2 Feature engineering**

Create features:

	•	rating (as-is)
	•	timestamp
	•	normalized timestamp
	•	(optional) number of ratings by that user (using join or window)

Use VectorAssembler to pack them.

In [7]:
from pyspark.sql.window import Window
from pyspark.sql.functions import min, max, count
from pyspark.ml.feature import VectorAssembler

min_ts = labels.agg(min("timestamp")).first()[0]
max_ts = labels.agg(max("timestamp")).first()[0]

new_features = labels.withColumn(
    "timestamp_norm",
    (col("timestamp") - min_ts) / (max_ts - min_ts))

window_user = Window.partitionBy("userId")

new_features = new_features.withColumn("user_rating_count",count("*").over(window_user))

assembler = VectorAssembler(
    inputCols=["rating", "timestamp", "timestamp_norm", "user_rating_count"],
    outputCol="features")

df_features = assembler.transform(new_features)
df_features.show(10)

+------+-------+------+---------+-----+-------------------+-----------------+--------------------+
|userId|movieId|rating|timestamp|label|     timestamp_norm|user_rating_count|            features|
+------+-------+------+---------+-----+-------------------+-----------------+--------------------+
|     1|      1|   4.0|964982703|    1|0.19284624425107147|              232|[4.0,9.64982703E8...|
|     1|      3|   4.0|964981247|    1|0.19284419260665842|              232|[4.0,9.64981247E8...|
|     1|      6|   4.0|964982224|    1| 0.1928455692938779|              232|[4.0,9.64982224E8...|
|     1|     47|   5.0|964983815|    1|0.19284781116631003|              232|[5.0,9.64983815E8...|
|     1|     50|   5.0|964982931|    1|0.19284656552505924|              232|[5.0,9.64982931E8...|
|     1|     70|   3.0|964982400|    0| 0.1928458172948509|              232|[3.0,9.649824E8,0...|
|     1|    101|   5.0|964980868|    1|0.19284365855910857|              232|[5.0,9.64980868E8...|
|     1|  

**5.3 Split data**

70% train, 30% test.

In [8]:
train_df, test_df = df_features.randomSplit([0.7, 0.3])

# Vérification
print(f"taille training: {train_df.count()}")
print(f"taille test: {test_df.count()}")

taille training: 70727
taille test: 30109


**5.4 Train model**

Train a Logistic Regression model.

	•	print coefficients
	•	print intercept
	•	print ROC AUC

In [9]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# le modèle
lr = LogisticRegression(featuresCol="features", labelCol="label")

# Entraîner le modèle
lr_model = lr.fit(train_df)

# print coefficients et intercept
print("Coefficients:", lr_model.coefficients)
print("Intercept:", lr_model.intercept)

# prédictions
predictions = lr_model.transform(test_df)

# ROC AUC
evaluator = BinaryClassificationEvaluator(
    labelCol="label", rawPredictionCol="rawPrediction", metricName="areaUnderROC")

roc_auc = evaluator.evaluate(predictions)
print("ROC AUC:", roc_auc)

Coefficients: [72.43698840330255,-1.8962465847585054e-09,-1.3457181029110683,-3.35090714531708e-05]
Intercept: -268.39396510117757
ROC AUC: 0.9999997347089418


**5.5 Evaluate**

Compute :

	•	accuracy
	•	precision
	•	recall
	•	confusion matrix (TP, FP, TN, FN)

In [10]:
TP = predictions.filter((col("prediction") == 1) & (col("label") == 1)).count()

TN = predictions.filter((col("prediction") == 0) & (col("label") == 0)).count()

FP = predictions.filter((col("prediction") == 1) & (col("label") == 0)).count()

FN = predictions.filter((col("prediction") == 0) & (col("label") == 1)).count()

accuracy = (TP + TN) / (TP + TN + FP + FN)
precision = TP / (TP + FP) if (TP + FP) != 0 else 0
recall = TP / (TP + FN) if (TP + FN) != 0 else 0

print(f"Confusion matrix: TP={TP}, FP={FP}, TN={TN}, FN={FN}")
print(f"Accuracy: {accuracy}")
print(f"Precision: {precision}")
print(f"Recall: {recall}")

Confusion matrix: TP=14368, FP=0, TN=15741, FN=0
Accuracy: 1.0
Precision: 1.0
Recall: 1.0


**5.6 Task: Improve model**

try:

	•	Adding a log-transformed timestamp
	•	Using a DecisionTreeClassifier
	•	Using a RandomForestClassifier
	•	Comparing metrics

In [11]:
from pyspark.sql.functions import log1p
from pyspark.ml.classification import DecisionTreeClassifier, RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator

new_df_features = df_features.drop("features")
new_df_features = new_df_features.withColumn("log_timestamp", log1p("timestamp"))

assembler = VectorAssembler(
    inputCols=["rating", "log_timestamp", "user_rating_count"],
    outputCol="features")

new_df_features = assembler.transform(new_df_features)

train_df, test_df = new_df_features.randomSplit([0.7, 0.3])

# DecisionTree
dt = DecisionTreeClassifier(featuresCol="features", labelCol="label", seed=42)
dt_model = dt.fit(train_df)
dt_preds = dt_model.transform(test_df)

# RandomForest
rf = RandomForestClassifier(featuresCol="features", labelCol="label", seed=42)
rf_model = rf.fit(train_df)
rf_preds = rf_model.transform(test_df)

# Evaluate
evaluator = BinaryClassificationEvaluator(labelCol="label", rawPredictionCol="rawPrediction", metricName="areaUnderROC")
roc_auc_dt = evaluator.evaluate(dt_preds)
print("DecisionTree ROC AUC:", roc_auc_dt)

evaluator = BinaryClassificationEvaluator(labelCol="label", rawPredictionCol="rawPrediction", metricName="areaUnderROC")
roc_auc_rf = evaluator.evaluate(rf_preds)
print("RandomForest ROC AUC:", roc_auc_rf)

# Metrics
TP_dt = dt_preds.filter((col("prediction") == 1) & (col("label") == 1)).count()
TN_dt = dt_preds.filter((col("prediction") == 0) & (col("label") == 0)).count()
FP_dt = dt_preds.filter((col("prediction") == 1) & (col("label") == 0)).count()
FN_dt = dt_preds.filter((col("prediction") == 0) & (col("label") == 1)).count()

accuracy_dt = (TP_dt + TN_dt) / (TP_dt + TN_dt + FP_dt + FN_dt)
precision_dt = TP_dt / (TP_dt + FP_dt) if (TP_dt + FP_dt) != 0 else 0
recall_dt = TP_dt / (TP_dt + FN_dt) if (TP_dt + FN_dt) != 0 else 0

TP_rf = rf_preds.filter((col("prediction") == 1) & (col("label") == 1)).count()
TN_rf = rf_preds.filter((col("prediction") == 0) & (col("label") == 0)).count()
FP_rf = rf_preds.filter((col("prediction") == 1) & (col("label") == 0)).count()
FN_rf = rf_preds.filter((col("prediction") == 0) & (col("label") == 1)).count()

accuracy_rf = (TP_rf + TN_rf) / (TP_rf + TN_rf + FP_rf + FN_rf)
precision_rf = TP_rf / (TP_rf + FP_rf) if (TP_rf + FP_rf) != 0 else 0
recall_rf = TP_rf / (TP_rf + FN_rf) if (TP_rf + FN_rf) != 0 else 0

print(f"Confusion matrix DT: TP={TP_dt}, FP={FP_dt}, TN={TN_dt}, FN={FN_dt}")
print(f"Confusion matrix RF: TP={TP_rf}, FP={FP_rf}, TN={TN_rf}, FN={FN_rf}")
print(f"Accuracy DT: {accuracy_dt}")
print(f"Accuracy RF: {accuracy_rf}")
print(f"Precision DT: {precision_dt}")
print(f"Precision RF: {precision_rf}")
print(f"Recall DT: {recall_dt}")
print(f"Recall RF: {recall_rf}")

DecisionTree ROC AUC: 1.0
RandomForest ROC AUC: 1.0
Confusion matrix DT: TP=14605, FP=0, TN=15691, FN=0
Confusion matrix RF: TP=14605, FP=0, TN=15691, FN=0
Accuracy DT: 1.0
Accuracy RF: 1.0
Precision DT: 1.0
Precision RF: 1.0
Recall DT: 1.0
Recall RF: 1.0


Les 2 méthodes prédisent parfaitement notre jeu de donnée

In [None]:
# Write your solution here

## Task 6 — Performance & Execution

**6.1 Check partitions**

Get number of partitions for ratings and movies.

**6.2 Repartition and coalesce**

	•	explain difference
	•	show effect on shuffles
	•	check number of partitions with rdd.getNumPartitions()

**6.3 Cache**

Cache ratings:

	•	compute count
	•	compute average rating per movie twice
	•	measure difference using Python time

**6.4 explain(True)**

run explain() on:

	•	a join
	•	a groupBy
	•	a window function

And identify shuffle boundaries.

In [None]:
# Write your solution here

## Stop Spark

In [None]:
spark.stop()