In [0]:
from pyspark.sql import Window
from pyspark.sql.functions import (
    col,
    collect_set,
    from_unixtime,
    lower,
    row_number,
    split,
    timestamp_seconds,
    transform,
    trim,
)
from pyspark.sql.types import IntegerType, FloatType, LongType

# ratings : type casting + timestamp + deterministic deduplication
#
# Bronze ratings are cast to typed columns, rows without a usable key are
# dropped, and exactly one row is kept per (userId, movieId): the one with the
# most recent timestamp (SCD Type 1 - latest value wins).
df_ratings = spark.table("workspace.bronze.ratings")

# dropDuplicates() keeps an arbitrary row per key, so "latest wins" has to be
# stated explicitly. Ties on timestamp are broken by the higher rating so the
# surviving row is the same on every run.
ratings_latest_first = (
    Window.partitionBy("userId", "movieId")
    .orderBy(col("timestamp").desc(), col("rating").desc())
)

df_ratings_clean = (
    df_ratings
    .withColumn("userId", col("userId").cast(IntegerType()))
    .withColumn("movieId", col("movieId").cast(IntegerType()))
    .withColumn("rating", col("rating").cast(FloatType()))
    # Convert epoch seconds straight to a timestamp. Going through
    # from_unixtime() formats a session-time-zone string that is then parsed
    # back, which is ambiguous around DST changes in non-UTC sessions.
    .withColumn("timestamp", timestamp_seconds(col("timestamp").cast(LongType())))
    # Filter before ranking so rows with a null key never enter the window shuffle.
    .filter(col("userId").isNotNull() & col("movieId").isNotNull())
    .withColumn("_recency_rank", row_number().over(ratings_latest_first))
    .filter(col("_recency_rank") == 1)
    .drop("_recency_rank")  # helper column only; the silver schema is unchanged
)

df_ratings_clean.write.format("delta").mode("overwrite").saveAsTable("workspace.silver.ratings")
print(f"✅ Silver ratings: {spark.table('workspace.silver.ratings').count()} rows")

# movies : integer key, trimmed title, genres split into an array, one row per movieId
df_movies = spark.table("workspace.bronze.movies")

# "genres" is a single pipe-delimited string; split it, then trim every element.
genre_items = transform(split(col("genres"), "\\|"), lambda genre: trim(genre))

movies_typed = df_movies.withColumn("movieId", col("movieId").cast("int"))
movies_typed = movies_typed.withColumn("title", trim(col("title")))
movies_typed = movies_typed.withColumn("genres_array", genre_items)

# Rows whose movieId is null after the cast are discarded before de-duplicating.
df_movies_clean = (
    movies_typed
    .where(col("movieId").isNotNull())
    .drop_duplicates(["movieId"])
)

movies_writer = df_movies_clean.write.format("delta").mode("overwrite")
movies_writer.saveAsTable("workspace.silver.movies")

movies_row_count = spark.table("workspace.silver.movies").count()
print(f"✅ Silver movies: {movies_row_count} rows")

# links : cast the three id columns to integer, drop null keys, one row per movieId
df_links = spark.table("workspace.bronze.links")

# NOTE(review): if imdbId arrives as a zero-padded string, the integer cast
# drops the leading zeros - confirm consumers do not need the padded form.
links_typed = df_links
for id_column in ("movieId", "imdbId", "tmdbId"):
    links_typed = links_typed.withColumn(id_column, col(id_column).cast(IntegerType()))

# Only movieId is required; imdbId / tmdbId are allowed to stay null.
df_links_clean = (
    links_typed
    .where(col("movieId").isNotNull())
    .drop_duplicates(["movieId"])
)

links_writer = df_links_clean.write.format("delta").mode("overwrite")
links_writer.saveAsTable("workspace.silver.links")

links_row_count = spark.table("workspace.silver.links").count()
print(f"✅ Silver links: {links_row_count} rows")

# tags : type casting + tag normalisation + timestamp + deduplication
df_tags = spark.table("workspace.bronze.tags")

df_tags_clean = (
    df_tags
    .withColumn("userId", col("userId").cast(IntegerType()))
    .withColumn("movieId", col("movieId").cast(IntegerType()))
    # Lower-case and trim so that "Sci-Fi " and "sci-fi" collapse into one tag.
    .withColumn("tag", trim(lower(col("tag"))))
    # Convert epoch seconds straight to a timestamp. Going through
    # from_unixtime() formats a session-time-zone string that is then parsed
    # back, which is ambiguous around DST changes in non-UTC sessions.
    .withColumn("timestamp", timestamp_seconds(col("timestamp").cast(LongType())))
    # Same key rule as the ratings table: a row whose userId or movieId did not
    # survive the cast cannot be joined to anything and is dropped.
    .filter(col("userId").isNotNull() & col("movieId").isNotNull())
    # Discard missing tags and tags that were only whitespace.
    .filter(col("tag").isNotNull() & (col("tag") != ""))
)

# One row per (user, movie, normalised tag). Which of several duplicate rows
# survives is not specified by dropDuplicates().
df_tags_silver = (
    df_tags_clean
    .dropDuplicates(["userId", "movieId", "tag"])
)

df_tags_silver.write.format("delta").mode("overwrite").saveAsTable("workspace.silver.tags")
print(f"✅ Silver tags: {spark.table('workspace.silver.tags').count()} rows")

# prep tags for movie join: one row per movieId holding the set of its distinct tags
df_movie_tags = (
    spark.table("workspace.silver.tags")
    # Normalising again is idempotent; it protects this step if the silver table
    # was written by an earlier version of the tags cleaning logic.
    .withColumn("tag", trim(lower(col("tag"))))
    .filter(col("tag").isNotNull())
    .groupBy("movieId")
    # collect_set() already discards duplicate values, so a dropDuplicates()
    # pass beforehand would only add a shuffle. Element order is not guaranteed.
    .agg(collect_set("tag").alias("tags_array"))
)

# Enriched table: every silver rating row, with movie attributes and the
# per-movie tag array attached where a matching movieId exists (left joins).
ratings_for_join = (
    spark.table("workspace.silver.ratings")
    .drop("ingestion_timestamp", "source")
)
movies_for_join = (
    spark.table("workspace.silver.movies")
    .drop("ingestion_timestamp", "source")
)
# drop() ignores column names that are not present in the frame.
tags_for_join = df_movie_tags.drop("ingestion_timestamp", "source")

df_movie_master = (
    ratings_for_join
    .join(movies_for_join, on="movieId", how="left")
    .join(tags_for_join, on="movieId", how="left")
)

enriched_writer = df_movie_master.write.format("delta").mode("overwrite")
enriched_writer.saveAsTable("workspace.silver.movies_enriched")

enriched_row_count = spark.table("workspace.silver.movies_enriched").count()
print(f"✅ Silver movies_enriched: {enriched_row_count} rows")

✅ Silver ratings: 32000204 rows
✅ Silver movies: 87585 rows
✅ Silver links: 87585 rows
✅ Silver tags: 2000066 rows
✅ Silver movies_enriched: 32000204 rows


In [0]:
###### Health check #########

from pyspark.sql.functions import col, sum as spark_sum

# Silver tables covered by the null and row-count checks.
SILVER_TABLES = ["ratings", "movies", "links", "tags"]


def _rows_without_match(left_table, right_table, left_alias, right_alias, key="movieId"):
    """Return the rows of one silver table that have no partner in another.

    Args:
        left_table: Silver table name whose rows are returned.
        right_table: Silver table name to look for matches in.
        left_alias: DataFrame alias given to the left table.
        right_alias: DataFrame alias given to the right table.
        key: Column compared for equality between the two tables.

    Returns:
        A DataFrame (left-anti join) with the left table's columns only.
    """
    left = spark.table(f"workspace.silver.{left_table}").alias(left_alias)
    right = spark.table(f"workspace.silver.{right_table}").alias(right_alias)
    return left.join(
        right,
        col(f"{left_alias}.{key}") == col(f"{right_alias}.{key}"),
        "left_anti",
    )


# --- Null checks for all Silver tables ---
# One output row per table: the number of nulls in every column.
for table_name in SILVER_TABLES:
    print(f"\n--- Null Check: Silver {table_name} ---")
    df = spark.table(f"workspace.silver.{table_name}")
    df.select(
        [spark_sum(col(c).isNull().cast("int")).alias(c) for c in df.columns]
    ).show()

# --- Row counts ---
print("\n--- Row Counts ---")
for table_name in SILVER_TABLES:
    count = spark.table(f"workspace.silver.{table_name}").count()
    print(f"  Silver {table_name}: {count}")

# --- Orphan checks ---
# Ratings for movies that don't exist in movies table
orphan_ratings = _rows_without_match("ratings", "movies", "r", "m")
print(f"\n⚠️ Ratings with no matching movie: {orphan_ratings.count()}")

# Tags for movies that don't exist in movies table
orphan_tags = _rows_without_match("tags", "movies", "t", "m")
print(f"⚠️ Tags with no matching movie: {orphan_tags.count()}")

# Movies with no ratings at all
movies_no_ratings = _rows_without_match("movies", "ratings", "m", "r")
print(f"⚠️ Movies with zero ratings: {movies_no_ratings.count()}")

# Links with no matching movie
orphan_links = _rows_without_match("links", "movies", "l", "m")
print(f"⚠️ Links with no matching movie: {orphan_links.count()}")


--- Null Check: Silver ratings ---
+------+-------+------+---------+-------------------+------+
|userId|movieId|rating|timestamp|ingestion_timestamp|source|
+------+-------+------+---------+-------------------+------+
|     0|      0|     0|        0|                  0|     0|
+------+-------+------+---------+-------------------+------+


--- Null Check: Silver movies ---
+-------+-----+------+-------------------+------+------------+
|movieId|title|genres|ingestion_timestamp|source|genres_array|
+-------+-----+------+-------------------+------+------------+
|      0|    0|     0|                  0|     0|           0|
+-------+-----+------+-------------------+------+------------+


--- Null Check: Silver links ---
+-------+------+------+-------------------+------+
|movieId|imdbId|tmdbId|ingestion_timestamp|source|
+-------+------+------+-------------------+------+
|      0|     0|   124|                  0|     0|
+-------+------+------+-------------------+------+


--- Null Check: 