# Gold


In [0]:
from datetime import datetime

container = "ofertas-empleo"
account = spark.conf.get("adls.account.name")
gold_path = f"abfss://{container}@{account}.dfs.core.windows.net/gold/"


In [0]:
today = datetime.now().strftime("%Y-%m-%d")
# today = "2025-09-04"
plata_path = f"abfss://{container}@{account}.dfs.core.windows.net/silver/jobs_{today}/"
df_plata = spark.read.format("delta").load(plata_path)
df_plata.show(5)


+--------------------+--------------------+---------------+--------------------+--------------------+---------------+--------------------+----------------+----------------+--------------------+--------------------+--------------------+------+------------------+
|                 url|               title|   company_name|         company_url|        applications|         salary|         description|nivel_antiguedad|     tipo_empleo|     funcion_laboral|            sectores|      location_clean|ciudad|Comunidad_Autonoma|
+--------------------+--------------------+---------------+--------------------+--------------------+---------------+--------------------+----------------+----------------+--------------------+--------------------+--------------------+------+------------------+
|https://es.linked...|✨Fullstack Develo...|   knowmad mood|https://es.linked...|Sé de los primero...|Sin información|Si cuentas con un...|             Mid|Jornada completa|Gestión de proyectos|Servicios y consu...|

In [0]:
%sql
CREATE DATABASE IF NOT EXISTS gold
COMMENT 'Capa Gold - Tablas optimizadas';

In [0]:
from pyspark.sql.functions import current_date, monotonically_increasing_id

# Dimensión Empresa
dim_empresa = df_plata.select("company_name", "company_url").distinct() \
    .withColumn("empresa_id", monotonically_increasing_id())

# Dimensión Ciudad
dim_ciudad = df_plata.select("ciudad", "Comunidad_Autonoma").distinct() \
    .withColumn("ciudad_id", monotonically_increasing_id())

# Dimensión Empleo
dim_empleo = df_plata.select("tipo_empleo", "nivel_antiguedad", "funcion_laboral", "sectores").distinct() \
    .withColumn("empleo_id", monotonically_increasing_id())

# Hechos (fact table) con snapshot
fact_ofertas = df_plata.join(dim_empresa, "company_name") \
    .join(dim_ciudad, "ciudad") \
    .join(dim_empleo, ["tipo_empleo", "nivel_antiguedad", "funcion_laboral", "sectores"]) \
    .select(
        "url", "applications", "salary", 
        "empresa_id", "ciudad_id", "empleo_id"
    ) \
    .withColumn("fecha_snapshot", current_date())

In [0]:
# Dimensiones (se pueden sobrescribir porque cambian poco)
dim_empresa.write.format("delta").mode("overwrite").save(gold_path + "dim_empresa")
dim_ciudad.write.format("delta").mode("overwrite").save(gold_path + "dim_ciudad")
dim_empleo.write.format("delta").mode("overwrite").save(gold_path + "dim_empleo")

# Hechos -> modo append para guardar histórico diario
fact_ofertas.write.format("delta").mode("append").save(gold_path + "fact_ofertas_historico")

# Tabla única combinando todo
fact_ofertas_full = df_plata.select(
    "url", "applications", "salary", 
    "company_name", "company_url",
    "ciudad", "Comunidad_Autonoma",
    "tipo_empleo", "nivel_antiguedad", "funcion_laboral", "sectores"
)

fact_ofertas_full.write.format("delta").mode("overwrite").save(gold_path + "fact_ofertas_full")

In [0]:
fact_ofertas.write.format("delta").mode("append").saveAsTable("gold.fact_ofertas_historico")
fact_ofertas_full.write.format("delta").mode("append").saveAsTable("gold.fact_ofertas_full")
dim_empresa.write.format("delta").mode("overwrite").saveAsTable("gold.dim_empresa")
dim_ciudad.write.format("delta").mode("overwrite").saveAsTable("gold.dim_ciudad")
dim_empleo.write.format("delta").mode("overwrite").saveAsTable("gold.dim_empleo")