In [0]:
import dlt
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# ------------------------------------------------------------
# CONFIG
# ------------------------------------------------------------
# Load identifier written into the JOB_ID column of every table by add_metadata().
JOB_ID = "IMDB_GOLD_LOAD_2025_12_06"

# ------------------------------------------------------------
# Helper readers & metadata
# ------------------------------------------------------------

def read_silver(table_name: str):
    """
    Return the dataset registered under ``table_name`` via ``dlt.read``.

    The silver table is expected to be defined as a DLT table in this same
    pipeline (bronze -> silver -> gold).
    Example: read_silver("silver_title_basics")
    """
    silver_df = dlt.read(table_name)
    return silver_df

def add_metadata(df):
    """Return ``df`` with the audit columns JOB_ID and LOAD_DT set.

    JOB_ID holds the module-level JOB_ID constant as a literal; LOAD_DT holds
    ``current_date()``.
    """
    with_job = df.withColumn("JOB_ID", F.lit(JOB_ID))
    with_load_date = with_job.withColumn("LOAD_DT", F.current_date())
    return with_load_date

# ============================================================
# 1. DIMENSIONS
# ============================================================

# 1.1 DIM_TITLES
@dlt.expect_or_drop("valid_tconst_dim_titles", "tconst IS NOT NULL")
@dlt.table(
    name="DIM_TITLES",
    comment="Title dimension derived from silver_title_basics (Type 1)."
)
def dim_titles():
    """Build DIM_TITLES from silver_title_basics.

    Rows with a null tconst are filtered out, TITLE_SK is assigned with
    row_number() in tconst order, isAdult is turned into a 0/1 IS_ADULT
    flag, and the JOB_ID / LOAD_DT columns are appended.
    """
    titles = read_silver("silver_title_basics")
    titles = titles.filter(F.col("tconst").isNotNull())

    # Un-partitioned window: the numbering is global across the whole table.
    sk_order = Window.orderBy("tconst")
    title_sk = F.row_number().over(sk_order)

    # Any value that does not cast to integer 1 (null included) becomes 0.
    is_adult_one = F.col("isAdult").cast("int") == 1
    adult_flag = F.when(is_adult_one, F.lit(1)).otherwise(F.lit(0))

    keyed = titles.withColumn("TITLE_SK", title_sk)
    keyed = keyed.withColumn("IS_ADULT", adult_flag)

    output_columns = [
        "TITLE_SK",
        F.col("tconst").alias("TCONST"),
        F.col("primaryTitle").alias("PRIMARY_TITLE"),
        F.col("originalTitle").alias("ORIGINAL_TITLE"),
        F.col("titleType").alias("TITLE_TYPE"),
        "IS_ADULT",
    ]
    return add_metadata(keyed.select(*output_columns))


# 1.2 DIM_PERSONS
@dlt.expect_or_drop("valid_nconst_dim_persons", "nconst IS NOT NULL")
@dlt.table(
    name="DIM_PERSONS",
    comment="Person dimension (Type 2) from silver_name_basics."
)
def dim_persons():
    """Build DIM_PERSONS from silver_name_basics.

    Rows with a null nconst are filtered out and PERSON_SK is assigned with
    row_number() in nconst order. Every row is emitted with IS_CURRENT = 1,
    START_DATE = current_date(), a null END_DATE and PERSON_DK = NCONST.
    """
    people = read_silver("silver_name_basics")
    people = people.filter(F.col("nconst").isNotNull())

    # Un-partitioned window: the numbering is global across the whole table.
    sk_order = Window.orderBy("nconst")
    numbered = people.withColumn("PERSON_SK", F.row_number().over(sk_order))

    persons = numbered.select(
        "PERSON_SK",
        F.col("nconst").alias("NCONST"),
        F.col("primaryName").alias("PRIMARY_NAME"),
        F.col("birthYear").cast("int").alias("BIRTH_YEAR"),
        F.col("deathYear").cast("int").alias("DEATH_YEAR"),
        F.col("knownForTitles").alias("KNOWN_TITLE"),
    )

    # Validity-tracking columns, appended in this exact order.
    tracking_columns = (
        ("IS_CURRENT", F.lit(1)),
        ("START_DATE", F.current_date()),
        ("END_DATE", F.lit(None).cast("date")),
        ("PERSON_DK", F.col("NCONST")),
    )
    for column_name, expression in tracking_columns:
        persons = persons.withColumn(column_name, expression)

    return add_metadata(persons)


# 1.3 DIM_GENRES
@dlt.table(
    name="DIM_GENRES",
    comment="Genre dimension derived from exploded genres."
)
def dim_genres():
    """Build DIM_GENRES: one row per distinct, non-empty genre name.

    The genres column is cast to string, stripped of brackets and double
    quotes, split on commas and exploded; the trimmed names are de-duplicated
    and numbered with row_number() in GENRE_NAME order.
    """
    basics = read_silver("silver_title_basics")

    # Remove '[', ']' and '"' before splitting on ','.
    stripped = F.regexp_replace(F.col("genres").cast("string"), r'[\[\]\"]', '')
    genre_tokens = F.split(stripped, ',')

    with_tokens = (
        basics
        .filter(F.col("genres").isNotNull())
        .withColumn("genres_arr", genre_tokens)
    )
    exploded = with_tokens.select(
        F.explode(F.col("genres_arr")).alias("GENRE_NAME_RAW")
    )
    trimmed = exploded.withColumn("GENRE_NAME", F.trim(F.col("GENRE_NAME_RAW")))
    unique_genres = (
        trimmed
        .filter(F.col("GENRE_NAME") != "")
        .select("GENRE_NAME")
        .distinct()
    )

    sk_order = Window.orderBy("GENRE_NAME")
    numbered = unique_genres.withColumn("GENRE_SK", F.row_number().over(sk_order))

    return add_metadata(numbered.select("GENRE_SK", "GENRE_NAME"))


# 1.4 DIM_PROFESSION
@dlt.table(
    name="DIM_PROFESSION",
    comment="Profession dimension based on primaryProfession."
)
def dim_profession():
    """Build DIM_PROFESSION: one row per distinct, non-empty profession name.

    primaryProfession is cast to string, stripped of brackets and double
    quotes, split on commas and exploded; the trimmed names are de-duplicated
    and numbered with row_number() in PROFESSION_NAME order. Every row is
    emitted with IS_CURRENT = 1, START_DATE = current_date(), a null END_DATE
    and PROFESSION_DK = PROFESSION_NAME.
    """
    names = read_silver("silver_name_basics")

    # Remove '[', ']' and '"' before splitting on ','.
    stripped = F.regexp_replace(
        F.col("primaryProfession").cast("string"), r'[\[\]\"]', ''
    )
    profession_tokens = F.split(stripped, ',')

    with_tokens = (
        names
        .filter(F.col("primaryProfession").isNotNull())
        .withColumn("profession_arr", profession_tokens)
    )
    exploded = with_tokens.select(
        F.explode(F.col("profession_arr")).alias("PROFESSION_RAW")
    )
    trimmed = exploded.withColumn("PROFESSION_NAME", F.trim(F.col("PROFESSION_RAW")))
    unique_professions = (
        trimmed
        .filter(F.col("PROFESSION_NAME") != "")
        .select("PROFESSION_NAME")
        .distinct()
    )

    sk_order = Window.orderBy("PROFESSION_NAME")
    professions = (
        unique_professions
        .withColumn("PROFESSION_SK", F.row_number().over(sk_order))
        .select("PROFESSION_SK", "PROFESSION_NAME")
    )

    # Validity-tracking columns, appended in this exact order.
    tracking_columns = (
        ("IS_CURRENT", F.lit(1)),
        ("START_DATE", F.current_date()),
        ("END_DATE", F.lit(None).cast("date")),
        ("PROFESSION_DK", F.col("PROFESSION_NAME")),
    )
    for column_name, expression in tracking_columns:
        professions = professions.withColumn(column_name, expression)

    return add_metadata(professions)


# 1.5 DIM_REGIONS
@dlt.table(
    name="DIM_REGIONS",
    comment="Region dimension derived from silver_title_akas.region."
)
def dim_regions():
    """Build DIM_REGIONS: one row per distinct non-null, non-empty region.

    REGION_SK is assigned with row_number() in region-code order. The same
    code is written to both REGION_ABBR and REGION_NAME.
    """
    akas = read_silver("silver_title_akas")

    region = F.col("region")
    has_region = region.isNotNull() & (region != "")
    distinct_codes = (
        akas
        .filter(has_region)
        .select(region.alias("REGION_CODE"))
        .distinct()
    )

    sk_order = Window.orderBy("REGION_CODE")
    numbered = distinct_codes.withColumn("REGION_SK", F.row_number().over(sk_order))

    code = F.col("REGION_CODE")
    regions = numbered.select(
        "REGION_SK",
        code.alias("REGION_ABBR"),
        code.alias("REGION_NAME"),
    )
    return add_metadata(regions)


# 1.6 DIM_LANGUAGES
@dlt.table(
    name="DIM_LANGUAGES",
    comment="Language dimension derived from silver_title_akas.language."
)
def dim_languages():
    """Build DIM_LANGUAGES: one row per distinct non-null, non-empty language.

    LANGUAGE_SK is assigned with row_number() in language-code order. The same
    code is written to both LANGUAGE_ABBR and LANGUAGE_NAME.
    """
    akas = read_silver("silver_title_akas")

    language = F.col("language")
    has_language = language.isNotNull() & (language != "")
    distinct_codes = (
        akas
        .filter(has_language)
        .select(language.alias("LANGUAGE_CODE"))
        .distinct()
    )

    sk_order = Window.orderBy("LANGUAGE_CODE")
    numbered = distinct_codes.withColumn("LANGUAGE_SK", F.row_number().over(sk_order))

    code = F.col("LANGUAGE_CODE")
    languages = numbered.select(
        "LANGUAGE_SK",
        code.alias("LANGUAGE_ABBR"),
        code.alias("LANGUAGE_NAME"),
    )
    return add_metadata(languages)


# ============================================================
# 2. BRIDGE TABLES
# ============================================================

@dlt.table(
    name="BRIDGE_TITLE_GENRES",
    comment="Bridge between titles and genres (many-to-many)."
)
def bridge_title_genres():
    """Build BRIDGE_TITLE_GENRES: distinct (TITLE_SK, GENRE_SK) pairs.

    silver_title_basics rows with a non-null tconst and genres are matched to
    DIM_TITLES on tconst. Their genres are split and exploded with the same
    cleaning used for DIM_GENRES, then matched to DIM_GENRES on GENRE_NAME.
    Both joins are inner, so unmatched rows are left out.
    """
    titles = dlt.read("DIM_TITLES").alias("dt")
    genres = dlt.read("DIM_GENRES").alias("g")
    basics = read_silver("silver_title_basics").alias("b")

    has_key_and_genres = F.col("b.tconst").isNotNull() & F.col("b.genres").isNotNull()

    # Remove '[', ']' and '"' before splitting on ','.
    stripped = F.regexp_replace(F.col("b.genres").cast("string"), r'[\[\]\"]', '')
    genre_tokens = F.split(stripped, ',')

    titled = (
        basics
        .filter(has_key_and_genres)
        .join(titles, F.col("b.tconst") == F.col("dt.TCONST"), "inner")
    )

    # One row per (title, genre token); the token is trimmed in place.
    exploded = titled.withColumn("genres_arr", genre_tokens)
    exploded = exploded.withColumn("GENRE_NAME", F.explode(F.col("genres_arr")))
    exploded = exploded.withColumn("GENRE_NAME", F.trim(F.col("GENRE_NAME")))

    pairs = (
        exploded
        .join(genres, "GENRE_NAME", "inner")
        .select(
            F.col("dt.TITLE_SK").alias("TITLE_SK"),
            F.col("g.GENRE_SK").alias("GENRE_SK"),
        )
        .distinct()
    )
    return add_metadata(pairs)


@dlt.table(
    name="BRIDGE_TITLE_AKAS",
    comment="AKAs per title, including region and language SKs."
)
def bridge_title_akas():
    """Build BRIDGE_TITLE_AKAS from silver_title_akas.

    AKA rows with a non-null titleId are inner-joined to DIM_TITLES and
    left-joined to DIM_REGIONS and DIM_LANGUAGES. Rows that resolve neither a
    region nor a language key are dropped. IS_ORIGINAL_TITLE is 1 when
    isOriginalTitle equals True and 0 otherwise.
    """
    akas = read_silver("silver_title_akas").alias("a")
    titles = dlt.read("DIM_TITLES").alias("dt")
    regions = dlt.read("DIM_REGIONS").alias("dr")
    languages = dlt.read("DIM_LANGUAGES").alias("dl")

    title_match = F.col("a.titleId") == F.col("dt.TCONST")
    region_match = F.col("a.region") == F.col("dr.REGION_ABBR")
    language_match = F.col("a.language") == F.col("dl.LANGUAGE_ABBR")

    resolved = akas.filter(F.col("a.titleId").isNotNull())
    resolved = resolved.join(titles, title_match, "inner")
    resolved = resolved.join(regions, region_match, "left")
    resolved = resolved.join(languages, language_match, "left")

    # Keep a row when at least one of the two optional lookups matched.
    has_region_or_language = (
        F.col("dr.REGION_SK").isNotNull() | F.col("dl.LANGUAGE_SK").isNotNull()
    )
    original_flag = (
        F.when(F.col("a.isOriginalTitle") == True, F.lit(1)).otherwise(F.lit(0))
    )

    aka_rows = resolved.filter(has_region_or_language).select(
        F.col("dt.TITLE_SK").alias("TITLE_SK"),
        F.col("dr.REGION_SK").alias("REGION_SK"),
        F.col("dl.LANGUAGE_SK").alias("LANGUAGE_SK"),
        F.col("a.title").alias("AKA_TITLE"),
        original_flag.alias("IS_ORIGINAL_TITLE"),
    )
    return add_metadata(aka_rows)


@dlt.table(
    name="BRIDGE_TITLE_REGIONS",
    comment="Bridge between titles and regions."
)
def bridge_title_regions():
    """Build BRIDGE_TITLE_REGIONS: distinct (TITLE_SK, REGION_SK) pairs.

    The pairs are taken from BRIDGE_TITLE_AKAS rows that have a region key.
    """
    akas_bridge = dlt.read("BRIDGE_TITLE_AKAS")
    with_region = akas_bridge.filter(F.col("REGION_SK").isNotNull())
    pairs = with_region.select("TITLE_SK", "REGION_SK").distinct()
    return add_metadata(pairs)


@dlt.table(
    name="BRIDGE_TITLE_LANGUAGE",
    comment="Bridge between titles and languages."
)
def bridge_title_language():
    """Build BRIDGE_TITLE_LANGUAGE: distinct (TITLE_SK, LANGUAGE_SK) pairs.

    The pairs are taken from BRIDGE_TITLE_AKAS rows that have a language key.
    """
    akas_bridge = dlt.read("BRIDGE_TITLE_AKAS")
    with_language = akas_bridge.filter(F.col("LANGUAGE_SK").isNotNull())
    pairs = with_language.select("TITLE_SK", "LANGUAGE_SK").distinct()
    return add_metadata(pairs)


@dlt.table(
    name="BRIDGE_PERSONS_PROFESSION",
    comment="Bridge between persons and professions (multi-valued)."
)
def bridge_persons_profession():
    """Build BRIDGE_PERSONS_PROFESSION: distinct person/profession links.

    silver_name_basics rows with a non-null nconst and primaryProfession are
    matched to DIM_PERSONS on nconst. Their professions are split and exploded
    with the same cleaning used for DIM_PROFESSION, then matched to
    DIM_PROFESSION on PROFESSION_NAME. Both joins are inner.
    PERSON_PROFESSION_DK is "<PERSON_SK>-<PROFESSION_SK>".
    """
    names = read_silver("silver_name_basics").alias("n")
    persons = dlt.read("DIM_PERSONS").alias("dp")
    professions = dlt.read("DIM_PROFESSION").alias("pr")

    has_key_and_profession = (
        F.col("n.nconst").isNotNull() & F.col("n.primaryProfession").isNotNull()
    )

    # Remove '[', ']' and '"' before splitting on ','.
    stripped = F.regexp_replace(
        F.col("n.primaryProfession").cast("string"), r'[\[\]\"]', ''
    )
    profession_tokens = F.split(stripped, ',')

    person_rows = (
        names
        .filter(has_key_and_profession)
        .join(persons, F.col("n.nconst") == F.col("dp.NCONST"), "inner")
    )

    # One row per (person, profession token); the token is trimmed in place.
    exploded = person_rows.withColumn("profession_arr", profession_tokens)
    exploded = exploded.withColumn(
        "PROFESSION_NAME", F.explode(F.col("profession_arr"))
    )
    exploded = exploded.withColumn("PROFESSION_NAME", F.trim(F.col("PROFESSION_NAME")))

    person_sk = F.col("dp.PERSON_SK")
    profession_sk = F.col("pr.PROFESSION_SK")

    links = (
        exploded
        .join(professions, "PROFESSION_NAME", "inner")
        .select(
            person_sk.alias("PERSON_SK"),
            profession_sk.alias("PROFESSION_SK"),
            F.lit(1).alias("IS_CURRENT"),
            F.current_date().alias("START_DATE"),
            F.lit(None).cast("date").alias("END_DATE"),
            F.concat_ws("-", person_sk, profession_sk).alias("PERSON_PROFESSION_DK"),
        )
        .distinct()
    )
    return add_metadata(links)


# ============================================================
# 3. FACT TABLES
# ============================================================

@dlt.table(
    name="FACT_MOVIES",
    comment="Movie fact (one row per title) with runtime, adult flag and ratings."
)
def fact_movies():
    """Build FACT_MOVIES from silver_title_basics, DIM_TITLES and ratings.

    Basics rows with a non-null tconst are inner-joined to DIM_TITLES and
    left-joined to silver_title_ratings, so titles without a rating keep null
    AVERAGE_RATING / NUM_VOTES. MOVIE_SK is assigned with row_number() in
    TITLE_SK order.
    """
    basics = read_silver("silver_title_basics").alias("b")
    titles = dlt.read("DIM_TITLES").alias("dt")
    ratings = read_silver("silver_title_ratings").alias("r")

    title_match = F.col("b.tconst") == F.col("dt.TCONST")
    rating_match = F.col("b.tconst") == F.col("r.tconst")

    joined = basics.filter(F.col("b.tconst").isNotNull())
    joined = joined.join(titles, title_match, "inner")
    joined = joined.join(ratings, rating_match, "left")

    # Un-partitioned window: the numbering is global across the whole table.
    sk_order = Window.orderBy("dt.TITLE_SK")
    numbered = joined.withColumn("MOVIE_SK", F.row_number().over(sk_order))

    output_columns = [
        "MOVIE_SK",
        F.col("dt.TITLE_SK").alias("TITLE_SK"),
        F.col("b.startYear").cast("int").alias("START_YEAR"),
        F.col("b.endYear").cast("int").alias("END_YEAR"),
        F.col("b.runtimeMinutes").cast("int").alias("RUNTIME_MINUTES"),
        F.col("dt.IS_ADULT").alias("IS_ADULT"),
        F.col("r.averageRating").alias("AVERAGE_RATING"),
        F.col("r.numVotes").alias("NUM_VOTES"),
    ]
    return add_metadata(numbered.select(*output_columns))


@dlt.table(
    name="FACT_EPISODES",
    comment="Episode fact linking episode and parent series titles."
)
def fact_episodes():
    """Build FACT_EPISODES from silver_title_episode and DIM_TITLES.

    Episode rows with a non-null tconst and parentTconst are joined to
    DIM_TITLES twice: once for the episode itself (TITLE_SK) and once for its
    parent (PARENT_TITLE_SK). Both joins are inner. EPISODE_SK is assigned
    with row_number() in episode-tconst order, and NUM_EPISODES is the
    constant 1 on every row.
    """
    episodes_raw = read_silver("silver_title_episode")

    has_both_keys = F.col("tconst").isNotNull() & F.col("parentTconst").isNotNull()
    # The key is renamed before the joins; the window below orders by this name.
    episodes = (
        episodes_raw
        .filter(has_both_keys)
        .withColumnRenamed("tconst", "EPISODE_TCONST")
    )

    titles = dlt.read("DIM_TITLES")
    child_titles = titles.alias("child")
    parent_titles = titles.alias("parent")

    child_match = F.col("e.EPISODE_TCONST") == F.col("child.TCONST")
    parent_match = F.col("e.parentTconst") == F.col("parent.TCONST")

    linked = episodes.alias("e").join(child_titles, child_match, "inner")
    linked = linked.join(parent_titles, parent_match, "inner")

    sk_order = Window.orderBy("EPISODE_TCONST")
    numbered = linked.withColumn("EPISODE_SK", F.row_number().over(sk_order))

    output_columns = [
        "EPISODE_SK",
        F.col("e.seasonNumber").alias("SEASON_NUMBER"),
        F.col("e.episodeNumber").alias("EPISODE_NUMBER"),
        F.lit(1).alias("NUM_EPISODES"),
        F.col("child.TITLE_SK").alias("TITLE_SK"),
        F.col("parent.TITLE_SK").alias("PARENT_TITLE_SK"),
    ]
    return add_metadata(numbered.select(*output_columns))


@dlt.table(
    name="FACT_PERSONS",
    comment="Fact table for cast/crew per title and person."
)
def fact_persons():
    """Build FACT_PERSONS from silver_title_principals.

    Principal rows with a non-null tconst and nconst are inner-joined to
    DIM_TITLES and DIM_PERSONS. JOB and CHARACTER are set to null when the
    whitespace-trimmed source value is one of the null markers
    ("\\N", "N", "null") or empty. JOB is stored trimmed; CHARACTER is stored
    as it appears in the source. FACT_PERSON_SK is assigned with row_number()
    in (tconst, nconst, ordering) order.
    """
    p  = read_silver("silver_title_principals").alias("p")
    dt = dlt.read("DIM_TITLES").alias("dt")
    dp = dlt.read("DIM_PERSONS").alias("dp")

    # Values treated as "no value" once surrounding whitespace is removed.
    null_markers = ("\\N", "N", "null", "")

    job_trimmed = F.trim(F.col("p.job"))
    job_clean = F.when(job_trimmed.isin(*null_markers), None).otherwise(job_trimmed)

    # The marker test runs on the trimmed value, the same way JOB is handled,
    # so a padded marker such as " \N " is nulled rather than kept as a
    # character name. Values that pass the test are stored untrimmed.
    characters_trimmed = F.trim(F.col("p.characters"))
    character_clean = F.when(
        F.col("p.characters").isNull() | characters_trimmed.isin(*null_markers),
        None
    ).otherwise(F.col("p.characters"))

    joined = (
        p
        .where(F.col("p.tconst").isNotNull() & F.col("p.nconst").isNotNull())
        .join(dt, F.col("p.tconst") == F.col("dt.TCONST"), "inner")
        .join(dp, F.col("p.nconst") == F.col("dp.NCONST"), "inner")
        .withColumn("JOB_CLEAN", job_clean)
        .withColumn("CHARACTER_CLEAN", character_clean)
    )

    # Un-partitioned window: the numbering is global across the whole table.
    w_fp = Window.orderBy("p.tconst", "p.nconst", "p.ordering")

    fact = (
        joined
        .withColumn("FACT_PERSON_SK", F.row_number().over(w_fp))
        .select(
            "FACT_PERSON_SK",
            F.col("dp.PERSON_SK").alias("PERSON_SK"),
            F.col("dt.TITLE_SK").alias("TITLE_SK"),
            F.col("p.category").alias("CATEGORY"),
            F.col("JOB_CLEAN").alias("JOB"),
            F.col("CHARACTER_CLEAN").alias("CHARACTER"),
            F.col("p.ordering").cast("int").alias("ORDERING")
        )
    )

    return add_metadata(fact)
