# Proyecto AirBnB Reviews con Datalake - Bronze

**Capa:** Bronze

---


### Step 1
**Objetivo:** Prepara el catálogo y el esquema donde se almacenarán los datos crudos.

---


In [0]:
USE CATALOG airbnb;

CREATE SCHEMA IF NOT EXISTS bronze
  COMMENT 'Capa Bronze: datos crudos leídos desde el volume workspace.airbnb.reviews';


### Step 2
**Objetivo:** Carga los datos crudos desde archivos JSON hacia la capa Bronze.

---


In [0]:
%python
from pyspark.sql import functions as F

# 1 Leer JSON
df_raw = (spark.read
          .option("multiLine", True)
          .json("/Volumes/workspace/airbnb/reviews/listingsAndReviews-1.json"))

# 2 Preparar DataFrame Bronze
df_bronze = (
    df_raw
    .withColumn("id", F.coalesce(F.col("_id"), F.md5(F.to_json(F.struct("*")))))      # usa _id o genera hash
    .withColumn("json_raw", F.to_json(F.struct("*")))                                 # guarda JSON completo como string
    .withColumn("fecha_carga", F.current_timestamp())
    .withColumn("nombre_archivo_origen", F.col("_metadata.file_path"))
    .withColumn("id_proceso", F.lit("bronze_manual_load"))
    .select("id", "json_raw", "fecha_carga", "nombre_archivo_origen", "id_proceso")   # importante
)

# 3 Crear tabla Bronze si no existe
spark.sql("""
CREATE TABLE IF NOT EXISTS airbnb.bronze.bronze_listings_raw (
  id STRING,
  json_raw STRING,
  fecha_carga TIMESTAMP,
  nombre_archivo_origen STRING,
  id_proceso STRING
) USING DELTA
""")

# 4 Escribir los datos
(df_bronze.write
 .format("delta")
 .mode("append")
 .saveAsTable("airbnb.bronze.bronze_listings_raw"))
