## 📊 Movie Ratings Analytics with Medallion Architecture

This project demonstrates the Medallion Architecture (Bronze → Silver → Gold) using the MovieLens Small Dataset.
It showcases how raw data is ingested, cleaned, and transformed into analytics-ready tables for insights into movie ratings and genre popularity.

## 🏗️ Architecture Overview

**Bronze Layer 🪙**

* Raw ingestion of CSV files (ratings.csv, movies.csv).
* No transformations: the schema is inferred on read (`inferSchema`) and the data is persisted as-is in Delta format.

**Silver Layer 🥈**

* Cleaned and curated data.
* Data type casting, Unix timestamp conversion, and splitting the pipe-delimited `genres` string into an array.
* Ratings joined with movies into a unified dataset.

**Gold Layer 🥇**

* Aggregated insights for business consumption: 
    * Top 10 movies by average rating (with ≥ 50 ratings).
    * Most popular genres by rating count.
    * Average rating per genre.

## 📂 Dataset

We use the [MovieLens Latest Small Dataset](https://grouplens.org/datasets/movielens/):

Files used in this project:
* movies.csv → movieId, title, genres
* ratings.csv → userId, movieId, rating, timestamp
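To make the two file layouts concrete, here is a small pure-Python sketch (not part of the Spark pipeline) that parses rows in the same column layout with the standard `csv` module and applies the types `inferSchema` would be expected to pick. The sample rows are hand-written for illustration, and the helper names `parse_movies` and `parse_ratings` are hypothetical. Note that titles containing commas are quoted, which a proper CSV parser handles.

```python
import csv
import io

# Hand-written sample in the movies.csv layout (illustrative only)
MOVIES_CSV = '''movieId,title,genres
1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy
11,"American President, The (1995)",Comedy|Drama|Romance
'''

# Hand-written sample in the ratings.csv layout (illustrative only)
RATINGS_CSV = '''userId,movieId,rating,timestamp
1,1,4.0,964982703
'''


def parse_movies(text):
    """Parse movies.csv content into dicts with typed fields (hypothetical helper)."""
    reader = csv.DictReader(io.StringIO(text))
    return [
        {"movieId": int(r["movieId"]), "title": r["title"], "genres": r["genres"]}
        for r in reader
    ]


def parse_ratings(text):
    """Parse ratings.csv content into dicts with typed fields (hypothetical helper)."""
    reader = csv.DictReader(io.StringIO(text))
    return [
        {
            "userId": int(r["userId"]),
            "movieId": int(r["movieId"]),
            "rating": float(r["rating"]),
            "timestamp": int(r["timestamp"]),
        }
        for r in reader
    ]


movies = parse_movies(MOVIES_CSV)
ratings = parse_ratings(RATINGS_CSV)
print(movies[1]["title"])  # the quoted comma stays inside the title
```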

**Get Secrets**

```python
# Fetch the service principal credentials from the Databricks secret scope
app_secret = dbutils.secrets.get(scope="medallion-project-scope", key="app-secret")
app_id = dbutils.secrets.get(scope="medallion-project-scope", key="app-id")
tenant_id = dbutils.secrets.get(scope="medallion-project-scope", key="tenant-id")
```

```python
# Configure OAuth (service principal) access to the ADLS Gen2 account "datalakesonakshi"
spark.conf.set("fs.azure.account.auth.type.datalakesonakshi.dfs.core.windows.net", "OAuth")
spark.conf.set("fs.azure.account.oauth.provider.type.datalakesonakshi.dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
spark.conf.set("fs.azure.account.oauth2.client.id.datalakesonakshi.dfs.core.windows.net", app_id)
spark.conf.set("fs.azure.account.oauth2.client.secret.datalakesonakshi.dfs.core.windows.net", app_secret)
spark.conf.set("fs.azure.account.oauth2.client.endpoint.datalakesonakshi.dfs.core.windows.net", f"https://login.microsoftonline.com/{tenant_id}/oauth2/token")
```
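The five OAuth settings differ only by their key prefix and all share the `<account>.dfs.core.windows.net` suffix, so they can be generated from the account name. The sketch below is a hypothetical helper (`adls_oauth_conf` is not part of any library) that builds the same key/value pairs as a dict; the credentials shown are placeholders, and in the notebook they would come from `dbutils.secrets.get`.

```python
def adls_oauth_conf(account, client_id, client_secret, tenant_id):
    """Return Spark conf entries for service-principal (OAuth) access to an ADLS Gen2 account."""
    suffix = f"{account}.dfs.core.windows.net"
    return {
        f"fs.azure.account.auth.type.{suffix}": "OAuth",
        f"fs.azure.account.oauth.provider.type.{suffix}":
            "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
        f"fs.azure.account.oauth2.client.id.{suffix}": client_id,
        f"fs.azure.account.oauth2.client.secret.{suffix}": client_secret,
        f"fs.azure.account.oauth2.client.endpoint.{suffix}":
            f"https://login.microsoftonline.com/{tenant_id}/oauth2/token",
    }


# Placeholder credentials for illustration only
conf = adls_oauth_conf("datalakesonakshi", "my-app-id", "my-secret", "my-tenant")

# Print only the keys so no secret values end up in notebook output
for key in sorted(conf):
    print(key)

# In Databricks, the entries would then be applied with:
# for key, value in conf.items():
#     spark.conf.set(key, value)
```

Keeping the account name in one place makes it harder for a typo in one of the five keys to silently break authentication.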

```python
# Log the notebook start time
import time

notebook_start_time = time.time()
```

## 🪙 Bronze Layer

**Read Data**

```python
# Read the raw movies file (header row, inferred column types)
bronze_movies = spark.read.format("csv") \
    .option("header", True) \
    .option("inferSchema", True) \
    .load("abfss://source@datalakesonakshi.dfs.core.windows.net/movies.csv")
```

```python
# Read the raw ratings file (header row, inferred column types)
bronze_ratings = spark.read.format("csv") \
    .option("header", True) \
    .option("inferSchema", True) \
    .load("abfss://source@datalakesonakshi.dfs.core.windows.net/ratings.csv")
```
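Every path in this notebook follows the ABFS URI pattern `abfss://<container>@<account>.dfs.core.windows.net/<path>`, with a `source` container for the raw CSVs and a `destination` container for the Delta tables. A minimal sketch of building those URIs in one place, assuming a hypothetical helper name `abfss_path` and constants that simply restate the names used here:

```python
def abfss_path(container, account, relative_path=""):
    """Build an ADLS Gen2 URI: abfss://<container>@<account>.dfs.core.windows.net/<path>."""
    return f"abfss://{container}@{account}.dfs.core.windows.net/{relative_path.lstrip('/')}"


ACCOUNT = "datalakesonakshi"
SOURCE = "source"            # container holding the raw CSV files
DESTINATION = "destination"  # container holding the Delta tables

print(abfss_path(SOURCE, ACCOUNT, "ratings.csv"))
print(abfss_path(DESTINATION, ACCOUNT, "ratings_movies"))
```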

**Display Data**

```python
display(bronze_movies)
```

```python
display(bronze_ratings)
```

**Write the data as Delta tables (Parquet files plus a transaction log)**

```python
# Persist the raw movies data as a Delta table in the destination container
bronze_movies.write.format("delta") \
    .mode("overwrite") \
    .save("abfss://destination@datalakesonakshi.dfs.core.windows.net/movies")
```

```python
# Persist the raw ratings data as a Delta table in the destination container
bronze_ratings.write.format("delta") \
    .mode("overwrite") \
    .save("abfss://destination@datalakesonakshi.dfs.core.windows.net/ratings")
```

## 🥈 Silver Layer

```python
from pyspark.sql.functions import col, from_unixtime, split
```

```python
# Read the bronze Delta tables
bronze_movies = spark.read.format("delta").load("abfss://destination@datalakesonakshi.dfs.core.windows.net/movies")
bronze_ratings = spark.read.format("delta").load("abfss://destination@datalakesonakshi.dfs.core.windows.net/ratings")
```

**Transform Data**

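```python
from pyspark.sql.functions import col, timestamp_seconds

# Cast the column types and convert Unix epoch seconds into a TIMESTAMP column
# (from_unixtime would return a formatted string rather than a timestamp)
silver_ratings = bronze_ratings.withColumn("rating", col("rating").cast("float")) \
    .withColumn("timestamp", col("timestamp").cast("long")) \
    .withColumn("rating_date", timestamp_seconds(col("timestamp")))

display(silver_ratings)
```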
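The `timestamp` column in `ratings.csv` holds seconds since the Unix epoch. The pure-Python sketch below (an illustration, not the Spark code path) shows the same conversion with `datetime.fromtimestamp`, pinned to UTC. In Spark the displayed value depends on the session time zone (`spark.sql.session.timeZone`), so results may differ from this sketch if the cluster is not set to UTC. The helper name `to_utc_datetime` is hypothetical.

```python
from datetime import datetime, timezone


def to_utc_datetime(epoch_seconds):
    """Convert Unix epoch seconds to a timezone-aware UTC datetime."""
    return datetime.fromtimestamp(epoch_seconds, tz=timezone.utc)


print(to_utc_datetime(0).isoformat())
print(to_utc_datetime(964982703).isoformat())
```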


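```python
# Split the pipe-delimited genres string into an array
# (the pipe is escaped because split() interprets its pattern as a regex)
silver_movies = bronze_movies.withColumn("genres", split(col("genres"), "\\|"))

display(silver_movies)
```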
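Spark's `split` treats its pattern as a Java regular expression, which is why the pipe is written as `"\\|"`: an unescaped `|` is an empty alternation and would split between every character. Python's `re.split` behaves the same way, so the escaping can be checked locally with this small sketch (the helper name `split_genres` is hypothetical):

```python
import re


def split_genres(genres):
    """Split a pipe-delimited genre string on a literal '|' using an escaped regex."""
    return re.split(r"\|", genres)


print(split_genres("Adventure|Animation|Children"))
print(split_genres("(no genres listed)"))  # a value without pipes stays a one-element list
```

Plain `str.split("|")` would also work in Python because it is not regex-based; the escaped form is shown because it matches what Spark's `split` needs.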

```python
# Join ratings with movie metadata (title, genres) on movieId
silver_ratings_movies = silver_ratings.join(silver_movies, on="movieId", how="inner") \
    .select("userId", "movieId", "rating", "rating_date", "title", "genres")
display(silver_ratings_movies)
```

```python
# Write the joined silver table
silver_ratings_movies.write.format("delta") \
    .mode("overwrite") \
    .save("abfss://destination@datalakesonakshi.dfs.core.windows.net/ratings_movies")
```
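Conceptually, the inner join indexes one side by `movieId` and keeps only ratings whose `movieId` has a match. The pure-Python sketch below illustrates that hash-join idea on hypothetical in-memory rows; it is not how Spark executes the join (the optimizer chooses the physical strategy, possibly a broadcast hash join since the movies table is small).

```python
def inner_join_on_movie_id(ratings, movies):
    """Hash join: index movies by movieId, then emit one merged row per matching rating."""
    movies_by_id = {m["movieId"]: m for m in movies}
    joined = []
    for r in ratings:
        movie = movies_by_id.get(r["movieId"])
        if movie is None:  # inner join: drop ratings with no matching movie
            continue
        joined.append({
            "userId": r["userId"],
            "movieId": r["movieId"],
            "rating": r["rating"],
            "title": movie["title"],
            "genres": movie["genres"],
        })
    return joined


# Hypothetical sample rows
movies = [{"movieId": 1, "title": "Toy Story (1995)", "genres": ["Adventure", "Animation"]}]
ratings = [
    {"userId": 1, "movieId": 1, "rating": 4.0},
    {"userId": 2, "movieId": 999, "rating": 3.0},  # no matching movie, so it is dropped
]
result = inner_join_on_movie_id(ratings, movies)
print(result)
```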

## 🥇 Gold Layer

```python
from pyspark.sql.functions import col, count, avg, explode
```

```python
# Read the silver table
silver_layer_data = spark.read.format("delta").load("abfss://destination@datalakesonakshi.dfs.core.windows.net/ratings_movies")
display(silver_layer_data)
```

```python
# Rank movies by average rating, keeping only movies with at least 50 ratings
top_movies = silver_layer_data.groupBy("movieId", "title") \
    .agg(count("rating").alias("total_ratings"), avg("rating").alias("avg_rating")) \
    .filter(col("total_ratings") >= 50) \
    .orderBy(col("avg_rating").desc())

top_movies.show(10, truncate=False)
```
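The minimum-count filter matters because an average over very few ratings is unreliable: without it, a movie rated 5.0 by a couple of users would outrank well-established favourites. The pure-Python sketch below mirrors the group-count-average-filter-sort sequence on hypothetical rows; `top_movies_sketch` is an illustrative name, and `min_ratings` is lowered to 3 only so the tiny sample shows the effect.

```python
from collections import defaultdict


def top_movies_sketch(rows, min_ratings=50, n=10):
    """Rank movies by mean rating, keeping only those with at least min_ratings ratings."""
    stats = defaultdict(lambda: [0, 0.0])  # (movieId, title) -> [count, sum of ratings]
    for row in rows:
        s = stats[(row["movieId"], row["title"])]
        s[0] += 1
        s[1] += row["rating"]
    ranked = [
        (movie_id, title, count, total / count)
        for (movie_id, title), (count, total) in stats.items()
        if count >= min_ratings
    ]
    ranked.sort(key=lambda t: t[3], reverse=True)  # highest average first
    return ranked[:n]


# Hypothetical sample: movie 1 has the best average but too few ratings
rows = (
    [{"movieId": 1, "title": "A", "rating": 5.0}] * 2
    + [{"movieId": 2, "title": "B", "rating": 4.0}] * 3
    + [{"movieId": 3, "title": "C", "rating": 3.0}] * 3
)
ranking = top_movies_sketch(rows, min_ratings=3)
print(ranking)
```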

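```python
# Explode the genres array so each rating counts once per genre,
# then count and average the ratings per individual genre
popular_genres = silver_layer_data.withColumn("genre", explode(col("genres"))) \
    .groupBy("genre") \
    .agg(count("rating").alias("total_ratings"), avg("rating").alias("avg_rating")) \
    .orderBy(col("total_ratings").desc())

popular_genres.show(10, truncate=False)
```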
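Grouping directly by the `genres` array would treat each genre *combination* (for example `[Comedy, Drama]` versus `[Comedy]`) as a separate key, so per-genre popularity requires exploding the array first. The pure-Python sketch below illustrates the explode-then-aggregate idea on hypothetical rows; `genre_stats` is an illustrative name, not a Spark API.

```python
from collections import Counter, defaultdict


def genre_stats(rows):
    """Explode each row's genre list, then count ratings and average them per genre."""
    counts = Counter()
    sums = defaultdict(float)
    for row in rows:
        for genre in row["genres"]:  # one contribution per genre, like explode()
            counts[genre] += 1
            sums[genre] += row["rating"]
    return sorted(
        ((g, counts[g], sums[g] / counts[g]) for g in counts),
        key=lambda t: t[1],  # most-rated genre first
        reverse=True,
    )


# Hypothetical sample rows
rows = [
    {"rating": 4.0, "genres": ["Comedy", "Drama"]},
    {"rating": 2.0, "genres": ["Comedy"]},
]
stats = genre_stats(rows)
print(stats)
```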

```python
# Write the gold layer tables
top_movies.write.format("delta") \
    .mode("overwrite") \
    .save("abfss://destination@datalakesonakshi.dfs.core.windows.net/top_movies")

popular_genres.write.format("delta") \
    .mode("overwrite") \
    .save("abfss://destination@datalakesonakshi.dfs.core.windows.net/popular_genres")
```

```python
# Log the notebook end time and the total runtime
notebook_end_time = time.time()
total_notebook_runtime = notebook_end_time - notebook_start_time

print(f"Elapsed time: {total_notebook_runtime:.2f} seconds")
```