In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col, from_unixtime, year, month, dayofmonth, split, explode, trim, regexp_extract,
    when, lower, count as f_count, avg as f_avg, dense_rank, lit
)
from pyspark.sql.window import Window
import os, shutil

In [None]:
USE_GOOGLE_DRIVE = True  # <- set to False if you don't want to mount Drive

# Keep project data on Drive when enabled (survives VM resets); otherwise use
# Colab's local disk.
if USE_GOOGLE_DRIVE:
    from google.colab import drive
    drive.mount('/content/drive')
    base = "/content/drive/MyDrive/moviedw"
else:
    base = "/content/moviedw"

RAW_PATH = f"{base}/data/raw/ml-1m"
CURATED_PATH = f"{base}/data/curated"

# Start each run from an empty curated zone so stale tables never linger.
if os.path.exists(CURATED_PATH):
    shutil.rmtree(CURATED_PATH)
for needed_dir in (RAW_PATH, CURATED_PATH):
    os.makedirs(needed_dir, exist_ok=True)

RAW_PATH, CURATED_PATH


Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


('/content/drive/MyDrive/moviedw/data/raw/ml-1m',
 '/content/drive/MyDrive/moviedw/data/curated')

In [None]:
# Download MovieLens 1M (approx 5.8MB)
!wget -q -O /content/ml-1m.zip http://files.grouplens.org/datasets/movielens/ml-1m.zip

# Extract into RAW_PATH
!unzip -q /content/ml-1m.zip -d /content

# Move files into RAW_PATH
!cp -r /content/ml-1m/* "{RAW_PATH}"
!ls -al "{RAW_PATH}"


replace /content/ml-1m/movies.dat? [y]es, [n]o, [A]ll, [N]one, [r]ename: total 24323
-rw------- 1 root root   171308 Aug 14 03:55 movies.dat
-rw------- 1 root root 24594131 Aug 14 03:55 ratings.dat
-rw------- 1 root root     5577 Aug 14 03:55 README
-rw------- 1 root root   134368 Aug 14 03:55 users.dat


In [None]:
# Local Spark session for this notebook.
# - Few shuffle partitions suit this small, single-machine dataset.
# - The session time zone is pinned to UTC so rating_ts / year / month / day
#   (and therefore the fact table's year/month partitions) are derived the same
#   way on every machine instead of depending on the JVM's default zone.
spark = (
    SparkSession.builder
    .appName("MovieDW-Colab")
    .config("spark.sql.shuffle.partitions", "4")
    .config("spark.sql.session.timeZone", "UTC")
    .getOrCreate()
)

spark.version


'3.5.1'

In [None]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, LongType


def _field(i):
    """Column for the i-th (0-based) '::'-delimited field of a raw text line's `value`."""
    return split(col("value"), "::").getItem(i)


# --- Read ratings.dat ---
# Format: UserID::MovieID::Rating::Timestamp
ratings_raw = spark.read.text(f"{RAW_PATH}/ratings.dat")
ratings = ratings_raw.select(
    _field(0).cast("int").alias("user_id"),
    _field(1).cast("int").alias("movie_id"),
    _field(2).cast("float").alias("rating"),
    _field(3).cast("long").alias("ts"),
)

# --- Read users.dat ---
# Format: UserID::Gender::Age::Occupation::Zip-code
users_raw = spark.read.text(f"{RAW_PATH}/users.dat")
users = users_raw.select(
    _field(0).cast("int").alias("user_id"),
    _field(1).alias("gender"),
    _field(2).cast("int").alias("age_code"),
    _field(3).cast("int").alias("occupation_code"),
    _field(4).alias("zip_code"),  # kept as string: leading zeros matter (e.g. 02460)
)

# --- Read movies.dat ---
# Format: MovieID::Title::Genres (pipe-separated)
# MovieLens 1M ships movies.dat as Latin-1 (ISO-8859-1), but Spark's text source
# treats lines as UTF-8, so accented titles would come out as replacement
# characters in every downstream table. Transcode once to a UTF-8 sibling file
# and read that instead (Latin-1 decoding never fails; ASCII lines are unchanged).
MOVIES_UTF8_PATH = f"{RAW_PATH}/movies.utf8.dat"
with open(f"{RAW_PATH}/movies.dat", encoding="latin-1", newline="") as movies_src, \
        open(MOVIES_UTF8_PATH, "w", encoding="utf-8", newline="") as movies_dst:
    movies_dst.write(movies_src.read())

movies_raw = spark.read.text(MOVIES_UTF8_PATH)
movies = movies_raw.select(
    _field(0).cast("int").alias("movie_id"),
    _field(1).alias("title"),
    _field(2).alias("genres"),
)

ratings.show(5, truncate=False)
users.show(5, truncate=False)
movies.show(5, truncate=False)


+-------+--------+------+---------+
|user_id|movie_id|rating|ts       |
+-------+--------+------+---------+
|1      |1193    |5.0   |978300760|
|1      |661     |3.0   |978302109|
|1      |914     |3.0   |978301968|
|1      |3408    |4.0   |978300275|
|1      |2355    |5.0   |978824291|
+-------+--------+------+---------+
only showing top 5 rows

+-------+------+--------+---------------+--------+
|user_id|gender|age_code|occupation_code|zip_code|
+-------+------+--------+---------------+--------+
|1      |F     |1       |10             |48067   |
|2      |M     |56      |16             |70072   |
|3      |M     |25      |15             |55117   |
|4      |M     |45      |7              |02460   |
|5      |M     |25      |20             |55455   |
+-------+------+--------+---------------+--------+
only showing top 5 rows

+--------+----------------------------------+----------------------------+
|movie_id|title                             |genres                      |
+--------+-------

In [None]:
# Ratings: epoch seconds -> timestamp, then its calendar parts.
rating_ts_expr = from_unixtime(col("ts")).cast("timestamp")
ratings_ts = (
    ratings
    .withColumn("rating_ts", rating_ts_expr)
    .withColumn("year", year("rating_ts"))
    .withColumn("month", month("rating_ts"))
    .withColumn("day", dayofmonth("rating_ts"))
)

# Movies: the release year is the trailing "(YYYY)" in titles such as
# "Toy Story (1995)"; a title without that suffix yields "" before the int cast.
release_year = regexp_extract(col("title"), r"\((\d{4})\)\s*$", 1).cast("int")
movies_clean = movies.withColumn("release_year", release_year)

# Genres: "Animation|Children's|Comedy" becomes one row per (movie, genre).
genre_list = split(col("genres"), r"\|")
movies_genres = (
    movies_clean
    .withColumn("genre", explode(genre_list))
    .withColumn("genre", trim(col("genre")))
)

movies_clean.select("movie_id", "title", "release_year").show(5, truncate=False)
movies_genres.select("movie_id", "genre").show(5, truncate=False)


+--------+----------------------------------+------------+
|movie_id|title                             |release_year|
+--------+----------------------------------+------------+
|1       |Toy Story (1995)                  |1995        |
|2       |Jumanji (1995)                    |1995        |
|3       |Grumpier Old Men (1995)           |1995        |
|4       |Waiting to Exhale (1995)          |1995        |
|5       |Father of the Bride Part II (1995)|1995        |
+--------+----------------------------------+------------+
only showing top 5 rows

+--------+----------+
|movie_id|genre     |
+--------+----------+
|1       |Animation |
|1       |Children's|
|1       |Comedy    |
|2       |Adventure |
|2       |Children's|
+--------+----------+
only showing top 5 rows



In [None]:
# Preview of movie -> genre pairs.
# NOTE: dim_movie_genres is only built in the dimension cell further down, so
# referencing it here raised NameError on a fresh kernel (Restart & Run All).
# Preview the exploded source pairs the bridge table is derived from instead.
movies_genres.select("movie_id", "genre").dropDuplicates().show(10)

+--------+--------+
|movie_id|genre_id|
+--------+--------+
|       2|       9|
|       6|       1|
|       6|       6|
|       7|       5|
|       7|      14|
|      10|       1|
|      13|       3|
|      13|       4|
|      15|       1|
|      15|       2|
+--------+--------+
only showing top 10 rows



In [None]:
# --- dim_movies: one row per movie_id ---
dim_movies = movies_clean.select("movie_id", "title", "release_year").dropDuplicates(["movie_id"])

# --- dim_users: one row per user_id, plus a readable age bucket ---
# Age codes in ML-1M: {1, 18, 25, 35, 45, 50, 56}
dim_users = (
    users.select("user_id", "gender", "age_code", "occupation_code", "zip_code")
    .dropDuplicates(["user_id"])
)

dim_users = dim_users.withColumn(
    "age_bucket",
    when(col("age_code") == 1,  "Under 18")
    .when(col("age_code") == 18, "18-24")
    .when(col("age_code") == 25, "25-34")
    .when(col("age_code") == 35, "35-44")
    .when(col("age_code") == 45, "45-49")
    .when(col("age_code") == 50, "50-55")
    .when(col("age_code") == 56, "56+")
    .otherwise(lit("Unknown"))
)

# --- dim_genres: surrogate key = alphabetical rank of the distinct genre ---
# The window has no partitionBy, so Spark moves every row into one partition;
# that is acceptable only because the distinct genre list is tiny.
dim_genres_base = movies_genres.select("genre").where(col("genre").isNotNull()).dropDuplicates()
genre_order = Window.orderBy(col("genre").asc())
dim_genres = (
    dim_genres_base
    .withColumn("genre_id", dense_rank().over(genre_order))
    .select("genre_id", "genre")
)

# --- dim_movie_genres: bridge table movie_id <-> genre_id ---
dim_movie_genres = (
    movies_genres.select("movie_id", "genre").dropDuplicates()
    .join(dim_genres, on="genre", how="left")
    .select("movie_id", "genre_id")
)

# Preview the bridge right after it is defined so this also works on a fresh
# kernel (Restart & Run All).
dim_movie_genres.show(10)


In [None]:
# Fact table: one row per rating. year/month are kept so the table can be
# partitioned by them when written out; day is kept for finer filtering.
FACT_COLUMNS = ["user_id", "movie_id", "rating", "rating_ts", "year", "month", "day"]
fact_ratings = ratings_ts.select(*FACT_COLUMNS)

fact_ratings.printSchema()
fact_ratings.show(5, truncate=False)


root
 |-- user_id: integer (nullable = true)
 |-- movie_id: integer (nullable = true)
 |-- rating: float (nullable = true)
 |-- rating_ts: timestamp (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)

+-------+--------+------+-------------------+----+-----+---+
|user_id|movie_id|rating|rating_ts          |year|month|day|
+-------+--------+------+-------------------+----+-----+---+
|1      |1193    |5.0   |2000-12-31 22:12:40|2000|12   |31 |
|1      |661     |3.0   |2000-12-31 22:35:09|2000|12   |31 |
|1      |914     |3.0   |2000-12-31 22:32:48|2000|12   |31 |
|1      |3408    |4.0   |2000-12-31 22:04:35|2000|12   |31 |
|1      |2355    |5.0   |2001-01-06 23:38:11|2001|1    |6  |
+-------+--------+------+-------------------+----+-----+---+
only showing top 5 rows



In [None]:
DIM_MOVIES_PATH = f"{CURATED_PATH}/dim_movies"
DIM_USERS_PATH = f"{CURATED_PATH}/dim_users"
DIM_GENRES_PATH = f"{CURATED_PATH}/dim_genres"
DIM_MOVIE_GENRES_PATH = f"{CURATED_PATH}/dim_movie_genres"
FACT_RATINGS_PATH = f"{CURATED_PATH}/fact_ratings"

dim_movies.write.mode("overwrite").parquet(DIM_MOVIES_PATH)
dim_users.write.mode("overwrite").parquet(DIM_USERS_PATH)
dim_genres.write.mode("overwrite").parquet(DIM_GENRES_PATH)
dim_movie_genres.write.mode("overwrite").parquet(DIM_MOVIE_GENRES_PATH)

# Partition fact by year/month for faster time filtering
fact_ratings.write.mode("overwrite").partitionBy("year","month").parquet(FACT_RATINGS_PATH)

!find "{CURATED_PATH}" -maxdepth 2 -type d -print


/content/drive/MyDrive/moviedw/data/curated
/content/drive/MyDrive/moviedw/data/curated/dim_movies
/content/drive/MyDrive/moviedw/data/curated/dim_users
/content/drive/MyDrive/moviedw/data/curated/dim_genres
/content/drive/MyDrive/moviedw/data/curated/dim_movie_genres
/content/drive/MyDrive/moviedw/data/curated/fact_ratings
/content/drive/MyDrive/moviedw/data/curated/fact_ratings/year=2000
/content/drive/MyDrive/moviedw/data/curated/fact_ratings/year=2001
/content/drive/MyDrive/moviedw/data/curated/fact_ratings/year=2002
/content/drive/MyDrive/moviedw/data/curated/fact_ratings/year=2003


In [None]:
# Re-read the curated Parquet output (rather than reusing the in-memory frames)
# so the SQL below runs against exactly what was persisted.
curated_tables = {
    "dim_movies": DIM_MOVIES_PATH,
    "dim_users": DIM_USERS_PATH,
    "dim_genres": DIM_GENRES_PATH,
    "dim_movie_genres": DIM_MOVIE_GENRES_PATH,
    "fact_ratings": FACT_RATINGS_PATH,
}
curated_frames = {name: spark.read.parquet(path) for name, path in curated_tables.items()}

dim_movies_v = curated_frames["dim_movies"]
dim_users_v = curated_frames["dim_users"]
dim_genres_v = curated_frames["dim_genres"]
dim_movie_genres_v = curated_frames["dim_movie_genres"]
fact_ratings_v = curated_frames["fact_ratings"]

# Expose each table to Spark SQL under its own name.
for name, frame in curated_frames.items():
    frame.createOrReplaceTempView(name)

# Quick row counts
for name in curated_frames:
    print(name, spark.sql(f"SELECT COUNT(*) cnt FROM {name}").collect()[0]["cnt"])


dim_movies 3883
dim_users 6040
dim_genres 18
dim_movie_genres 6408
fact_ratings 1000209


In [None]:
spark.sql("""
WITH agg AS (
  SELECT movie_id, AVG(rating) AS avg_rating, COUNT(*) AS cnt
  FROM fact_ratings
  GROUP BY movie_id
)
SELECT m.title, a.avg_rating, a.cnt
FROM agg a
JOIN dim_movies m ON a.movie_id = m.movie_id
WHERE a.cnt >= 100
ORDER BY a.avg_rating DESC, a.cnt DESC
LIMIT 10
""").show(truncate=False)


+-------------------------------------------------------------------+-----------------+----+
|title                                                              |avg_rating       |cnt |
+-------------------------------------------------------------------+-----------------+----+
|Seven Samurai (The Magnificent Seven) (Shichinin no samurai) (1954)|4.560509554140127|628 |
|Shawshank Redemption, The (1994)                                   |4.554557700942973|2227|
|Godfather, The (1972)                                              |4.524966261808367|2223|
|Close Shave, A (1995)                                              |4.52054794520548 |657 |
|Usual Suspects, The (1995)                                         |4.517106001121705|1783|
|Schindler's List (1993)                                            |4.510416666666667|2304|
|Wrong Trousers, The (1993)                                         |4.507936507936508|882 |
|Sunset Blvd. (a.k.a. Sunset Boulevard) (1950)                      |4

In [None]:
spark.sql("""
SELECT g.genre,
       fr.year,
       COUNT(*) AS ratings_cnt
FROM fact_ratings fr
JOIN dim_movie_genres mg ON fr.movie_id = mg.movie_id
JOIN dim_genres g ON mg.genre_id = g.genre_id
GROUP BY g.genre, fr.year
ORDER BY fr.year, g.genre
""").show(50, truncate=False)


+-----------+----+-----------+
|genre      |year|ratings_cnt|
+-----------+----+-----------+
|Action     |2000|238212     |
|Adventure  |2000|124112     |
|Animation  |2000|38951      |
|Children's |2000|65130      |
|Comedy     |2000|321496     |
|Crime      |2000|72632      |
|Documentary|2000|6941       |
|Drama      |2000|318840     |
|Fantasy    |2000|33564      |
|Film-Noir  |2000|16898      |
|Horror     |2000|69916      |
|Musical    |2000|37458      |
|Mystery    |2000|36681      |
|Romance    |2000|133796     |
|Sci-Fi     |2000|146085     |
|Thriller   |2000|172903     |
|War        |2000|63456      |
|Western    |2000|19043      |
|Action     |2001|14021      |
|Adventure  |2001|7055       |
|Animation  |2001|3301       |
|Children's |2001|5228       |
|Comedy     |2001|25136      |
|Crime      |2001|5010       |
|Documentary|2001|663        |
|Drama      |2001|25226      |
|Fantasy    |2001|1885       |
|Film-Noir  |2001|963        |
|Horror     |2001|4526       |
|Musical

In [None]:
spark.sql("""
SELECT u.user_id, u.gender, u.age_bucket, COUNT(*) AS ratings_cnt
FROM fact_ratings fr
JOIN dim_users u ON fr.user_id = u.user_id
GROUP BY u.user_id, u.gender, u.age_bucket
ORDER BY ratings_cnt DESC
LIMIT 10
""").show(truncate=False)


+-------+------+----------+-----------+
|user_id|gender|age_bucket|ratings_cnt|
+-------+------+----------+-----------+
|4169   |M     |50-55     |2314       |
|1680   |M     |25-34     |1850       |
|4277   |M     |35-44     |1743       |
|1941   |M     |35-44     |1595       |
|1181   |M     |35-44     |1521       |
|889    |M     |45-49     |1518       |
|3618   |M     |56+       |1344       |
|2063   |M     |25-34     |1323       |
|1150   |F     |25-34     |1302       |
|1015   |M     |35-44     |1286       |
+-------+------+----------+-----------+



In [None]:
spark.sql("""
WITH agg AS (
  SELECT movie_id, COUNT(*) AS cnt, AVG(rating) AS avg_rating
  FROM fact_ratings
  GROUP BY movie_id
),
filtered AS (
  SELECT movie_id, cnt, avg_rating
  FROM agg
  WHERE cnt >= 300
)
SELECT m.title, f.cnt, ROUND(f.avg_rating, 3) AS avg_rating
FROM filtered f
JOIN dim_movies m ON f.movie_id = m.movie_id
ORDER BY f.cnt DESC, f.avg_rating DESC
LIMIT 15
""").show(truncate=False)


+-----------------------------------------------------+----+----------+
|title                                                |cnt |avg_rating|
+-----------------------------------------------------+----+----------+
|American Beauty (1999)                               |3428|4.317     |
|Star Wars: Episode IV - A New Hope (1977)            |2991|4.454     |
|Star Wars: Episode V - The Empire Strikes Back (1980)|2990|4.293     |
|Star Wars: Episode VI - Return of the Jedi (1983)    |2883|4.023     |
|Jurassic Park (1993)                                 |2672|3.764     |
|Saving Private Ryan (1998)                           |2653|4.337     |
|Terminator 2: Judgment Day (1991)                    |2649|4.059     |
|Matrix, The (1999)                                   |2590|4.316     |
|Back to the Future (1985)                            |2583|3.99      |
|Silence of the Lambs, The (1991)                     |2578|4.352     |
|Men in Black (1997)                                  |2538|3.74

In [None]:
# Nulls in critical keys
spark.sql("""
SELECT
  SUM(CASE WHEN user_id  IS NULL THEN 1 ELSE 0 END) AS null_users,
  SUM(CASE WHEN movie_id IS NULL THEN 1 ELSE 0 END) AS null_movies,
  SUM(CASE WHEN rating   IS NULL THEN 1 ELSE 0 END) AS null_ratings
FROM fact_ratings
""").show()

# Ratings range sanity
spark.sql("""
SELECT MIN(rating) AS min_rating, MAX(rating) AS max_rating
FROM fact_ratings
""").show()

# Orphans: movie IDs in fact not in dim_movies (should be 0)
spark.sql("""
SELECT COUNT(*) AS orphans
FROM fact_ratings fr
LEFT JOIN dim_movies dm ON fr.movie_id = dm.movie_id
WHERE dm.movie_id IS NULL
""").show()


+----------+-----------+------------+
|null_users|null_movies|null_ratings|
+----------+-----------+------------+
|         0|          0|           0|
+----------+-----------+------------+

+----------+----------+
|min_rating|max_rating|
+----------+----------+
|       1.0|       5.0|
+----------+----------+

+-------+
|orphans|
+-------+
|      0|
+-------+



In [None]:
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator

# One seed for both the split and ALS: without it ALS's random factor
# initialisation changes between runs, so the RMSE and recommendations below
# would not be reproducible even though the split is seeded.
SEED = 42

# Prepare data (ALS expects columns: userCol, itemCol, ratingCol)
als_data = fact_ratings_v.select("user_id", "movie_id", "rating")

# Train/validation split
train, test = als_data.randomSplit([0.8, 0.2], seed=SEED)

als = ALS(
    userCol="user_id",
    itemCol="movie_id",
    ratingCol="rating",
    rank=20,
    maxIter=10,
    regParam=0.1,
    nonnegative=True,
    coldStartStrategy="drop",  # drop NaN predictions (users/items unseen in train)
    seed=SEED,
)

model = als.fit(train)
pred = model.transform(test)

rmse = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction").evaluate(pred)
print("ALS RMSE:", rmse)

# Top-5 recommendations, computed for all users; the first 5 users are shown.
# NOTE(review): these are ranked over all movies, including ones the user has
# already rated, and a single item (movie 572) tops most lists, possibly an
# item with very few ratings. TODO: filter seen items and/or revisit regParam
# before using these recommendations.
user_recs = model.recommendForAllUsers(5)
user_recs.show(5, truncate=False)


ALS RMSE: 0.8694922546907288
+-------+--------------------------------------------------------------------------------------------+
|user_id|recommendations                                                                             |
+-------+--------------------------------------------------------------------------------------------+
|6      |[{572, 5.6633964}, {1035, 4.6103888}, {3233, 4.594942}, {1741, 4.535442}, {3853, 4.4642057}]|
|9      |[{572, 4.897166}, {318, 4.4288607}, {1851, 4.334844}, {50, 4.319377}, {2905, 4.284457}]     |
|12     |[{2309, 4.7400374}, {572, 4.70445}, {296, 4.556843}, {858, 4.5508604}, {557, 4.462086}]     |
|13     |[{572, 4.8418674}, {260, 4.0196233}, {3233, 4.011985}, {318, 3.9945614}, {1198, 3.9881265}] |
|14     |[{572, 4.4262624}, {53, 4.102438}, {527, 3.9698455}, {2324, 3.9610052}, {2964, 3.9488091}]  |
+-------+--------------------------------------------------------------------------------------------+
only showing top 5 rows

