In [0]:
# MAGIC %md
# MAGIC # 📥 02_Bronze_Ingestion
# MAGIC Ingesta datos sintéticos desde Volume a tabla Delta Bronze usando Auto Loader.
# MAGIC - Lee desde `/Volumes/fraude_qr/bronze/raw_data/synthetic_data/synthetic_qr_transactions`
# MAGIC - Escribe en `fraude_qr.bronze.qr_transactions_raw`
# MAGIC - Maneja partición `date` explícitamente
# MAGIC - Incluye columna residual `__index_level_0__` (se eliminará en Silver)
# MAGIC - Compatible con el esquema generado por tu notebook 01.

# COMMAND ----------

from pyspark.sql.functions import current_timestamp, col
from pyspark.sql.types import StructType, StructField, StringType, LongType, DoubleType, TimestampType

# --- 1. Configuración de Rutas ---
base_volume_path = "/Volumes/fraude_qr/bronze/raw_data"
source_data_path = f"{base_volume_path}/synthetic_data/synthetic_qr_transactions"
bronze_table_name = "fraude_qr.bronze.qr_transactions_raw"
checkpoint_path = f"{base_volume_path}/checkpoints/transactions"
schema_path = f"{base_volume_path}/schema/transactions"

# --- 2. Crear Directorios de Control ---
print(f"✅ Asegurando directorio de checkpoint: {checkpoint_path}")
dbutils.fs.mkdirs(checkpoint_path)

print(f"✅ Asegurando directorio de esquema: {schema_path}")
dbutils.fs.mkdirs(schema_path)

# --- 3. Definir Esquema Explícito (Adaptado a tu generador) ---
# ⚠️ Incluye __index_level_0__ porque viene del Parquet generado por pandas.
# ⚠️ 'date' es StringType porque viene de la partición de archivo.
schema = StructType([
    StructField("tx_id", LongType(), True),
    StructField("merchant_id", LongType(), True),
    StructField("payer_id", LongType(), True),
    StructField("device_id", StringType(), True),
    StructField("created_at", TimestampType(), True),      # Spark lo leerá como timestamp_ntz
    StructField("currency", StringType(), True),
    StructField("qr_type", StringType(), True),
    StructField("channel", StringType(), True),
    StructField("has_error", LongType(), True),
    StructField("amount", DoubleType(), True),
    StructField("merchant_lat", DoubleType(), True),
    StructField("merchant_lon", DoubleType(), True),
    StructField("mcc", LongType(), True),
    StructField("payer_lat", DoubleType(), True),
    StructField("payer_lon", DoubleType(), True),
    StructField("is_fraud", LongType(), True),
    StructField("qr_hash", StringType(), True),
    StructField("error_code", StringType(), True),
    StructField("__index_level_0__", LongType(), True),    # ← ¡Columna residual de pandas! Se eliminará en Silver.
    StructField("date", StringType(), True)                # ← ¡Viene de partición, debe ser STRING!
])

# --- 4. Configurar Auto Loader ---
print(f"📂 Leyendo desde: {source_data_path}")
print(f"💾 Escribiendo en: {bronze_table_name}")

bronze_df = (
    spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "parquet")
    .option("cloudFiles.schemaLocation", schema_path)
    .option("cloudFiles.partitionColumns", "date")         # ← ¡CLAVE! Declara partición explícitamente.
    .option("mergeSchema", "true")
    .schema(schema)                                        # ← Usa esquema explícito.
    .load(source_data_path)
    .withColumn("source_file", col("_metadata.file_path"))
    .withColumn("ingested_at", current_timestamp())
    .withColumn("bronze_batch_id", col("_metadata.file_path"))  # Opcional: para trazabilidad
)

# --- 5. Escribir a Tabla Bronze (con reset de checkpoint) ---

print("🧹 Reiniciando estado de checkpoint...")
dbutils.fs.rm(checkpoint_path, True)  # ← ¡BORRA EL CHECKPOINT!
print("✅ Checkpoint eliminado.")

print("📂 Creando nuevo directorio de checkpoint...")
dbutils.fs.mkdirs(checkpoint_path)
print("✅ Checkpoint reinicializado.")

print("🚀 Iniciando ingesta...")

query = (
    bronze_df.writeStream
    .trigger(availableNow=True)  # Modo batch para datos ya generados
    .option("checkpointLocation", checkpoint_path)
    .option("mergeSchema", "true")
    .toTable(bronze_table_name)
)

query.awaitTermination()

print("✅ ¡Ingesta completada exitosamente!")



✅ Asegurando directorio de checkpoint: /Volumes/fraude_qr/bronze/raw_data/checkpoints/transactions
✅ Asegurando directorio de esquema: /Volumes/fraude_qr/bronze/raw_data/schema/transactions
📂 Leyendo desde: /Volumes/fraude_qr/bronze/raw_data/synthetic_data/synthetic_qr_transactions
💾 Escribiendo en: fraude_qr.bronze.qr_transactions_raw
🧹 Reiniciando estado de checkpoint...
✅ Checkpoint eliminado.
📂 Creando nuevo directorio de checkpoint...
✅ Checkpoint reinicializado.
🚀 Iniciando ingesta...
✅ ¡Ingesta completada exitosamente!


In [0]:
# Verificar conteo y esquema
df = spark.table("fraude_qr.bronze.qr_transactions_raw")
print(f"📊 Total registros: {df.count():,}")
df.printSchema()

# Verificar primeros registros
df.limit(3).display()

# Verificar distribución de fraude
df.groupBy("date").sum("is_fraud").orderBy("date").show(5)

📊 Total registros: 1,000,000
root
 |-- tx_id: long (nullable = true)
 |-- created_at: timestamp (nullable = true)
 |-- merchant_id: integer (nullable = true)
 |-- payer_id: integer (nullable = true)
 |-- amount: double (nullable = true)
 |-- currency: string (nullable = true)
 |-- qr_type: string (nullable = true)
 |-- qr_hash: string (nullable = true)
 |-- mcc: integer (nullable = true)
 |-- device_id: string (nullable = true)
 |-- payer_lat: double (nullable = true)
 |-- payer_lon: double (nullable = true)
 |-- merchant_lat: double (nullable = true)
 |-- merchant_lon: double (nullable = true)
 |-- channel: string (nullable = true)
 |-- has_error: integer (nullable = true)
 |-- error_code: string (nullable = true)
 |-- is_fraud: integer (nullable = true)
 |-- source_file: string (nullable = true)
 |-- ingested_at: timestamp (nullable = true)
 |-- __index_level_0__: long (nullable = true)
 |-- date: string (nullable = true)
 |-- bronze_batch_id: string (nullable = true)



tx_id,created_at,merchant_id,payer_id,amount,currency,qr_type,qr_hash,mcc,device_id,payer_lat,payer_lon,merchant_lat,merchant_lon,channel,has_error,error_code,is_fraud,source_file,ingested_at,__index_level_0__,date,bronze_batch_id
18,2025-08-18T17:06:50.000Z,6765,34953,1269.69,ARS,static,276a3cf46ff17534cb651ce13c514b706cbd7a76d613ecf61b850e2f320c41bb,2174,3d4c98676c66aaa7ecd7b5d7d7d98c9ca3f00c26b5546c99666d39e2f601d2b3,-35.306386828067026,-60.35826053947818,-35.34880944713991,-60.32703364495234,app,0,,0,/Volumes/fraude_qr/bronze/raw_data/synthetic_data/synthetic_qr_transactions/date=2025-08-18/8ce76775a4a44ff8bd34bd263bede771-0.parquet,2025-09-19T22:14:49.492Z,17,2025-08-18,/Volumes/fraude_qr/bronze/raw_data/synthetic_data/synthetic_qr_transactions/date=2025-08-18/8ce76775a4a44ff8bd34bd263bede771-0.parquet
25,2025-08-18T18:38:48.000Z,4791,8053,5219.13,ARS,static,fe17304019abfdb0cd1d68dbd3eb0b5c178d7652f90b89c632a4a8191a7a120b,3017,0d307e2a68861a8db063dccc73a7fdeb79b889671fcee133bda1c0f28e2fb1ee,-34.29794245447968,-62.72253356525232,-34.258668052478725,-62.708512317936446,app,1,501.0,0,/Volumes/fraude_qr/bronze/raw_data/synthetic_data/synthetic_qr_transactions/date=2025-08-18/8ce76775a4a44ff8bd34bd263bede771-0.parquet,2025-09-19T22:14:49.492Z,24,2025-08-18,/Volumes/fraude_qr/bronze/raw_data/synthetic_data/synthetic_qr_transactions/date=2025-08-18/8ce76775a4a44ff8bd34bd263bede771-0.parquet
70,2025-08-18T20:28:21.000Z,2256,25599,301.05,ARS,static,0226aeb83188a0be84a6ed7fa45014d07fec5581bc056fa913d6b108e9002377,8952,6c1dda884f7e27c25fdf9450e4d4295ace43876aaf97e0ad4c155ecc8eca94dc,-35.09492301635486,-59.11512998317431,-34.97269200170279,-59.17905555642538,app,0,,0,/Volumes/fraude_qr/bronze/raw_data/synthetic_data/synthetic_qr_transactions/date=2025-08-18/8ce76775a4a44ff8bd34bd263bede771-0.parquet,2025-09-19T22:14:49.492Z,69,2025-08-18,/Volumes/fraude_qr/bronze/raw_data/synthetic_data/synthetic_qr_transactions/date=2025-08-18/8ce76775a4a44ff8bd34bd263bede771-0.parquet


+----------+-------------+
|      date|sum(is_fraud)|
+----------+-------------+
|2025-08-01|           79|
|2025-08-02|           89|
|2025-08-03|           80|
|2025-08-04|           91|
|2025-08-05|           67|
+----------+-------------+
only showing top 5 rows


In [0]:
print(f"📊 Total registros en Bronze: {spark.table('fraude_qr.bronze.qr_transactions_raw').count():,}")

📊 Total registros en Bronze: 1,000,000
