In [0]:
from pyspark.sql.functions import col, monotonically_increasing_id, year, month, dayofmonth, quarter, date_format, lit, current_timestamp, sha2, concat_ws, when, max as spark_max
from pyspark.sql.utils import AnalysisException
from delta.tables import DeltaTable

# Unity Catalog schemas: tables are read from silver and written to gold.
silver_schema = "workspace.silver_db"
gold_schema = "workspace.gold_db"

# Make sure the destination schema exists before any table is written.
spark.sql(f"CREATE SCHEMA IF NOT EXISTS {gold_schema}")

# Error messages collected by each load step below; a non-empty list makes the
# final summary step raise.
erros_gold = []

# FUNCTION 1: SCD TYPE 2 (for the dimensions)
def executar_scd2(spark, df_source, target_table_full_name, key_columns, exclude_cols=None):
    """Load ``df_source`` into a Delta table as a Slowly Changing Dimension Type 2.

    Every source row gets a ``row_hash`` computed over its non-key,
    non-excluded columns. The target table stores that hash next to the
    versioning columns ``is_current``, ``start_date`` and ``end_date``.

    * If the target table cannot be read, or has no ``row_hash`` column, it is
      (re)created from ``df_source`` with every row marked current.
    * Otherwise a single Delta MERGE closes the current version of each
      changed key (``is_current = false``, ``end_date = now``) and inserts a
      new current version for changed and brand-new keys.

    Args:
        spark: Active SparkSession.
        df_source: DataFrame holding the latest snapshot of the dimension.
            Keys are expected to be unique; duplicate keys make the MERGE
            ambiguous.
        target_table_full_name: Fully qualified name of the Delta table.
        key_columns: Column names that identify a dimension member.
        exclude_cols: Column names that are stored but ignored when detecting
            changes. Defaults to none.

    Returns:
        None. Progress is reported through ``print``.
    """
    key_columns = list(key_columns)
    ignored = set(key_columns) | set(exclude_cols or ())
    tracked_cols = [c for c in df_source.columns if c not in ignored]

    # 1. PREPARATION: hash the tracked columns of the source.
    # concat_ws skips NULL inputs, so ("x", NULL) and (NULL, "x") would hash
    # identically; an explicit marker makes a change to/from NULL detectable.
    hash_inputs = [
        when(col(c).isNull(), lit("<NULL>")).otherwise(col(c).cast("string"))
        for c in tracked_cols
    ]
    df_source_hashed = df_source.withColumn(
        "row_hash", sha2(concat_ws("||", *hash_inputs), 256)
    )

    # 2. Create the table when it is missing or predates the row_hash column.
    # Only an analysis failure is read as "table not available"; any other
    # error propagates instead of silently wiping the dimension history.
    try:
        target_columns = spark.read.table(target_table_full_name).columns
    except AnalysisException:
        target_columns = None

    if target_columns is None or "row_hash" not in target_columns:
        if target_columns is None:
            print(f"Dimensão {target_table_full_name} não existe. Criando carga inicial...")
        else:
            print(f"Dimensão {target_table_full_name} com schema incompatível (sem row_hash). Recriando tabela...")
            spark.sql(f"DROP TABLE IF EXISTS {target_table_full_name}")

        # The hashed frame is written so the next run can compare hashes.
        df_init = (
            df_source_hashed
            .withColumn("is_current", lit(True))
            .withColumn("start_date", current_timestamp())
            .withColumn("end_date", lit(None).cast("timestamp"))
        )
        df_init.write.format("delta").saveAsTable(target_table_full_name)
        return

    # 3. SCD2 merge logic.
    delta_target = DeltaTable.forName(spark, target_table_full_name)

    df_target_active = (
        spark.read.table(target_table_full_name)
        .filter("is_current = true")
        .select(*key_columns, col("row_hash").alias("target_hash"))
    )

    # Joining on the key *names* keeps a single copy of each key column, so
    # the by-name selects below are unambiguous.
    df_staged = (
        df_source_hashed.join(df_target_active, on=key_columns, how="left")
        .withColumn(
            "action",
            when(col("target_hash").isNull(), "INSERT")
            .when(col("row_hash") != col("target_hash"), "UPDATE")
            .otherwise("NO_ACTION"),
        )
        .filter("action != 'NO_ACTION'")
    )

    # Fetching one row is enough to know whether there is anything to do.
    if len(df_staged.head(1)) == 0:
        print(f"Nenhuma alteração para {target_table_full_name}.")
        return

    payload_cols = [*df_source.columns, "row_hash"]

    # Rows with a NULL mergeKey never satisfy the merge condition, so they
    # always take the insert branch: one new current version per new/changed key.
    df_new_versions = df_staged.select(
        *payload_cols, lit(None).cast("string").alias("mergeKey")
    )
    # Rows with mergeKey = 'true' exist only to match, and thereby close, the
    # current version of a changed key.
    df_expirations = df_staged.filter("action = 'UPDATE'").select(
        *payload_cols, lit("true").alias("mergeKey")
    )
    df_merge_source = df_new_versions.unionByName(df_expirations)

    key_match = " AND ".join(f"target.{k} = source.{k}" for k in key_columns)
    # target.is_current restricts the match to the open version; without it,
    # already-closed historical versions would get their end_date overwritten.
    merge_condition = (
        f"{key_match} AND source.mergeKey = 'true' AND target.is_current = true"
    )

    delta_target.alias("target").merge(
        df_merge_source.alias("source"),
        merge_condition,
    ).whenMatchedUpdate(
        set={"is_current": "false", "end_date": "current_timestamp()"}
    ).whenNotMatchedInsert(
        # Expiration rows must never be inserted, even if their target row
        # vanished between staging and merging.
        condition="source.mergeKey IS NULL",
        values={c: f"source.{c}" for c in payload_cols}
        | {"is_current": "true", "start_date": "current_timestamp()", "end_date": "null"},
    ).execute()
    print(f"SCD2 executado com sucesso em {target_table_full_name}")
                                       
# --- BUILD DIMENSIONS ---

# 1. dim_companies: companies together with the name of their industry.
try:
    print("Criando dim_companies...")
    # Silver tables this dimension is built from.
    df_companies = spark.read.table(f"{silver_schema}.companies")
    df_industries = spark.read.table(f"{silver_schema}.industries")

    # Left join: a company without a matching industry row is still kept.
    dim_companies = df_companies.join(
        df_industries,
        on=df_companies.industry_id == df_industries.id,
        how="left",
    ).select(
        col("companies.id").alias("company_id"),
        "company_name",
        "company_rating",
        "industry_name",
    )

    executar_scd2(
        spark,
        dim_companies,
        f"{gold_schema}.dim_companies",
        ["company_id"],
    )
    print("SUCESSO: dim_companies criada.")
except Exception as e:
    # Record the failure and carry on; the final summary step raises.
    msg = f"ERRO ao criar dim_companies: {e}"
    erros_gold.append(msg)
    print(msg)

# 2. dim_locations: one row per location id with its city and state.
try:
    print("Criando dim_locations...")
    df_locations = spark.read.table(f"{silver_schema}.locations")

    # The silver "id" column becomes the dimension key "location_id".
    dim_locations = df_locations.select(
        col("id").alias("location_id"),
        col("city"),
        col("state_abbr"),
    )

    executar_scd2(
        spark,
        dim_locations,
        f"{gold_schema}.dim_locations",
        ["location_id"],
    )
    print("SUCESSO: dim_locations criada.")
except Exception as e:
    # Record the failure and carry on; the final summary step raises.
    msg = f"ERRO ao criar dim_locations: {e}"
    erros_gold.append(msg)
    print(msg)

# 3. dim_skills: one row per skill id with its name.
try:
    print("Criando dim_skills...")
    df_skills = spark.read.table(f"{silver_schema}.skills")

    # The silver "id" column becomes the dimension key "skill_id".
    dim_skills = df_skills.select(
        col("id").alias("skill_id"),
        "skill_name",
    )

    executar_scd2(
        spark,
        dim_skills,
        f"{gold_schema}.dim_skills",
        ["skill_id"],
    )
    print("SUCESSO: dim_skills criada.")
except Exception as e:
    # Record the failure and carry on; the final summary step raises.
    msg = f"ERRO ao criar dim_skills: {e}"
    erros_gold.append(msg)
    print(msg)

# 4. dim_date: calendar attributes for every distinct job listing date.
try:
    print("Criando dim_date...")
    df_jobs = spark.read.table(f"{silver_schema}.jobs")  # dates come from jobs

    # date_id is derived from the date itself (yyyyMMdd) rather than from
    # monotonically_increasing_id(). This table is fully overwritten on every
    # run while fact_jobs is append-only and keeps the date_id it was loaded
    # with; ids generated from partition layout change between runs and would
    # leave stored facts pointing at the wrong date. The key is cast to long
    # to keep the column type the generated ids had.
    # NOTE(review): assumes listing_date is a DATE column; with a TIMESTAMP,
    # several rows of the same day would share one date_id - confirm.
    # NOTE(review): fact rows loaded before this change still carry the old
    # generated ids and need a one-off rebuild of fact_jobs.
    dim_date = (
        df_jobs.select(col("listing_date").alias("date"))
        # A NULL date cannot be keyed and never matches the equality join
        # that fact_jobs uses, so it is left out of the dimension.
        .filter(col("date").isNotNull())
        .distinct()
        .withColumn("date_id", date_format(col("date"), "yyyyMMdd").cast("long"))
        .withColumn("year", year(col("date")))
        .withColumn("month", month(col("date")))
        .withColumn("day", dayofmonth(col("date")))
        .withColumn("quarter", quarter(col("date")))
        .withColumn("month_name", date_format(col("date"), "MMMM"))
        .select("date_id", "date", "year", "month", "day", "quarter", "month_name")
    )

    dim_date.write.format("delta").mode("overwrite").option("overwriteSchema", "true").saveAsTable(f"{gold_schema}.dim_date")
    print("SUCESSO: dim_date criada.")
except Exception as e:
    # Record the failure and carry on; the final summary step raises.
    msg = f"ERRO ao criar dim_date: {e}"
    print(msg)
    erros_gold.append(msg)

# --- BUILD FACT TABLES ---

# Main fact table: fact_jobs (incremental append keyed on listing_date).
try:
    print("Processando fact_jobs (Incremental)...")
    target_fact = f"{gold_schema}.fact_jobs"

    # 1. Determine the checkpoint (latest listing_date already in gold).
    # Only an analysis failure (the table or column cannot be resolved) means
    # "nothing loaded yet". Any other error must propagate: treating it as
    # "no checkpoint" would append the whole silver table on top of the
    # existing facts and duplicate every row.
    try:
        max_date_row = (
            spark.read.table(target_fact)
            .agg(spark_max("listing_date").alias("max_date"))
            .collect()[0]
        )
        last_checkpoint = max_date_row["max_date"]
    except AnalysisException:
        last_checkpoint = None

    print(f"Último checkpoint (Data Máxima na Gold): {last_checkpoint}")

    # 2. Read the silver data.
    df_jobs = spark.read.table(f"{silver_schema}.jobs")

    # 3. Keep only new data (incremental).
    # NOTE: the filter is strictly "greater than", so a job that reaches
    # silver later with a listing_date at or before the checkpoint is not
    # picked up by this load.
    if last_checkpoint is not None:
        print(f"Filtrando dados posteriores a {last_checkpoint}...")
        df_jobs_new = df_jobs.filter(col("listing_date") > last_checkpoint)
    else:
        print("Carga Inicial (Full Load)...")
        df_jobs_new = df_jobs

    # Skip the load when there is nothing new; fetching a single row avoids
    # counting the whole input just to test for emptiness.
    if len(df_jobs_new.head(1)) > 0:
        # Lookup tables used to enrich the jobs.
        df_salary = spark.read.table(f"{silver_schema}.salary_ranges")
        dim_date = spark.read.table(f"{gold_schema}.dim_date")

        # Inner join: a job whose listing_date is absent from dim_date is
        # dropped. Left join: a job without a salary range keeps NULL salaries.
        fact_jobs = df_jobs_new.join(dim_date, df_jobs_new.listing_date == dim_date.date, "inner") \
            .join(df_salary, df_jobs_new.salary_range_id == df_salary.id, "left") \
            .select(
                col("jobs.id").alias("job_id"),
                col("date_id"),
                col("jobs.listing_date"),
                col("jobs.company_id"),
                col("jobs.location_id"),
                col("jobs.employment_type_id"),
                col("job_title"),
                col("min_salary"),
                col("max_salary"),
                col("avg_salary")
            )

        # Write incrementally (APPEND).
        fact_jobs.write.format("delta").mode("append").option("mergeSchema", "true").saveAsTable(target_fact)
        print(f"SUCESSO: {fact_jobs.count()} novos registros inseridos em fact_jobs.")
    else:
        print("Nenhum dado novo para fact_jobs.")
except Exception as e:
    # Record the failure and carry on; the final summary step raises.
    msg = f"ERRO fact_jobs: {e}"
    print(msg)
    erros_gold.append(msg)

# Bridge fact table: fact_job_skills (job <-> skill pairs), rebuilt in full.
try:
    print("Criando fact_job_skills...")
    df_job_skills = spark.read.table(f"{silver_schema}.job_skills")

    # Only the two linking keys are carried over to gold.
    fact_job_skills = df_job_skills.select("job_id", "skill_id")

    # Overwrite both the data and the schema of the existing table.
    (
        fact_job_skills.write
        .format("delta")
        .mode("overwrite")
        .option("overwriteSchema", "true")
        .saveAsTable(f"{gold_schema}.fact_job_skills")
    )
    print("SUCESSO: fact_job_skills criada.")
except Exception as e:
    # Record the failure and carry on; the final summary step raises.
    msg = f"ERRO ao criar fact_job_skills: {e}"
    erros_gold.append(msg)
    print(msg)

# Final report: succeed quietly, or list every collected error and fail the run.
print("\n---------------------------------------------------")
if not erros_gold:
    print("Processo da camada Gold finalizado com SUCESSO TOTAL.")
else:
    print(f"O processo Gold terminou com {len(erros_gold)} erros:")
    # One collected message per line.
    print(*erros_gold, sep="\n")
    print("---------------------------------------------------")

    # Raising marks the whole notebook/job run as failed.
    raise Exception("Falha no processamento da camada Gold. Verifique os logs acima.")