In [0]:
# =========================================================================
# IMDB DLT Pipeline - Silver to Snowflake Gold Layer
# =========================================================================

import os

import dlt
from pyspark.sql.functions import *
from pyspark.sql.window import Window
from pyspark.sql.types import *

# =========================================================================
# SNOWFLAKE CONNECTION CONFIGURATION
# =========================================================================

# Options handed to every Snowflake write in this file via
# `.options(**SNOWFLAKE_OPTIONS)`.
#
# Each value can be overridden with an IMDB_SNOWFLAKE_* environment variable
# so that credentials do not have to be edited in (or read from) the notebook.
# The literals are kept only as fallbacks so existing runs behave as before.
# NOTE(review): the fallback password is still committed in source; move it
# to a secret store and rotate it, then drop the literal default.
SNOWFLAKE_OPTIONS = {
    "sfUrl": os.environ.get(
        "IMDB_SNOWFLAKE_URL", "RCVOYBU-ZSC95331.snowflakecomputing.com"
    ),
    "sfUser": os.environ.get("IMDB_SNOWFLAKE_USER", "IMDB_USER"),
    "sfPassword": os.environ.get("IMDB_SNOWFLAKE_PASSWORD", "IMDB"),
    "sfDatabase": os.environ.get("IMDB_SNOWFLAKE_DATABASE", "IMDB_DB"),
    "sfSchema": os.environ.get("IMDB_SNOWFLAKE_SCHEMA", "DW"),
    "sfWarehouse": os.environ.get("IMDB_SNOWFLAKE_WAREHOUSE", "IMDB_WH"),
    "sfRole": os.environ.get("IMDB_SNOWFLAKE_ROLE", "IMDB_ROLE"),
}

# =========================================================================
# DIM_TITLE_BASICS
# =========================================================================

@dlt.table(
    name="snowflake_dim_title_basics",
    comment="Title basics dimension table in Snowflake",
    table_properties={
        "quality": "gold",
        "type": "dimension",
        "target": "snowflake"
    }
)
def dim_title_basics():
    """Build the title dimension, overwrite DIM_TITLE_BASICS, and return it.

    Reads ``silver_title_basics``, keeps one row per non-null TCONST and
    numbers the rows in TCONST order to produce TITLE_BASICS_SK. The same
    DataFrame is written to Snowflake and returned as the DLT dataset.
    """
    source_columns = [
        "TCONST",
        "TITLE_TYPE",
        "PRIMARY_TITLE",
        "ORIGINAL_TITLE",
        "IS_ADULT",
        "START_YEAR",
        "END_YEAR",
    ]

    titles = (
        spark.read.table("imdb_final_project.silver.silver_title_basics")
        .select(*source_columns)
        .filter(col("TCONST").isNotNull())
        .dropDuplicates(["TCONST"])
        # Surrogate key = position of the title when sorted by TCONST.
        .withColumn("TITLE_BASICS_SK", row_number().over(Window.orderBy("TCONST")))
    )

    output_columns = [
        col("TITLE_BASICS_SK").cast("int"),
        col("TCONST").alias("TITLE_BASICS_TCONST"),
        col("TITLE_TYPE"),
        col("PRIMARY_TITLE"),
        col("ORIGINAL_TITLE"),
        col("IS_ADULT").cast("int"),
        col("START_YEAR").cast("int"),
        col("END_YEAR").cast("int"),
        lit("PARAM").alias("DI_JOB_ID"),
        current_date().alias("DI_LOAD_DATE"),
    ]
    dimension = titles.select(*output_columns)

    # Side effect: the Snowflake table is replaced each time this body runs.
    snowflake_writer = (
        dimension.write
        .format("snowflake")
        .options(**SNOWFLAKE_OPTIONS)
        .option("dbtable", "DIM_TITLE_BASICS")
        .mode("overwrite")
    )
    snowflake_writer.save()

    return dimension


# =========================================================================
# DIM_GENRE
# =========================================================================

@dlt.table(
    name="snowflake_dim_genre",
    comment="Genre dimension table in Snowflake - exploded from title_basics",
    table_properties={
        "quality": "gold",
        "type": "dimension",
        "target": "snowflake"
    }
)
def dim_genre():
    """Build the genre dimension, overwrite DIM_GENRE, and return it.

    Splits the comma-separated GENRES column of ``silver_title_basics`` into
    one trimmed value per row, discards null, empty and "unknown" values,
    de-duplicates, and numbers the remaining names alphabetically as GENRE_SK.

    NOTE(review): this table names its load column DI_LOAD_DT while most other
    tables in this file use DI_LOAD_DATE; confirm which the target expects.
    """
    one_genre_per_row = explode(split(col("GENRES"), ",")).alias("GENRE_NAME")
    is_usable_genre = (
        col("GENRE_NAME").isNotNull()
        & (col("GENRE_NAME") != "")
        & (col("GENRE_NAME") != "unknown")
    )

    genres = (
        spark.read.table("imdb_final_project.silver.silver_title_basics")
        .select(one_genre_per_row)
        .withColumn("GENRE_NAME", trim(col("GENRE_NAME")))
        .filter(is_usable_genre)
        .dropDuplicates(["GENRE_NAME"])
        # Surrogate key = rank of the genre name in sorted order.
        .withColumn("GENRE_SK", row_number().over(Window.orderBy("GENRE_NAME")))
    )

    dimension = genres.select(
        col("GENRE_SK").cast("int"),
        col("GENRE_NAME"),
        lit("PARAM").alias("DI_JOB_ID"),
        current_date().alias("DI_LOAD_DT"),
    )

    # Side effect: the Snowflake table is replaced each time this body runs.
    snowflake_writer = (
        dimension.write
        .format("snowflake")
        .options(**SNOWFLAKE_OPTIONS)
        .option("dbtable", "DIM_GENRE")
        .mode("overwrite")
    )
    snowflake_writer.save()

    return dimension


# =========================================================================
# DIM_PERSON
# =========================================================================

@dlt.table(
    name="snowflake_dim_person",
    comment="Person dimension table in Snowflake",
    table_properties={
        "quality": "gold",
        "type": "dimension",
        "target": "snowflake"
    }
)
def dim_person():
    """Build the person dimension, overwrite DIM_PERSON, and return it.

    Reads ``silver_name_basics``, keeps one row per non-null NCONST and
    numbers the rows in NCONST order to produce PERSON_SK. IS_ALIVE is
    emitted as 1 when the source value equals True and 0 in every other case
    (including null).
    """
    people = (
        spark.read.table("imdb_final_project.silver.silver_name_basics")
        .select("NCONST", "PRIMARY_NAME", "BIRTH_YEAR", "DEATH_YEAR", "IS_ALIVE")
        .filter(col("NCONST").isNotNull())
        .dropDuplicates(["NCONST"])
        # Surrogate key = position of the person when sorted by NCONST.
        .withColumn("PERSON_SK", row_number().over(Window.orderBy("NCONST")))
    )

    alive_flag = when(col("IS_ALIVE") == True, 1).otherwise(0).cast("int")

    dimension = people.select(
        col("PERSON_SK").cast("int"),
        col("NCONST"),
        col("PRIMARY_NAME"),
        col("BIRTH_YEAR").cast("int"),
        col("DEATH_YEAR").cast("int"),
        alive_flag.alias("IS_ALIVE"),
        lit("PRATHUSH").alias("DI_JOB_ID"),
        current_date().alias("DI_LOAD_DATE"),
    )

    # Side effect: the Snowflake table is replaced each time this body runs.
    snowflake_writer = (
        dimension.write
        .format("snowflake")
        .options(**SNOWFLAKE_OPTIONS)
        .option("dbtable", "DIM_PERSON")
        .mode("overwrite")
    )
    snowflake_writer.save()

    return dimension


# =========================================================================
# DIM_PROFESSION
# =========================================================================

@dlt.table(
    name="snowflake_dim_profession",
    comment="Profession dimension table in Snowflake - exploded from name_basics",
    table_properties={
        "quality": "gold",
        "type": "dimension",
        "target": "snowflake"
    }
)
def dim_profession():
    """Build the profession dimension, overwrite DIM_PROFESSION, and return it.

    Splits the comma-separated PRIMARY_PROFESSION column of
    ``silver_name_basics`` into one trimmed value per row, discards null,
    empty and "Unknown" values, de-duplicates, and numbers the remaining
    names alphabetically as PROFESSION_SK.
    """
    one_profession_per_row = (
        explode(split(col("PRIMARY_PROFESSION"), ",")).alias("PROFESSION_NAME")
    )
    is_usable_profession = (
        col("PROFESSION_NAME").isNotNull()
        & (col("PROFESSION_NAME") != "")
        & (col("PROFESSION_NAME") != "Unknown")
    )

    professions = (
        spark.read.table("imdb_final_project.silver.silver_name_basics")
        .select(one_profession_per_row)
        .withColumn("PROFESSION_NAME", trim(col("PROFESSION_NAME")))
        .filter(is_usable_profession)
        .dropDuplicates(["PROFESSION_NAME"])
        # Surrogate key = rank of the profession name in sorted order.
        .withColumn(
            "PROFESSION_SK",
            row_number().over(Window.orderBy("PROFESSION_NAME")),
        )
    )

    dimension = professions.select(
        col("PROFESSION_SK").cast("int"),
        col("PROFESSION_NAME"),
        lit("NIKHIL").alias("DI_JOB_ID"),
        current_date().alias("DI_LOAD_DATE"),
    )

    # Side effect: the Snowflake table is replaced each time this body runs.
    snowflake_writer = (
        dimension.write
        .format("snowflake")
        .options(**SNOWFLAKE_OPTIONS)
        .option("dbtable", "DIM_PROFESSION")
        .mode("overwrite")
    )
    snowflake_writer.save()

    return dimension


# =========================================================================
# DIM_JOB
# =========================================================================

@dlt.table(
    name="snowflake_dim_job",
    comment="Job category dimension table in Snowflake",
    table_properties={
        "quality": "gold",
        "type": "dimension",
        "target": "snowflake"
    }
)
def dim_job():
    """Build the job-category dimension, overwrite DIM_JOB, and return it.

    Takes the distinct CATEGORY values of ``silver_title_principals``
    (excluding null and "unknown"), exposes them as JOB_CATEGORY and numbers
    them alphabetically as JOB_SK.
    """
    is_usable_category = (
        col("JOB_CATEGORY").isNotNull() & (col("JOB_CATEGORY") != "unknown")
    )

    categories = (
        spark.read.table("imdb_final_project.silver.silver_title_principals")
        .select(col("CATEGORY").alias("JOB_CATEGORY"))
        .filter(is_usable_category)
        .dropDuplicates(["JOB_CATEGORY"])
        # Surrogate key = rank of the category in sorted order.
        .withColumn("JOB_SK", row_number().over(Window.orderBy("JOB_CATEGORY")))
    )

    dimension = categories.select(
        col("JOB_SK").cast("int"),
        col("JOB_CATEGORY"),
        lit("PRATHUSH").alias("DI_JOB_ID"),
        current_date().alias("DI_LOAD_DATE"),
    )

    # Side effect: the Snowflake table is replaced each time this body runs.
    snowflake_writer = (
        dimension.write
        .format("snowflake")
        .options(**SNOWFLAKE_OPTIONS)
        .option("dbtable", "DIM_JOB")
        .mode("overwrite")
    )
    snowflake_writer.save()

    return dimension


# =========================================================================
# DIM_REGION (with Unknown row for -1 SK)
# =========================================================================

@dlt.table(
    name="snowflake_dim_region",
    comment="Region dimension table in Snowflake",
    table_properties={
        "quality": "gold",
        "type": "dimension",
        "target": "snowflake"
    }
)
def dim_region():
    """Build the region dimension, overwrite DIM_REGION, and return it.

    Reads ``silver_title_region``, keeps one row per non-null REGION_CODE and
    numbers the rows in REGION_CODE order as REGION_SK. A fixed
    (-1, "UNK", "Unknown") row is appended so that facts/dimensions that
    default a missing region to -1 have a row to point at.

    NOTE(review): this table names its load column DI_LOAD_DT while most other
    tables in this file use DI_LOAD_DATE; confirm which the target expects.
    """
    # Shared column order: union() matches columns by position, not by name.
    region_columns = ["REGION_SK", "REGION_CODE", "REGION_NAME"]

    known_regions = (
        spark.read.table("imdb_final_project.silver.silver_title_region")
        .select("REGION_CODE", "REGION_NAME")
        .filter(col("REGION_CODE").isNotNull())
        .dropDuplicates(["REGION_CODE"])
        .withColumn("REGION_SK", row_number().over(Window.orderBy("REGION_CODE")))
        .select(*region_columns)
    )

    unknown_region = spark.createDataFrame([(-1, "UNK", "Unknown")], region_columns)
    all_regions = known_regions.union(unknown_region)

    dimension = all_regions.select(
        col("REGION_SK").cast("int"),
        col("REGION_CODE"),
        col("REGION_NAME"),
        lit("PARAM").alias("DI_JOB_ID"),
        current_date().alias("DI_LOAD_DT"),
    )

    # Side effect: the Snowflake table is replaced each time this body runs.
    snowflake_writer = (
        dimension.write
        .format("snowflake")
        .options(**SNOWFLAKE_OPTIONS)
        .option("dbtable", "DIM_REGION")
        .mode("overwrite")
    )
    snowflake_writer.save()

    return dimension


# =========================================================================
# DIM_LANGUAGE (with Unknown row for -1 SK)
# =========================================================================

@dlt.table(
    name="snowflake_dim_language",
    comment="Language dimension table in Snowflake",
    table_properties={
        "quality": "gold",
        "type": "dimension",
        "target": "snowflake"
    }
)
def dim_language():
    """Build the language dimension, overwrite DIM_LANGUAGE, and return it.

    Reads ``silver_title_language_codes``, keeps one row per non-null
    LANGUAGE_CODE and numbers the rows in LANGUAGE_CODE order as LANGUAGE_SK.
    A fixed (-1, "UNK", "Unknown") row is appended so that rows defaulting a
    missing language to -1 have a row to point at.
    """
    # Shared column order: union() matches columns by position, not by name.
    language_columns = ["LANGUAGE_SK", "LANGUAGE_CODE", "LANGUAGE_NAME"]

    known_languages = (
        spark.read.table("imdb_final_project.silver.silver_title_language_codes")
        .select("LANGUAGE_CODE", "LANGUAGE_NAME")
        .filter(col("LANGUAGE_CODE").isNotNull())
        .dropDuplicates(["LANGUAGE_CODE"])
        .withColumn(
            "LANGUAGE_SK",
            row_number().over(Window.orderBy("LANGUAGE_CODE")),
        )
        .select(*language_columns)
    )

    unknown_language = spark.createDataFrame(
        [(-1, "UNK", "Unknown")], language_columns
    )
    all_languages = known_languages.union(unknown_language)

    dimension = all_languages.select(
        col("LANGUAGE_SK").cast("int"),
        col("LANGUAGE_CODE"),
        col("LANGUAGE_NAME"),
        lit("PRATHUSH").alias("DI_JOB_ID"),
        current_date().alias("DI_LOAD_DATE"),
    )

    # Side effect: the Snowflake table is replaced each time this body runs.
    snowflake_writer = (
        dimension.write
        .format("snowflake")
        .options(**SNOWFLAKE_OPTIONS)
        .option("dbtable", "DIM_LANGUAGE")
        .mode("overwrite")
    )
    snowflake_writer.save()

    return dimension


# =========================================================================
# DIM_TITLE_AKAS (with coalesce for null FKs to -1)
# =========================================================================

@dlt.table(
    name="snowflake_dim_title_akas",
    comment="Title AKAS dimension table in Snowflake with FK lookups",
    table_properties={
        "quality": "gold",
        "type": "dimension",
        "target": "snowflake"
    }
)
def dim_title_akas():
    """Build the alternate-title dimension, overwrite DIM_TITLE_AKAS, return it.

    Every row of ``silver_title_akas`` is left-joined to three key lookups
    (title, region, language) that are rebuilt here with the same
    filter / de-duplicate / order-by steps the corresponding dimension
    functions in this file use. Rows are then numbered by
    (TITLE_ID, ORDERING) as TITLE_AKA_SK. Unmatched region and language keys
    become -1; an unmatched TITLE_BASICS_SK is left null.
    """
    akas = spark.read.table("imdb_final_project.silver.silver_title_akas")

    # Lookup: TCONST -> TITLE_BASICS_SK.
    title_lookup = (
        spark.read.table("imdb_final_project.silver.silver_title_basics")
        .select("TCONST")
        .filter(col("TCONST").isNotNull())
        .dropDuplicates(["TCONST"])
        .withColumn("TITLE_BASICS_SK", row_number().over(Window.orderBy("TCONST")))
    )

    # Lookup: REGION_CODE -> REGION_SK.
    region_lookup = (
        spark.read.table("imdb_final_project.silver.silver_title_region")
        .select("REGION_CODE")
        .filter(col("REGION_CODE").isNotNull())
        .dropDuplicates(["REGION_CODE"])
        .withColumn("REGION_SK", row_number().over(Window.orderBy("REGION_CODE")))
    )

    # Lookup: LANGUAGE_CODE -> LANGUAGE_SK.
    language_lookup = (
        spark.read.table("imdb_final_project.silver.silver_title_language_codes")
        .select("LANGUAGE_CODE")
        .filter(col("LANGUAGE_CODE").isNotNull())
        .dropDuplicates(["LANGUAGE_CODE"])
        .withColumn(
            "LANGUAGE_SK",
            row_number().over(Window.orderBy("LANGUAGE_CODE")),
        )
    )

    # Left joins keep every AKA row even when a lookup has no match.
    with_title = akas.join(
        title_lookup, akas["TITLE_ID"] == title_lookup["TCONST"], "left"
    )
    with_region = with_title.join(
        region_lookup, akas["REGION"] == region_lookup["REGION_CODE"], "left"
    )
    with_language = with_region.join(
        language_lookup,
        akas["LANGUAGE"] == language_lookup["LANGUAGE_CODE"],
        "left",
    )

    aka_order = Window.orderBy("TITLE_ID", "ORDERING")
    numbered = with_language.withColumn("TITLE_AKA_SK", row_number().over(aka_order))

    dimension = numbered.select(
        col("TITLE_AKA_SK").cast("int"),
        col("TITLE_BASICS_SK").cast("int"),
        col("TYPES").alias("TITLE_AKAS_TYPES"),
        col("IS_ORIGINAL_TITLE").cast("int"),
        coalesce(col("REGION_SK"), lit(-1)).cast("int").alias("REGION_SK"),
        coalesce(col("LANGUAGE_SK"), lit(-1)).cast("int").alias("LANGUAGE_SK"),
        lit("NIKHIL").alias("DI_JOB_ID"),
        current_date().alias("DI_LOAD_DATE"),
    )

    # Side effect: the Snowflake table is replaced each time this body runs.
    snowflake_writer = (
        dimension.write
        .format("snowflake")
        .options(**SNOWFLAKE_OPTIONS)
        .option("dbtable", "DIM_TITLE_AKAS")
        .mode("overwrite")
    )
    snowflake_writer.save()

    return dimension


# =========================================================================
# BRIDGE_TITLE_GENRE
# =========================================================================

@dlt.table(
    name="snowflake_bridge_title_genre",
    comment="Bridge table linking titles to genres in Snowflake",
    table_properties={
        "quality": "gold",
        "type": "bridge",
        "target": "snowflake"
    }
)
def bridge_title_genre():
    """Build the title/genre bridge, overwrite BRIDGE_TITLE_GENRE, return it.

    From a single read of ``silver_title_basics`` this derives three frames:
    the title key lookup, the genre key lookup, and the (TCONST, GENRE_NAME)
    pairs obtained by exploding GENRES. The pairs are inner-joined to both
    lookups and reduced to distinct (TITLE_BASICS_SK, GENRE_SK) rows.
    """
    basics = spark.read.table("imdb_final_project.silver.silver_title_basics")

    one_genre_per_row = explode(split(col("GENRES"), ",")).alias("GENRE_NAME")
    is_usable_genre = (
        col("GENRE_NAME").isNotNull()
        & (col("GENRE_NAME") != "")
        & (col("GENRE_NAME") != "unknown")
    )

    # Lookup: TCONST -> TITLE_BASICS_SK.
    title_keys = (
        basics
        .select("TCONST")
        .filter(col("TCONST").isNotNull())
        .dropDuplicates(["TCONST"])
        .withColumn("TITLE_BASICS_SK", row_number().over(Window.orderBy("TCONST")))
    )

    # Lookup: GENRE_NAME -> GENRE_SK.
    genre_keys = (
        basics
        .select(one_genre_per_row)
        .withColumn("GENRE_NAME", trim(col("GENRE_NAME")))
        .filter(is_usable_genre)
        .dropDuplicates(["GENRE_NAME"])
        .withColumn("GENRE_SK", row_number().over(Window.orderBy("GENRE_NAME")))
    )

    # One row per (title, genre) occurrence in the source.
    title_genres = (
        basics
        .select(col("TCONST"), one_genre_per_row)
        .withColumn("GENRE_NAME", trim(col("GENRE_NAME")))
        .filter(is_usable_genre)
    )

    on_title = title_genres["TCONST"] == title_keys["TCONST"]
    on_genre = title_genres["GENRE_NAME"] == genre_keys["GENRE_NAME"]
    key_pairs = (
        title_genres
        .join(title_keys, on_title, "inner")
        .join(genre_keys, on_genre, "inner")
        .select(title_keys["TITLE_BASICS_SK"], genre_keys["GENRE_SK"])
        .dropDuplicates(["TITLE_BASICS_SK", "GENRE_SK"])
    )

    bridge = key_pairs.select(
        col("TITLE_BASICS_SK").cast("int"),
        col("GENRE_SK").cast("int"),
        lit("PARAM").alias("DI_JOB_ID"),
        current_date().alias("DI_LOAD_DATE"),
    )

    # Side effect: the Snowflake table is replaced each time this body runs.
    snowflake_writer = (
        bridge.write
        .format("snowflake")
        .options(**SNOWFLAKE_OPTIONS)
        .option("dbtable", "BRIDGE_TITLE_GENRE")
        .mode("overwrite")
    )
    snowflake_writer.save()

    return bridge


# =========================================================================
# BRIDGE_PERSON_PROFESSION
# =========================================================================

@dlt.table(
    name="snowflake_bridge_person_profession",
    comment="Bridge table linking persons to professions in Snowflake",
    table_properties={
        "quality": "gold",
        "type": "bridge",
        "target": "snowflake"
    }
)
def bridge_person_profession():
    """Build the person/profession bridge, overwrite its Snowflake table, return it.

    From a single read of ``silver_name_basics`` this derives three frames:
    the person key lookup, the profession key lookup, and the
    (NCONST, PROFESSION_NAME) pairs obtained by exploding PRIMARY_PROFESSION.
    The pairs are inner-joined to both lookups and reduced to distinct
    (PERSON_SK, PROFESSION_SK) rows, written to BRIDGE_PERSON_PROFESSION.
    """
    names = spark.read.table("imdb_final_project.silver.silver_name_basics")

    one_profession_per_row = (
        explode(split(col("PRIMARY_PROFESSION"), ",")).alias("PROFESSION_NAME")
    )
    is_usable_profession = (
        col("PROFESSION_NAME").isNotNull()
        & (col("PROFESSION_NAME") != "")
        & (col("PROFESSION_NAME") != "Unknown")
    )

    # Lookup: NCONST -> PERSON_SK.
    person_keys = (
        names
        .select("NCONST")
        .filter(col("NCONST").isNotNull())
        .dropDuplicates(["NCONST"])
        .withColumn("PERSON_SK", row_number().over(Window.orderBy("NCONST")))
    )

    # Lookup: PROFESSION_NAME -> PROFESSION_SK.
    profession_keys = (
        names
        .select(one_profession_per_row)
        .withColumn("PROFESSION_NAME", trim(col("PROFESSION_NAME")))
        .filter(is_usable_profession)
        .dropDuplicates(["PROFESSION_NAME"])
        .withColumn(
            "PROFESSION_SK",
            row_number().over(Window.orderBy("PROFESSION_NAME")),
        )
    )

    # One row per (person, profession) occurrence in the source.
    person_professions = (
        names
        .select(col("NCONST"), one_profession_per_row)
        .withColumn("PROFESSION_NAME", trim(col("PROFESSION_NAME")))
        .filter(is_usable_profession)
    )

    on_person = person_professions["NCONST"] == person_keys["NCONST"]
    on_profession = (
        person_professions["PROFESSION_NAME"] == profession_keys["PROFESSION_NAME"]
    )
    key_pairs = (
        person_professions
        .join(person_keys, on_person, "inner")
        .join(profession_keys, on_profession, "inner")
        .select(person_keys["PERSON_SK"], profession_keys["PROFESSION_SK"])
        .dropDuplicates(["PERSON_SK", "PROFESSION_SK"])
    )

    bridge = key_pairs.select(
        col("PERSON_SK").cast("int"),
        col("PROFESSION_SK").cast("int"),
        lit("NIKHIL").alias("DI_JOB_ID"),
        current_date().alias("DI_LOAD_DATE"),
    )

    # Side effect: the Snowflake table is replaced each time this body runs.
    snowflake_writer = (
        bridge.write
        .format("snowflake")
        .options(**SNOWFLAKE_OPTIONS)
        .option("dbtable", "BRIDGE_PERSON_PROFESSION")
        .mode("overwrite")
    )
    snowflake_writer.save()

    return bridge


# =========================================================================
# FACT_TITLE_PARTICIPATION
# =========================================================================

@dlt.table(
    name="snowflake_fact_title_participation",
    comment="Fact table for title participation in Snowflake",
    table_properties={
        "quality": "gold",
        "type": "fact",
        "target": "snowflake"
    }
)
def fact_title_participation():
    """Build the participation fact, overwrite its Snowflake table, return it.

    Each row of ``silver_title_principals`` is inner-joined to the title,
    person and job key lookups (rebuilt here with the same steps the
    dimension functions in this file use), so principals without a matching
    title, person or usable category are dropped. Surviving rows are numbered
    by (TCONST, NCONST, ORDERING) as PARTICIPATION_SK and written to
    FACT_TITLE_PARTICIPATION.
    """
    principals = spark.read.table("imdb_final_project.silver.silver_title_principals")

    # Lookup: TCONST -> TITLE_BASICS_SK.
    title_keys = (
        spark.read.table("imdb_final_project.silver.silver_title_basics")
        .select("TCONST")
        .filter(col("TCONST").isNotNull())
        .dropDuplicates(["TCONST"])
        .withColumn("TITLE_BASICS_SK", row_number().over(Window.orderBy("TCONST")))
    )

    # Lookup: NCONST -> PERSON_SK.
    person_keys = (
        spark.read.table("imdb_final_project.silver.silver_name_basics")
        .select("NCONST")
        .filter(col("NCONST").isNotNull())
        .dropDuplicates(["NCONST"])
        .withColumn("PERSON_SK", row_number().over(Window.orderBy("NCONST")))
    )

    # Lookup: JOB_CATEGORY -> JOB_SK, derived from the principals themselves.
    job_keys = (
        principals
        .select(col("CATEGORY").alias("JOB_CATEGORY"))
        .filter(col("JOB_CATEGORY").isNotNull() & (col("JOB_CATEGORY") != "unknown"))
        .dropDuplicates(["JOB_CATEGORY"])
        .withColumn("JOB_SK", row_number().over(Window.orderBy("JOB_CATEGORY")))
    )

    with_title = principals.join(
        title_keys, principals["TCONST"] == title_keys["TCONST"], "inner"
    )
    with_person = with_title.join(
        person_keys, principals["NCONST"] == person_keys["NCONST"], "inner"
    )
    with_job = with_person.join(
        job_keys, principals["CATEGORY"] == job_keys["JOB_CATEGORY"], "inner"
    )

    # The joined frame carries TCONST/NCONST from several inputs, so the sort
    # columns are taken explicitly from the principals frame.
    participation_order = Window.orderBy(
        principals["TCONST"], principals["NCONST"], principals["ORDERING"]
    )
    numbered = with_job.withColumn(
        "PARTICIPATION_SK", row_number().over(participation_order)
    )

    fact = numbered.select(
        col("PARTICIPATION_SK").cast("int"),
        col("JOB_SK").cast("int"),
        col("TITLE_BASICS_SK").cast("int"),
        col("PERSON_SK").cast("int"),
        principals["ORDERING"].cast("int").alias("ORDERING"),
        lit("PRATHUSH").alias("DI_JOB_ID"),
        current_date().alias("DI_LOAD_DATE"),
    )

    # Side effect: the Snowflake table is replaced each time this body runs.
    snowflake_writer = (
        fact.write
        .format("snowflake")
        .options(**SNOWFLAKE_OPTIONS)
        .option("dbtable", "FACT_TITLE_PARTICIPATION")
        .mode("overwrite")
    )
    snowflake_writer.save()

    return fact

# =========================================================================
# FACT_TITLE_STATS
# =========================================================================

@dlt.table(
    name="snowflake_fact_title_stats",
    comment="Fact table for title statistics in Snowflake",
    table_properties={
        "quality": "gold",
        "type": "fact",
        "target": "snowflake"
    }
)
def fact_title_stats():
    """Build the title statistics fact, overwrite FACT_TITLE_STATS, return it.

    Ratings from ``silver_title_ratings`` are inner-joined to the title key
    lookup (which also supplies RUNTIME_MINUTES) and left-joined to episode
    counts grouped by (PARENTTCONST, SEASON_NUMBER). Because the episode join
    is on the parent title only, a rated title yields one row per season
    group; titles with no episodes yield a single row whose SEASON_NUMBER and
    NO_OF_EPISODES are both set to -1.
    """
    ratings = spark.read.table("imdb_final_project.silver.silver_title_ratings")

    # Lookup: TCONST -> TITLE_BASICS_SK (plus the runtime carried through).
    title_keys = (
        spark.read.table("imdb_final_project.silver.silver_title_basics")
        .select("TCONST", "RUNTIME_MINUTES")
        .filter(col("TCONST").isNotNull())
        .dropDuplicates(["TCONST"])
        .withColumn("TITLE_BASICS_SK", row_number().over(Window.orderBy("TCONST")))
    )

    # Episode count per parent title and season.
    episodes_per_season = (
        spark.read.table("imdb_final_project.silver.silver_title_episode")
        .groupBy("PARENTTCONST", "SEASON_NUMBER")
        .agg(count("TCONST").alias("NO_OF_EPISODES"))
    )

    with_title = ratings.join(
        title_keys, ratings["TCONST"] == title_keys["TCONST"], "inner"
    )
    with_episodes = with_title.join(
        episodes_per_season,
        ratings["TCONST"] == episodes_per_season["PARENTTCONST"],
        "left",
    )

    # TCONST exists on both join inputs, so it is taken from the ratings frame.
    stats_order = Window.orderBy(ratings["TCONST"], "SEASON_NUMBER")
    numbered = with_episodes.withColumn(
        "TITLE_STATS_SK", row_number().over(stats_order)
    )

    fact = numbered.select(
        col("TITLE_STATS_SK").cast("int"),
        col("TITLE_BASICS_SK").cast("int"),
        col("AVERAGE_RATING").cast("decimal(3,1)"),
        col("RATING_CATEGORY"),
        col("NUM_VOTES").cast("int"),
        col("RUNTIME_MINUTES").cast("int"),
        coalesce(col("SEASON_NUMBER"), lit(-1)).cast("int").alias("SEASON_NUMBER"),
        coalesce(col("NO_OF_EPISODES"), lit(-1)).cast("int").alias("NO_OF_EPISODES"),
        lit("NIKHIL").alias("DI_JOB_ID"),
        current_date().alias("DI_LOAD_DATE"),
    )

    # Side effect: the Snowflake table is replaced each time this body runs.
    snowflake_writer = (
        fact.write
        .format("snowflake")
        .options(**SNOWFLAKE_OPTIONS)
        .option("dbtable", "FACT_TITLE_STATS")
        .mode("overwrite")
    )
    snowflake_writer.save()

    return fact