# In [0]:
import dlt
from pyspark.sql.types import *
from pyspark.sql.functions import *

# COMMAND ----------

# Configuration
# Base directory holding the Cleaned_*.csv source files read by the bronze tables.
VOLUME_PATH = "/Volumes/workspace/imdb_final_project/imdb"

# COMMAND ----------

# MAGIC %md
# MAGIC ## Bronze Layer - Raw Data Ingestion

# COMMAND ----------

# Bronze: name_basics
@dlt.table(
    name="bronze_name_basics",
    comment="Bronze layer - Raw name basics data from CSV",
    table_properties={
        "quality": "bronze",
        "pipelines.autoOptimize.managed": "true",
    },
)
def bronze_name_basics():
    """Ingest Cleaned_name_basics.csv unchanged, with every column typed as string.

    The first CSV line is treated as a header. An ``ingestion_timestamp``
    column records when the data was read.
    """
    column_names = (
        "nconst",
        "primaryName",
        "birthYear",
        "deathYear",
        "primaryProfession",
        "knownForTitles",
    )
    raw_schema = StructType([StructField(name, StringType(), True) for name in column_names])

    csv_reader = spark.read.format("csv").schema(raw_schema).option("header", "true")
    raw_df = csv_reader.load(f"{VOLUME_PATH}/Cleaned_name_basics.csv")
    return raw_df.withColumn("ingestion_timestamp", current_timestamp())

# COMMAND ----------

# Bronze: title_basics
@dlt.table(
    name="bronze_title_basics",
    comment="Bronze layer - Raw title basics data from CSV"
)
def bronze_title_basics():
    """Ingest Cleaned_title_basics.csv unchanged, with every column typed as string.

    The first CSV line is treated as a header. An ``ingestion_timestamp``
    column records when the data was read.
    """
    column_names = (
        "tconst",
        "titleType",
        "primaryTitle",
        "originalTitle",
        "isAdult",
        "startYear",
        "endYear",
        "runtimeMinutes",
        "genres",
    )
    raw_schema = StructType([StructField(name, StringType(), True) for name in column_names])

    csv_reader = spark.read.format("csv").schema(raw_schema).option("header", "true")
    raw_df = csv_reader.load(f"{VOLUME_PATH}/Cleaned_title_basics.csv")
    return raw_df.withColumn("ingestion_timestamp", current_timestamp())

# COMMAND ----------

# Bronze: title_akas
@dlt.table(
    name="bronze_title_akas",
    comment="Bronze layer - Raw title akas data from CSV"
)
def bronze_title_akas():
    """Ingest Cleaned_title_akas.csv with explicit CSV parsing options.

    All columns are read as strings and the first line is treated as a header.
    Unlike the other bronze readers, this one also:
      * decodes the file as UTF-8;
      * lets quoted fields span several lines (``multiLine``);
      * keeps malformed rows instead of failing (``PERMISSIVE`` mode);
      * uses ``"`` as both quote and escape character, so an embedded quote is
        written as a doubled ``""``;
      * preserves leading and trailing whitespace inside values;
      * reads empty values as the string ``"unknown"`` (``emptyValue``);
      * reads the literal ``\\N`` as NULL (``nullValue``).
    An ``ingestion_timestamp`` column records when the data was read.
    """
    schema = StructType([
        StructField("titleId", StringType(), True),
        StructField("ordering", StringType(), True),
        StructField("title", StringType(), True),
        StructField("region", StringType(), True),
        StructField("language", StringType(), True),
        StructField("types", StringType(), True),
        StructField("attributes", StringType(), True),
        StructField("isOriginalTitle", StringType(), True)
    ])

    return (
        spark.read
        .format("csv")
        .schema(schema)
        .option("header", "true")
        # "charset" is only an alias of "encoding" in the CSV source, so one is enough.
        .option("encoding", "UTF-8")
        .option("multiLine", "true")
        .option("mode", "PERMISSIVE")
        .option("escape", "\"")
        .option("quote", "\"")
        .option("ignoreLeadingWhiteSpace", "false")
        .option("ignoreTrailingWhiteSpace", "false")
        .option("emptyValue", "unknown")
        .option("nullValue", "\\N")
        .load(f"{VOLUME_PATH}/Cleaned_title_akas.csv")
        .withColumn("ingestion_timestamp", current_timestamp())
    )

# COMMAND ----------

# Bronze: title_crew
@dlt.table(
    name="bronze_title_crew",
    comment="Bronze layer - Raw title crew data from CSV"
)
def bronze_title_crew():
    """Ingest Cleaned_title_crew.csv unchanged, with every column typed as string.

    The first CSV line is treated as a header. An ``ingestion_timestamp``
    column records when the data was read.
    """
    column_names = ("tconst", "directors", "writers")
    raw_schema = StructType([StructField(name, StringType(), True) for name in column_names])

    csv_reader = spark.read.format("csv").schema(raw_schema).option("header", "true")
    raw_df = csv_reader.load(f"{VOLUME_PATH}/Cleaned_title_crew.csv")
    return raw_df.withColumn("ingestion_timestamp", current_timestamp())

# COMMAND ----------

# Bronze: title_episode
@dlt.table(
    name="bronze_title_episode",
    comment="Bronze layer - Raw title episode data from CSV"
)
def bronze_title_episode():
    """Ingest Cleaned_title_episode.csv unchanged, with every column typed as string.

    The first CSV line is treated as a header. An ``ingestion_timestamp``
    column records when the data was read.
    """
    column_names = ("tconst", "parentTconst", "seasonNumber", "episodeNumber")
    raw_schema = StructType([StructField(name, StringType(), True) for name in column_names])

    csv_reader = spark.read.format("csv").schema(raw_schema).option("header", "true")
    raw_df = csv_reader.load(f"{VOLUME_PATH}/Cleaned_title_episode.csv")
    return raw_df.withColumn("ingestion_timestamp", current_timestamp())

# COMMAND ----------

# Bronze: title_principals
@dlt.table(
    name="bronze_title_principals",
    comment="Bronze layer - Raw title principals data from CSV"
)
def bronze_title_principals():
    """Ingest Cleaned_title_principals.csv unchanged, with every column typed as string.

    The first CSV line is treated as a header. An ``ingestion_timestamp``
    column records when the data was read.
    """
    column_names = ("tconst", "ordering", "nconst", "category", "job", "characters")
    raw_schema = StructType([StructField(name, StringType(), True) for name in column_names])

    csv_reader = spark.read.format("csv").schema(raw_schema).option("header", "true")
    raw_df = csv_reader.load(f"{VOLUME_PATH}/Cleaned_title_principals.csv")
    return raw_df.withColumn("ingestion_timestamp", current_timestamp())

# COMMAND ----------

# Bronze: title_ratings
@dlt.table(
    name="bronze_title_ratings",
    comment="Bronze layer - Raw title ratings data from CSV"
)
def bronze_title_ratings():
    """Ingest Cleaned_title_ratings.csv unchanged, with every column typed as string.

    The first CSV line is treated as a header. An ``ingestion_timestamp``
    column records when the data was read.
    """
    column_names = ("tconst", "averageRating", "numVotes")
    raw_schema = StructType([StructField(name, StringType(), True) for name in column_names])

    csv_reader = spark.read.format("csv").schema(raw_schema).option("header", "true")
    raw_df = csv_reader.load(f"{VOLUME_PATH}/Cleaned_title_ratings.csv")
    return raw_df.withColumn("ingestion_timestamp", current_timestamp())

# In [0]:
# Databricks notebook source
import dlt
from pyspark.sql.functions import *
from pyspark.sql.types import *

# ============================================================
#                   SILVER LAYER
# ============================================================


# ------------------------------------------------------------
# Silver: name_basics
# ------------------------------------------------------------
@dlt.table(
    name="silver_name_basics",
    comment="Silver layer - Datatype mapping only"
)
@dlt.expect_or_drop("valid_nconst", "nconst IS NOT NULL")
def silver_name_basics():
    """Cast bronze_name_basics columns to their target types.

    The expectation above drops rows whose nconst is NULL.
    """
    target_types = {
        "birthYear": "int",
        "deathYear": "int",
        "primaryName": "string",
        "primaryProfession": "string",
        "knownForTitles": "string",
    }
    typed_df = dlt.read("bronze_name_basics")
    # withColumn on an existing name replaces that column in place, so column order is unchanged.
    for column_name, spark_type in target_types.items():
        typed_df = typed_df.withColumn(column_name, col(column_name).cast(spark_type))
    return typed_df


# ------------------------------------------------------------
# Silver: title_basics
# ------------------------------------------------------------
@dlt.table(
    name="silver_title_basics",
    comment="Silver layer - Datatype mapping only"
)
@dlt.expect_or_drop("valid_tconst", "tconst IS NOT NULL")
def silver_title_basics():
    """Cast bronze_title_basics columns to their target types.

    The expectation above drops rows whose tconst is NULL.
    """
    target_types = {
        "titleType": "string",
        "primaryTitle": "string",
        "originalTitle": "string",
        "isAdult": "int",
        "startYear": "int",
        "endYear": "int",
        "runtimeMinutes": "int",
        "genres": "string",
    }
    typed_df = dlt.read("bronze_title_basics")
    for column_name, spark_type in target_types.items():
        typed_df = typed_df.withColumn(column_name, col(column_name).cast(spark_type))
    return typed_df


# ------------------------------------------------------------
# Silver: title_akas
# ------------------------------------------------------------
@dlt.table(
    name="silver_title_akas",
    comment="Silver layer - Datatype mapping only"
)
@dlt.expect_or_drop("valid_titleId", "titleId IS NOT NULL")
def silver_title_akas():
    """Cast bronze_title_akas columns to their target types.

    The expectation above drops rows whose titleId is NULL.
    """
    target_types = {
        "ordering": "int",
        "title": "string",
        "region": "string",
        "language": "string",
        "types": "string",
        "attributes": "string",
        "isOriginalTitle": "int",
    }
    typed_df = dlt.read("bronze_title_akas")
    for column_name, spark_type in target_types.items():
        typed_df = typed_df.withColumn(column_name, col(column_name).cast(spark_type))
    return typed_df


# ------------------------------------------------------------
# Silver: title_episode
# ------------------------------------------------------------
@dlt.table(
    name="silver_title_episode",
    comment="Silver layer - Datatype mapping only"
)
@dlt.expect_or_drop("valid_tconst", "tconst IS NOT NULL")
def silver_title_episode():
    """Cast bronze_title_episode columns to their target types.

    The expectation above drops rows whose tconst is NULL.
    """
    target_types = {
        "parentTconst": "string",
        "seasonNumber": "int",
        "episodeNumber": "int",
    }
    typed_df = dlt.read("bronze_title_episode")
    for column_name, spark_type in target_types.items():
        typed_df = typed_df.withColumn(column_name, col(column_name).cast(spark_type))
    return typed_df


# ------------------------------------------------------------
# Silver: title_principals
# ------------------------------------------------------------
@dlt.table(
    name="silver_title_principals",
    comment="Silver layer - Datatype mapping only"
)
@dlt.expect_or_drop("valid_tconst", "tconst IS NOT NULL")
@dlt.expect_or_drop("valid_nconst", "nconst IS NOT NULL")
def silver_title_principals():
    """Cast bronze_title_principals columns to their target types.

    The expectations above drop rows whose tconst or nconst is NULL.
    """
    target_types = {
        "ordering": "long",
        "category": "string",
        "job": "string",
        "characters": "string",
    }
    typed_df = dlt.read("bronze_title_principals")
    for column_name, spark_type in target_types.items():
        typed_df = typed_df.withColumn(column_name, col(column_name).cast(spark_type))
    return typed_df


# ------------------------------------------------------------
# Silver: title_ratings
# ------------------------------------------------------------
@dlt.table(
    name="silver_title_ratings",
    comment="Silver layer - Datatype mapping only"
)
@dlt.expect_or_drop("valid_tconst", "tconst IS NOT NULL")
def silver_title_ratings():
    """Cast bronze_title_ratings columns to their target types.

    The expectation above drops rows whose tconst is NULL.
    """
    target_types = {
        "averageRating": "float",
        "numVotes": "int",
    }
    typed_df = dlt.read("bronze_title_ratings")
    for column_name, spark_type in target_types.items():
        typed_df = typed_df.withColumn(column_name, col(column_name).cast(spark_type))
    return typed_df


# ------------------------------------------------------------
# Silver: title_crew
# ------------------------------------------------------------
@dlt.table(
    name="silver_title_crew",
    comment="Silver layer - Datatype mapping only"
)
@dlt.expect_or_drop("valid_tconst", "tconst IS NOT NULL")
def silver_title_crew():
    """Cast bronze_title_crew columns to their target types.

    The expectation above drops rows whose tconst is NULL.
    """
    target_types = {
        "directors": "string",
        "writers": "string",
    }
    typed_df = dlt.read("bronze_title_crew")
    for column_name, spark_type in target_types.items():
        typed_df = typed_df.withColumn(column_name, col(column_name).cast(spark_type))
    return typed_df


# In [0]:
# Databricks notebook source
# MAGIC %md
# MAGIC # Gold Layer - Complete Dimensional Model
# MAGIC 
# MAGIC **Architecture:** Medallion Architecture - Gold Layer
# MAGIC **Purpose:** Business-ready dimensional model (Star Schema)
# MAGIC **Reference Data:** Uses static CSV files for Region and Language dimensions

# COMMAND ----------

import dlt
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql import Window
import pandas as pd

# Progress message confirming that the import cell above ran without error.
print("All libraries imported successfully")

# COMMAND ----------

# MAGIC %md
# MAGIC ## Configuration

# COMMAND ----------

# Volume path configuration: the reference CSV locations are derived from the
# shared volume directory so the base path only needs to change in one place.
VOLUME_PATH = "/Volumes/workspace/imdb_final_project/imdb"
REGION_REFERENCE_FILE = VOLUME_PATH + "/region_reference.csv"
LANGUAGE_REFERENCE_FILE = VOLUME_PATH + "/language_reference.csv"

print("Volume Path: " + VOLUME_PATH)
print("Region Reference: " + REGION_REFERENCE_FILE)
print("Language Reference: " + LANGUAGE_REFERENCE_FILE)

# COMMAND ----------

# MAGIC %md
# MAGIC ## Helper Functions for Reference Data

# COMMAND ----------

def load_region_reference():
    """Read the region reference CSV and report how many rows it holds.

    Reads REGION_REFERENCE_FILE with a header row and inferred column types.
    The expected columns are RegionCode and RegionDescription. Prints the row
    count, which runs a Spark count action, and returns the DataFrame.

    NOTE(review): gold_DIM_Region in this notebook reads the CSV itself rather
    than calling this helper.
    """
    csv_reader = spark.read.format("csv").options(header="true", inferSchema="true")
    region_df = csv_reader.load(REGION_REFERENCE_FILE)

    print(f"Loaded {region_df.count()} regions from reference file")
    return region_df

def load_language_reference():
    """Read the language reference CSV and report how many rows it holds.

    Reads LANGUAGE_REFERENCE_FILE with a header row and inferred column types.
    The expected columns are LanguageCode and LanguageDescription. Prints the
    row count, which runs a Spark count action, and returns the DataFrame.

    NOTE(review): gold_DIM_Language in this notebook reads the CSV itself
    rather than calling this helper.
    """
    csv_reader = spark.read.format("csv").options(header="true", inferSchema="true")
    language_df = csv_reader.load(LANGUAGE_REFERENCE_FILE)

    print(f"Loaded {language_df.count()} languages from reference file")
    return language_df

# Progress message confirming that the helper definitions above were evaluated.
print("Helper functions loaded successfully")

# COMMAND ----------

# MAGIC %md
# MAGIC ## DIMENSION TABLES

# COMMAND ----------

# MAGIC %md
# MAGIC ### 1. DIM_Region

# COMMAND ----------

@dlt.table(
    name="gold_DIM_Region",
    comment="Complete Region dimension based only on reference list"
)
def gold_DIM_Region():
    """Build the Region dimension from the static region reference CSV.

    * RegionCode is trimmed and lower-cased. Rows with a NULL or blank code are
      discarded.
    * Duplicate codes are collapsed to one row, keeping the alphabetically
      smallest description, so a join on RegionCode can never multiply rows.
    * RegionKey runs 1..N in RegionCode order.
    * An "Unknown" member (RegionKey -9999, NULL RegionCode) is appended.
      gold_BRIDGE_Akas assigns -9999 to akas whose region has no match, and
      this row gives that key a dimension entry. Its NULL code never matches
      any join.
    """
    reference_regions = (
        spark.read.format("csv")
        .option("header", "true")
        .load(REGION_REFERENCE_FILE)
        .select(
            lower(trim(col("RegionCode"))).alias("RegionCode"),
            col("RegionDescription")
        )
        .filter(col("RegionCode").isNotNull() & (col("RegionCode") != ""))
        # Deterministic dedupe: one row per code.
        .groupBy("RegionCode")
        .agg(expr("min(RegionDescription)").alias("RegionDescription"))
    )

    known_regions = (
        reference_regions
        .withColumn("RegionKey", row_number().over(Window.orderBy("RegionCode")))
        .select("RegionKey", "RegionCode", "RegionDescription")
    )

    unknown_region = spark.createDataFrame(
        [(-9999, None, "Unknown")],
        schema="RegionKey INT, RegionCode STRING, RegionDescription STRING",
    )

    return known_regions.unionByName(unknown_region)



# COMMAND ----------

# MAGIC %md
# MAGIC ### 2. DIM_Language

# COMMAND ----------

@dlt.table(
    name="gold_DIM_Language",
    comment="Complete Language dimension based only on reference list"
)
def gold_DIM_Language():
    """Build the Language dimension from the static language reference CSV.

    * LanguageCode is trimmed and lower-cased. Rows with a NULL or blank code
      are discarded.
    * Duplicate codes are collapsed to one row, keeping the alphabetically
      smallest description, so a join on LanguageCode can never multiply rows.
    * LanguageKey runs 1..N in LanguageCode order.
    * An "Unknown" member (LanguageKey -9999, NULL LanguageCode) is appended.
      gold_BRIDGE_Akas assigns -9999 to akas whose language has no match, and
      this row gives that key a dimension entry. Its NULL code never matches
      any join.
    """
    reference_languages = (
        spark.read.format("csv")
        .option("header", "true")
        .load(LANGUAGE_REFERENCE_FILE)
        .select(
            lower(trim(col("LanguageCode"))).alias("LanguageCode"),
            col("LanguageDescription")
        )
        .filter(col("LanguageCode").isNotNull() & (col("LanguageCode") != ""))
        # Deterministic dedupe: one row per code.
        .groupBy("LanguageCode")
        .agg(expr("min(LanguageDescription)").alias("LanguageDescription"))
    )

    known_languages = (
        reference_languages
        .withColumn("LanguageKey", row_number().over(Window.orderBy("LanguageCode")))
        .select("LanguageKey", "LanguageCode", "LanguageDescription")
    )

    unknown_language = spark.createDataFrame(
        [(-9999, None, "Unknown")],
        schema="LanguageKey INT, LanguageCode STRING, LanguageDescription STRING",
    )

    return known_languages.unionByName(unknown_language)



# COMMAND ----------

# MAGIC %md
# MAGIC ### 3. DIM_NAME (Person Dimension)

# COMMAND ----------

@dlt.table(
    name="gold_DIM_NAME",
    comment="Person dimension - SCD Type 1 (complete refresh each run)"
)
@dlt.expect_or_drop("valid_nconst", "nconst IS NOT NULL")
@dlt.expect_or_drop("valid_name", "primaryName IS NOT NULL")
def gold_DIM_NAME():
    """Build the person dimension from silver_name_basics.

    SCD Type 1: the table is recomputed on every run, and ModifiedDate is set
    to the run's current_timestamp(). NameKey is a row_number() ordered by
    nconst. The expectations drop rows with a NULL nconst or primaryName.
    """
    renamed = dlt.read("silver_name_basics").select(
        col("nconst").alias("NCONST"),
        col("primaryName").alias("PrimaryName"),
        col("birthYear").alias("BirthYear"),
        col("deathYear").alias("DeathYear"),
    )
    stamped = renamed.withColumn("ModifiedDate", current_timestamp())
    keyed = stamped.withColumn("NameKey", row_number().over(Window.orderBy("nconst")))

    output_columns = ["NameKey", "NCONST", "PrimaryName", "BirthYear", "DeathYear", "ModifiedDate"]
    return keyed.select(*output_columns)

# COMMAND ----------

# MAGIC %md
# MAGIC ### 4. DIM_Title (Title Dimension)

# COMMAND ----------

@dlt.table(
    name="gold_DIM_Title",
    comment="Title dimension - SCD Type 2 style columns, rebuilt on each run (history is not retained)"
)
@dlt.expect_or_drop("valid_tconst", "tconst IS NOT NULL")
@dlt.expect_or_drop("valid_title", "primaryTitle IS NOT NULL")
def gold_DIM_Title():
    """Build the Title dimension from silver_title_basics.

    The table has SCD Type 2 style columns, but it is recomputed from scratch
    on every run, so no history is kept:
      - EffectiveDate, CreatedDate and ModifiedDate all hold the run's
        current_timestamp(). They do not record when a title first appeared
        or last changed.
      - EndDate is always 9999-12-31 23:59:59 and IsCurrent is always True.
    ReleaseYear is the source startYear.

    TitleKey is a row_number() ordered by tconst. Keys therefore stay stable
    only while the set of tconst values does not change.

    The expectations drop rows with a NULL tconst or primaryTitle. They are
    applied after TitleKey is assigned, so dropped rows can leave gaps in the
    keys.
    """
    return (
        dlt.read("silver_title_basics")
        .select(
            col("tconst").alias("Tconst"),
            col("titleType").alias("TitleType"),
            col("primaryTitle").alias("PrimaryTitle"),
            col("originalTitle").alias("OriginalTitle"),
            col("isAdult").alias("IsAdult"),
            col("startYear").alias("ReleaseYear"),
            col("runtimeMinutes").alias("RuntimeMinutes")
        )
        .withColumn("EffectiveDate", current_timestamp())
        .withColumn("EndDate", lit("9999-12-31 23:59:59").cast("timestamp"))
        .withColumn("IsCurrent", lit(True))
        .withColumn("CreatedDate", current_timestamp())
        .withColumn("ModifiedDate", current_timestamp())
        .withColumn("TitleKey", row_number().over(Window.orderBy("tconst")))
        .select(
            "TitleKey",
            "Tconst",
            "TitleType",
            "PrimaryTitle",
            "OriginalTitle",
            "IsAdult",
            "ReleaseYear",
            "RuntimeMinutes",
            "EffectiveDate",
            "EndDate",
            "IsCurrent",
            "CreatedDate",
            "ModifiedDate"
        )
    )

# COMMAND ----------

# MAGIC %md
# MAGIC ### 5. DIM_Genre

# COMMAND ----------

@dlt.table(
    name="gold_DIM_Genre",
    comment="Genre dimension extracted from comma-separated genres"
)
def gold_DIM_Genre():
    """Build the Genre dimension from the genre lists in silver_title_basics.

    Each comma-separated genre token is trimmed. NULL, empty and ``\\N``
    tokens are discarded. Only tokens whose lower-case form is in the
    whitelist below are kept. Names are emitted in initcap form, deduplicated
    and numbered 1..N alphabetically as GenreKey.
    """
    # Whitelist in lower case, matched against the lower-cased token.
    allowed_genres = [
        'action', 'adult', 'adventure', 'animation', 'biography',
        'comedy', 'crime', 'documentary', 'drama', 'family',
        'fantasy', 'film-noir', 'game-show', 'history', 'horror',
        'music', 'musical', 'mystery', 'news', 'reality-tv',
        'romance', 'sci-fi', 'short', 'sport', 'talk-show',
        'thriller', 'war', 'western'
    ]
    genre = col("GenreName")

    tokens = (
        dlt.read("silver_title_basics")
        .select(explode(split(col("genres"), ",")).alias("GenreName"))
        .where(genre.isNotNull())
        .withColumn("GenreName", trim(genre))
        # "\\N" is IMDb's textual NULL marker.
        .where((genre != "") & (genre != "\\N"))
    )

    canonical = (
        tokens
        .withColumn("GenreName_lower", lower(genre))
        .where(col("GenreName_lower").isin(allowed_genres))
        .withColumn("GenreName", initcap(col("GenreName_lower")))
        .drop("GenreName_lower")
        .distinct()
    )

    keyed = canonical.withColumn("GenreKey", row_number().over(Window.orderBy("GenreName")))
    return keyed.select("GenreKey", "GenreName")

# COMMAND ----------

# MAGIC %md
# MAGIC ### 6. DIM_Profession

# COMMAND ----------

@dlt.table(
    name="gold_DIM_Profession",
    comment="Profession dimension extracted from comma-separated professions"
)
def gold_DIM_Profession():
    """Build the Profession dimension from silver_name_basics.primaryProfession.

    The comma-separated list is exploded and each token is trimmed. NULL,
    empty and ``\\N`` (IMDb's textual NULL) tokens are discarded, matching the
    junk filtering in gold_DIM_Genre, so placeholders never become dimension
    members. Distinct professions are numbered 1..N alphabetically as
    ProfessionKey.
    """
    return (
        dlt.read("silver_name_basics")
        .select(explode(split(col("primaryProfession"), ",")).alias("Profession"))
        .withColumn("Profession", trim(col("Profession")))
        .filter(col("Profession").isNotNull())
        .filter(col("Profession") != "")
        .filter(col("Profession") != "\\N")
        .distinct()
        .withColumn("ProfessionKey", row_number().over(Window.orderBy("Profession")))
        .select("ProfessionKey", "Profession")
    )

# COMMAND ----------

# MAGIC %md
# MAGIC ### 7. DIM_Crew

# COMMAND ----------

@dlt.table(
    name="gold_DIM_Crew",
    comment="Crew role dimension"
)
def gold_DIM_Crew():
    """Return the static crew-role dimension: CrewKey 1 = director, 2 = writer.

    The schema is spelled out (BIGINT key, STRING role, both nullable) rather
    than inferred from the Python values.
    """
    role_rows = [(1, "director"), (2, "writer")]
    return spark.createDataFrame(role_rows, schema="CrewKey BIGINT, Crew_Role STRING")

# COMMAND ----------

# MAGIC %md
# MAGIC ### 8. DIM_Principals (SCD Type 1 Dimension)

# COMMAND ----------

@dlt.table(
    name="gold_DIM_Principals",
    comment="Principal cast and crew dimension - SCD Type 1"
)
def gold_DIM_Principals():
    """Build the principal cast/crew table from silver_title_principals.

    SCD Type 1: the table is recomputed on every run, and ModifiedDate is the
    run's current_timestamp().

    Each principal row is resolved to a NameKey (from gold_DIM_NAME) and a
    TitleKey (from the IsCurrent rows of gold_DIM_Title) through inner joins.
    Rows whose nconst or tconst has no dimension match are dropped.
    PrincipalKey is a row_number() ordered by (TitleKey, Ordering).
    """
    principals = dlt.read("silver_title_principals")
    current_titles = dlt.read("gold_DIM_Title").filter(col("IsCurrent") == lit(True))
    names = dlt.read("gold_DIM_NAME")

    with_name = principals.join(names, principals.nconst == names.NCONST, "inner")
    with_title = with_name.join(current_titles, principals.tconst == current_titles.Tconst, "inner")

    projected = with_title.select(
        names.NameKey.alias("NameKey"),
        current_titles.TitleKey.alias("TitleKey"),
        principals.ordering.alias("Ordering"),
        principals.category.alias("Category"),
        principals.job.alias("Job"),
        principals.characters.alias("Characters"),
    )

    keyed = (
        projected
        .where(col("NameKey").isNotNull())
        .withColumn("ModifiedDate", current_timestamp())
        .withColumn("PrincipalKey", row_number().over(Window.orderBy("TitleKey", "Ordering")))
    )

    output_columns = [
        "PrincipalKey", "NameKey", "TitleKey", "Ordering",
        "Category", "Job", "Characters", "ModifiedDate",
    ]
    return keyed.select(*output_columns)

# COMMAND ----------

# MAGIC %md
# MAGIC ## FACT TABLES

# COMMAND ----------

# MAGIC %md
# MAGIC ### 9. FACT_Title_Ratings

# COMMAND ----------

@dlt.table(
    name="gold_FACT_Title_Ratings",
    comment="Fact table for title ratings"
)
def gold_FACT_Title_Ratings():
    """Build the ratings fact table: one row per rated title found in gold_DIM_Title.

    Ratings are inner-joined to gold_DIM_Title on tconst. Rows with a NULL
    tconst, or an AverageRating outside the inclusive range [0, 10] (including
    NULL), are dropped. RatingKey is a row_number() ordered by tconst.
    """
    ratings = dlt.read("silver_title_ratings")
    titles = dlt.read("gold_DIM_Title")

    joined = ratings.join(titles, ratings.tconst == titles.Tconst, "inner")
    # Reference columns through their parent DataFrame so tconst/Tconst cannot be ambiguous.
    projected = joined.select(
        ratings.tconst.alias("tconst"),
        titles.TitleKey.alias("TitleKey"),
        ratings.averageRating.alias("AverageRating"),
        ratings.numVotes.alias("NumVotes"),
    )

    valid = projected.where(
        col("tconst").isNotNull() & col("AverageRating").between(0, 10)
    )
    keyed = valid.withColumn("RatingKey", row_number().over(Window.orderBy("tconst")))
    return keyed.select("RatingKey", "TitleKey", "AverageRating", "NumVotes")

# COMMAND ----------

# MAGIC %md
# MAGIC ### 10. FACT_Episodes

# COMMAND ----------

@dlt.table(
    name="gold_FACT_Episodes",
    comment="Fact table for TV episodes"
)
def gold_FACT_Episodes():
    """Build the episode fact table: one row per episode whose tconst is in gold_DIM_Title.

    Episodes are inner-joined to gold_DIM_Title on the episode's own tconst.
    EpisodeKey is a row_number() ordered by tconst.
    """
    episodes = dlt.read("silver_title_episode")
    titles = dlt.read("gold_DIM_Title")

    joined = episodes.join(titles, episodes.tconst == titles.Tconst, "inner")
    # Reference columns through their parent DataFrame so tconst/Tconst cannot be ambiguous.
    projected = joined.select(
        episodes.tconst.alias("tconst"),
        titles.TitleKey.alias("TitleKey"),
        episodes.seasonNumber.alias("SeasonNumber"),
        episodes.episodeNumber.alias("EpisodeNumber"),
    )

    keyed = (
        projected
        .where(col("tconst").isNotNull())
        .withColumn("EpisodeKey", row_number().over(Window.orderBy("tconst")))
    )
    return keyed.select("EpisodeKey", "TitleKey", "SeasonNumber", "EpisodeNumber")

# COMMAND ----------

# MAGIC %md
# MAGIC ## BRIDGE TABLES

# COMMAND ----------

# MAGIC %md
# MAGIC ### 11. BRIDGE_TITLE_GENRE

# COMMAND ----------

@dlt.table(
    name="gold_BRIDGE_TITLE_GENRE",
    comment="Bridge table linking titles to genres (many-to-many, up to 3 genres per title)"
)
def gold_BRIDGE_TITLE_GENRE():
    """Link each title to up to three of its genres.

    * Genre tokens from silver_title_basics are trimmed, stripped of NULL,
      empty and ``\\N`` values, and converted with initcap to match
      gold_DIM_Genre.
    * Only list positions 0-2 are kept.
    * Inner joins to gold_DIM_Title and gold_DIM_Genre discard unknown titles
      and non-whitelisted genres.
    * GenreOrder is the genre's 0-based position in the source list
      (0 = primary, 1 = secondary, 2 = tertiary). If a genre is listed twice
      for the same title, the pair is kept once with its earliest position.

    Deduplication must happen before TitleGenreKey is assigned. Once every row
    carries a unique row_number(), distinct() cannot collapse anything.
    """
    titles = dlt.read("gold_DIM_Title")
    genres = dlt.read("gold_DIM_Genre")

    titles_with_genres = (
        dlt.read("silver_title_basics")
        .select(
            "tconst",
            posexplode(split(col("genres"), ",")).alias("genre_position", "GenreName")
        )
        .withColumn("GenreName", trim(col("GenreName")))
        .filter(col("GenreName").isNotNull())
        .filter(col("GenreName") != "")
        .filter(col("GenreName") != "\\N")  # IMDb's textual NULL marker
        # Title Case to match gold_DIM_Genre.GenreName
        .withColumn("GenreName", initcap(col("GenreName")))
        .filter(col("genre_position") < 3)  # positions 0, 1, 2 only
    )

    links = (
        titles_with_genres
        .join(titles, titles_with_genres.tconst == titles.Tconst, "inner")
        # Inner join drops genres that are not in gold_DIM_Genre.
        .join(genres, titles_with_genres.GenreName == genres.GenreName, "inner")
        # One row per (title, genre): dedupe BEFORE the surrogate key is assigned.
        .groupBy("TitleKey", "GenreKey")
        .agg(expr("min(genre_position)").alias("GenreOrder"))
    )

    return links.select(
        row_number().over(Window.orderBy("TitleKey", "GenreKey")).alias("TitleGenreKey"),
        col("TitleKey"),
        col("GenreKey"),
        col("GenreOrder"),
    )

# COMMAND ----------

# MAGIC %md
# MAGIC ### 12. BRIDGE_PROFESSION

# COMMAND ----------

@dlt.table(
    name="gold_BRIDGE_PROFESSION",
    comment="Bridge table linking people to professions (many-to-many) with IsPrimary flag"
)
def gold_BRIDGE_PROFESSION():
    """Link each person to their professions, flagging the primary one.

    * primaryProfession is exploded with its position. Position 0 is treated
      as the primary profession (IsPrimary = 1); all others get 0.
    * Inner joins to gold_DIM_NAME and gold_DIM_Profession resolve NameKey and
      ProfessionKey, dropping unmatched values.
    * Each (NameKey, ProfessionKey) pair appears once. If the same profession
      is listed twice for a person, IsPrimary is 1 when any occurrence is
      primary.

    Deduplication must happen before titleProfessionKey is assigned. Once
    every row carries a unique row_number(), distinct() cannot collapse
    anything.
    """
    names_with_professions = (
        dlt.read("silver_name_basics")
        .select(
            "nconst",
            posexplode(split(col("primaryProfession"), ",")).alias("position", "Profession")
        )
        .withColumn("Profession", trim(col("Profession")))
        .withColumn("IsPrimary", when(col("position") == 0, 1).otherwise(0))
        .filter(col("Profession").isNotNull())
    )

    names = dlt.read("gold_DIM_NAME")
    professions = dlt.read("gold_DIM_Profession")

    links = (
        names_with_professions
        .join(names, names_with_professions.nconst == names.NCONST, "inner")
        .join(professions, names_with_professions.Profession == professions.Profession, "inner")
        # One row per (person, profession): dedupe BEFORE the surrogate key is assigned.
        .groupBy("NameKey", "ProfessionKey")
        .agg(expr("max(IsPrimary)").alias("IsPrimary"))
    )

    return links.select(
        row_number().over(Window.orderBy("NameKey", "ProfessionKey")).alias("titleProfessionKey"),
        col("ProfessionKey"),
        col("NameKey"),
        col("IsPrimary"),
    )

# COMMAND ----------

# MAGIC %md
# MAGIC ### 13. Bridge_Title_Crew

# COMMAND ----------

@dlt.table(
    name="gold_Bridge_Title_Crew",
    comment="Bridge table linking titles to crew members (directors and writers)"
)
def gold_Bridge_Title_Crew():
    """Link titles to the people credited as their directors or writers.

    * The comma-separated ``directors`` and ``writers`` lists of
      silver_title_crew are exploded into (tconst, nconst, role) rows, with
      NULL and empty ids removed.
    * The rows are resolved to TitleKey, NameKey and CrewKey via inner joins,
      so ids without a matching dimension row are dropped.
    * Duplicate (CrewKey, TitleKey, NameKey) combinations are removed before
      titleCrewKey is assigned. Once every row carries a unique row_number(),
      distinct() cannot collapse anything.
    """
    crew = dlt.read("silver_title_crew")
    titles = dlt.read("gold_DIM_Title")
    names = dlt.read("gold_DIM_NAME")
    dim_crew = dlt.read("gold_DIM_Crew")

    def explode_role(list_column, role):
        # One row per person id listed in `list_column`, tagged with `role`.
        return (
            crew
            .select("tconst", explode(split(col(list_column), ",")).alias("nconst"))
            .withColumn("nconst", trim(col("nconst")))
            .withColumn("role", lit(role))
            .filter(col("nconst").isNotNull())
            .filter(col("nconst") != "")
        )

    all_crew = explode_role("directors", "director").union(explode_role("writers", "writer"))

    links = (
        all_crew
        .join(titles, all_crew.tconst == titles.Tconst, "inner")
        .join(names, all_crew.nconst == names.NCONST, "inner")
        .join(dim_crew, all_crew.role == lower(dim_crew.Crew_Role), "inner")
        # Dedupe BEFORE the surrogate key is assigned.
        .select("CrewKey", "TitleKey", "NameKey")
        .distinct()
    )

    return links.select(
        row_number().over(Window.orderBy("TitleKey", "NameKey", "CrewKey")).alias("titleCrewKey"),
        col("CrewKey"),
        col("TitleKey"),
        col("NameKey"),
    )

# COMMAND ----------

# MAGIC %md
# MAGIC ### 14. BRIDGE_Akas

# COMMAND ----------

@dlt.table(
    name="gold_BRIDGE_Akas",
    comment="Bridge table for title alternate names (akas) with regional variations"
)
def gold_BRIDGE_Akas():
    """Bridge each alternate title (aka) to its title, region and language keys.

    * Rows with a NULL title or titleId are removed.
    * Titles that look mis-encoded are removed: they contain U+FFFD (the
      Unicode replacement character), a run of two or more '?', or a '?'
      between two word characters (e.g. "Am?lie"). A single '?' elsewhere,
      such as a trailing question mark, is legitimate title text and is kept.
    * Titles are resolved through an inner join on gold_DIM_Title.
    * Region and language codes are trimmed and lower-cased before the left
      joins, because gold_DIM_Region and gold_DIM_Language store lower-cased
      codes and the akas values may not be lower case.
    * Unmatched regions and languages get the key -9999.
    * TitleAkasKey is a row_number() ordered by (TitleKey, ordering).
    """
    akas = dlt.read("silver_title_akas")
    titles = dlt.read("gold_DIM_Title")
    regions = dlt.read("gold_DIM_Region")
    languages = dlt.read("gold_DIM_Language")

    # Mojibake signatures. Do not reject every '?', since many real titles end with one.
    mis_encoded_title = r"\uFFFD|\?{2,}|\w\?\w"

    return (
        akas
        .filter(col("title").isNotNull())
        .filter(col("titleId").isNotNull())
        .filter(~col("title").rlike(mis_encoded_title))

        # Joins (dimension codes are lower-cased, so normalise the akas side too)
        .join(titles, akas.titleId == titles.Tconst, "inner")
        .join(regions, lower(trim(akas.region)) == regions.RegionCode, "left")
        .join(languages, lower(trim(akas.language)) == languages.LanguageCode, "left")

        # Unmatched region/language -> -9999
        .withColumn("RegionKey", coalesce(col("RegionKey"), lit(-9999)))
        .withColumn("LanguageKey", coalesce(col("LanguageKey"), lit(-9999)))

        # Final output
        .select(
            row_number().over(Window.orderBy("TitleKey", "ordering")).alias("TitleAkasKey"),
            col("TitleKey"),
            col("RegionKey"),
            col("LanguageKey"),
            col("title").alias("AkasTitle"),
            col("isOriginalTitle").alias("IsOriginalTitle")
        )
    )


# COMMAND ----------

# MAGIC %md
# MAGIC ## Data Quality Summary

# COMMAND ----------

# Human-readable summary of the tables this notebook defines. The text is
# informational only and must be kept in sync with the definitions by hand.
print("""
Gold Layer Implementation Complete!

DIMENSIONS (8):
1. gold_DIM_Region - Region codes with descriptions (static file)
2. gold_DIM_Language - Language codes with descriptions (static file)
3. gold_DIM_NAME - Person information
4. gold_DIM_Title - Movie/TV show information
5. gold_DIM_Genre - Genre dimension
6. gold_DIM_Profession - Profession dimension
7. gold_DIM_Crew - Crew role dimension
8. gold_DIM_Principals - Principal cast/crew (SCD Type 1)

FACTS (2):
9. gold_FACT_Title_Ratings - Title ratings and votes
10. gold_FACT_Episodes - TV episode information

BRIDGES (4):
11. gold_BRIDGE_TITLE_GENRE - Title to Genre many-to-many
12. gold_BRIDGE_PROFESSION - Person to Profession many-to-many (with IsPrimary)
13. gold_Bridge_Title_Crew - Title to Crew many-to-many
14. gold_BRIDGE_Akas - Title to Regional names with language support

REFERENCE DATA:
- Region descriptions loaded from: /Volumes/workspace/imdb_final_project/imdb/region_reference.csv
- Language descriptions loaded from: /Volumes/workspace/imdb_final_project/imdb/language_reference.csv

DATA QUALITY:
- Null handling with DLT expectations
- Encoding issues filtered in BRIDGE_Akas
- Primary professions marked with IsPrimary flag
- Surrogate keys generated using row_number()
- Inner joins ensure referential integrity (BRIDGE_Akas left-joins Region/Language and uses key -9999 when unmatched)
""")