### TRABAJO PR√ÅCTICO: INTRODUCCI√ìN A DATABRICKS Y
### PROCESAMIENTO DE DATOS CON APACHE SPARK

Este notebook sigue las etapas de la arquitectura Medallion, utilizando el Dataset "Credit Card Fraud"

## Preparaci√≥n del entorno

Se crea una base datos propia, rutas en DBFS, variables reutilizables y configuraciones de Spark a utilizar.

In [0]:
%sql
-- Crea una base de datos exclusiva para el proyecto
CREATE DATABASE IF NOT EXISTS lakehouse_fraude;

-- Opcional: usarla por defecto (evita escribir lakehouse_fraude. en cada tabla)
USE lakehouse_fraude;

In [0]:
# Rutas finales totalmente compatibles con Free Edition
PROJECT = "lakehouse_fraude"

# Carpeta base dentro del Workspace (permite escritura)
BASE = f"/Workspace/{PROJECT}"

RAW_INBOX  = f"{BASE}/raw_inbox"
CKPT_BRONZE = f"{BASE}/_ckpt/bronze"
CKPT_SILVER = f"{BASE}/_ckpt/silver"
CKPT_SCORE  = f"{BASE}/_ckpt/scoring"

# Tablas Delta
T_BRONZE = "lakehouse_fraude.bronze_transactions"
T_SILVER = "lakehouse_fraude.silver_transactions"
T_GOLD   = "lakehouse_fraude.gold_fraude_summary"
T_USERS  = "lakehouse_fraude.ref_usuarios"

print("BASE:", BASE)



In [0]:
# Importamos libreria 
import os

for path in [BASE, RAW_INBOX, CKPT_BRONZE, CKPT_SILVER, CKPT_SCORE]:
    os.makedirs(path, exist_ok=True)

print("üìÇ Carpetas creadas correctamente:")
for path in [BASE, RAW_INBOX, CKPT_BRONZE, CKPT_SILVER, CKPT_SCORE]:
    print("-", path)

print(os.listdir(BASE))


In [0]:
# Configuraciones Spark adaptadas a Free Edition, para poder optimizar el rendimiento y estabilizar el entorno.
def safe_conf(key, value):
    try:
        spark.conf.set(key, value)
        print(f"‚úîÔ∏è {key} = {value}")
    except Exception as e:
        print(f"‚ö†Ô∏è {key} no disponible ({str(e)[:50]})")

safe_conf("spark.sql.shuffle.partitions", "200")
safe_conf("spark.sql.adaptive.enabled", "true")    # AQE (puede no estar disponible)
safe_conf("spark.databricks.delta.optimizeWrite.enabled", "true")
safe_conf("spark.databricks.delta.autoCompact.enabled", "true")
safe_conf("spark.sql.streaming.stateStore.providerClass",
          "com.databricks.sql.streaming.state.RocksDBStateStoreProvider")

print("\n‚úÖ Configuraciones Spark aplicadas correctamente.")

## Configuraci√≥n del entorno

In [0]:
# Importamos las librerias a utilizar
from pyspark.sql import SparkSession

# Crear sesi√≥n de Spark (en Databricks ya viene creada como 'spark')
spark

In [0]:
# Importamos las librerias a utilizar
import pandas as pd
import kagglehub

# Ruta donde KaggleHub descarg√≥ el archivo
path = kagglehub.dataset_download("mlg-ulb/creditcardfraud") + "/creditcard.csv"

# Leer el CSV con pandas
pdf = pd.read_csv(path)

# Convertir a DataFrame de Spark
df_spark = spark.createDataFrame(pdf)

#Mostramos las primeras filas del dataset
print("‚úÖ Dataset restaurado correctamente.")
print("Registros totales:", len(pdf))
display(df_spark.limit(5))


## Carga y exploraci√≥n de datos- Capa Bronce

In [0]:
# Si el DataFrame Spark no existe (por restart), lo recreamos desde pandas
try:
    df_spark
except NameError:
    df_spark = spark.createDataFrame(pdf)

# Guardar/actualizar la capa Bronze en formato Delta Lake
df_spark.write.format("delta") \
    .mode("overwrite") \
    .saveAsTable(T_BRONZE)

# Verificaci√≥n r√°pida
df_bronze = spark.table(T_BRONZE)
print(f"‚úÖ Tabla Bronze '{T_BRONZE}' creada correctamente.")
print(f"Total de registros: {df_bronze.count():,}")
display(df_bronze.limit(5))



In [0]:
#Consultas b√°sicas con SparkSQL para entender el contenido

#Ver las primeras filas
query = f"""
SELECT *
FROM {T_BRONZE}
df_bronze LIMIT 5
"""

df_query = spark.sql(query)

display(df_query)

In [0]:
# Contar registros totales
query = f"""
SELECT COUNT(*) AS total_filas
FROM {T_BRONZE}
"""

df_query = spark.sql(query)

display(df_query)

In [0]:
# Ver columnas y tipos
query = f"""
DESCRIBE TABLE {T_BRONZE}
"""

df_query = spark.sql(query)

display(df_query)

In [0]:
# Ver solo los nombres de las columnas
query = f"""
SHOW COLUMNS IN {T_BRONZE}
"""

df_query = spark.sql(query)

display(df_query)


## Transformaciones de datos - Capa Silver

A partir de la capa de Bronce, se realiza la limpieza de datos, aplicar transformaciones y realizar agregaciones y joins simples

In [0]:
# üîÅ Definir nombres de tablas y rutas de proyecto (igual que en la Etapa 0)
PROJECT = "lakehouse_fraude"

# Tablas Delta en el metastore
T_BRONZE = f"{PROJECT}.bronze_transactions"
T_SILVER = f"{PROJECT}.silver_transactions"
T_GOLD   = f"{PROJECT}.gold_fraude_summary"
T_USERS  = f"{PROJECT}.ref_usuarios"

print("Tablas definidas correctamente:")
print(f"  Bronze ‚Üí {T_BRONZE}")
print(f"  Silver ‚Üí {T_SILVER}")
print(f"  Gold   ‚Üí {T_GOLD}")
print(f"  Users  ‚Üí {T_USERS}")


In [0]:
# Importamos librer√≠as
from pyspark.sql.functions import (
    col, when, current_timestamp, expr,
    round as _round, count, avg, sum as _sum
)
from pyspark.sql.types import IntegerType, DoubleType

#Cargamos la capa Bronze
print("=== Cargando capa Bronze ===")
df_bronze = spark.table(T_BRONZE)
print("Filas en Bronze:", df_bronze.count())
display(df_bronze.limit(5))


In [0]:
from pyspark.sql import functions as F

# Leemos la tabla Bronze desde el metastore
df_bronze = spark.table("lakehouse_fraude.bronze_transactions")

print("Filas originales en Bronze:", df_bronze.count())

# Mostramos algunas columnas clave
display(df_bronze.select("Time", "Amount", "Class").limit(5))

In [0]:
#Limpieza b√°sica

df_silver = (
    df_bronze
    # Eliminar duplicados
    .dropDuplicates()
    
    # Rellenar valores nulos (si existieran)
    .na.fill(0)
    
    # Filtrar filas donde Amount no sea nulo (por seguridad)
    .filter(F.col("Amount").isNotNull())
    
    # Convertir Amount a tipo double
    .withColumn("Amount", F.col("Amount").cast("double"))

    # Convertir Class a tipo integer
    .withColumn("Class", F.col("Class").cast("integer"))

    # Convertir Time a tipo integer
    .withColumn("Time", F.col("Time").cast("integer"))
)

print("‚úÖ Datos limpiados y enriquecidos (Silver listos)")
display(df_silver.limit(5))


In [0]:
#Eliminar valores at√≠picos

# Calcular percentiles (Q1 y Q3)
quantiles = df_silver.approxQuantile("Amount", [0.25, 0.75], 0.05)
q1, q3 = quantiles
iqr = q3 - q1
limite_superior = q3 + 1.5 * iqr

# Filtrar registros fuera del rango
df_silver = df_silver.filter(col("Amount") <= limite_superior)


In [0]:
#Validaci√≥n de integridad
negativos = df_silver.filter(col("Amount") < 0).count()
nulos_class = df_silver.filter(col("Class").isNull()).count()

if negativos == 0 and nulos_class == 0:
    print("‚úÖ Datos consistentes: sin montos negativos ni clases nulas.")
else:
    print(f"‚ö†Ô∏è Montos negativos: {negativos} | Clases nulas: {nulos_class}")

In [0]:
#Agregando columnas derivadas

df_silver = (
    df_silver
    # Columna descriptiva del estado
    .withColumn("Status", when(col("Class") == 1, "Fraude").otherwise("Leg√≠tima"))
    # Categorizaci√≥n de montos
    .withColumn(
        "MontoCategoria",
        when(col("Amount") < 10, "Bajo")
        .when(col("Amount") < 100, "Medio")
        .otherwise("Alto")
    )
    # Columna logar√≠tmica (√∫til para an√°lisis)
    .withColumn("log_Amount", expr("log(Amount + 1)"))
    # Se agregar columna de tiempo en horas (aprox)
    .withColumn("HoraAprox", expr("CAST(Time/3600 AS INT)"))
    # Fecha/hora de procesamiento
    .withColumn("processing_date", current_timestamp())
)

display(df_silver.select("Status", "MontoCategoria", "log_Amount", "HoraAprox", "processing_date").limit(10))



In [0]:
#Aplicamos filtros de inter√©s

# Solo transacciones mayores a 1.0
df_silver_filtrado = df_silver.filter(col("Amount") > 1.0)

# Separar transacciones fraudulentas
df_fraudes = df_silver_filtrado.filter(col("Status") == "Fraude")
df_legitimas = df_silver_filtrado.filter(col("Status") == "Leg√≠tima")

display(df_fraudes.limit(5))             # Ver solo fraudes
display(df_legitimas.limit(5))     


In [0]:
#Agregaciones

# a) Promedio de monto por tipo de transacci√≥n
df_agg_promedio = (
    df_silver_filtrado
    .groupBy("Status")
    .agg(
        _round(avg("Amount"), 2).alias("MontoPromedio"),
        count("*").alias("TotalTransacciones")
    )
)

# Mostrar resultados
print("=== Agregaci√≥n por tipo de transacci√≥n ===")
df_agg_promedio.show()

In [0]:
# b) Conteo por categor√≠a de monto
df_agg_categoria = (
    df_silver_filtrado
    .groupBy("MontoCategoria")
    .agg(
        count("*").alias("Cantidad"),
        _round(avg("Amount"), 2).alias("PromedioMonto")
    )
)

print("=== Agregaci√≥n por categor√≠a de monto ===")
df_agg_categoria.show()

## Joins

In [0]:
# Unir resumen por Status con la tabla original
df_silver_enriquecido = df_silver_filtrado.join(
    df_agg_promedio.select("Status", "MontoPromedio"),
    on="Status",
    how="left"
)

display(df_silver_enriquecido.limit(5))             


#### Tabla de Referencia ref_usuarios

In [0]:
Incorporamos esta tabla para enriquecer los datos en la capa Silver (joins). Es una peque√±a tabla sint√©tica de 1000 usuarios con edad y nivel de riesgo. Simula el contexto de ‚Äúusuarios conocidos‚Äù que las empresas usan para cruzar con transacciones.

In [0]:
from pyspark.sql import functions as F

# Generamos 1000 usuarios con edad y monto promedio
users_realistic = spark.range(0, 1000).withColumnRenamed("id","usuario_id") \
    .withColumn("edad_usuario", (F.rand() * 50 + 18).cast("int")) \
    .withColumn("monto_promedio", (F.rand() * 1000 + 50).cast("double"))  # entre 50 y 1050 USD aprox.

# Ahora asignamos nivel de riesgo seg√∫n reglas "semi-realistas"
users_realistic = users_realistic.withColumn(
    "nivel_riesgo_usuario",
    F.when((F.col("edad_usuario") < 25) | (F.col("edad_usuario") > 60), "ALTO")  # extremos etarios
     .when((F.col("monto_promedio") > 800), "ALTO")                              # monto muy alto
     .when((F.col("monto_promedio") > 400), "MEDIO")                             # monto medio
     .otherwise("BAJO")                                                          # resto
)

# Guardamos la tabla en Delta
T_USERS = "lakehouse_fraude.ref_usuarios"
users_realistic.write.format("delta") \
    .option("mergeSchema", "true") \
    .mode("overwrite") \
    .saveAsTable(T_USERS)


print("‚úÖ Tabla ref_usuarios (versi√≥n realista) creada correctamente.")
display(spark.table(T_USERS).limit(10))




In [0]:
# Verificamos que se realizaron los cambios por medio de una consulta
query = f"""
SELECT
    nivel_riesgo_usuario,
    COUNT(*) AS cantidad_usuarios,
    ROUND(AVG(monto_promedio), 2) AS promedio_monto,
    ROUND(AVG(edad_usuario), 1) AS promedio_edad
FROM lakehouse_fraude.ref_usuarios
GROUP BY nivel_riesgo_usuario
ORDER BY cantidad_usuarios DESC
"""
df_query = spark.sql(query)
display(df_query)


In [0]:
# Leemos la tabla de usuarios
df_users = spark.table("lakehouse_fraude.ref_usuarios")

# Hacemos el JOIN
df_silver = (df_users.join(df_users, ["usuario_id"], "left")
)

print("‚úÖ Tabla Silver enriquecida correctamente")
display(df_silver.limit(5))


In [0]:
#Guardando Capa Silver

T_SILVER = "lakehouse_fraude.silver_transactions"

df_silver_enriquecido.write \
    .format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .saveAsTable(T_SILVER)

print("‚úÖ Capa Silver creada correctamente.")


In [0]:
#Verificaci√≥n Final
df_check = spark.table(T_SILVER)
print("‚úÖ Tabla Silver actualizada correctamente.")
print("otal registros finales:", df_check.count())
display(df_check.limit(5))

## Almacenamiento con Delta Lake - Capa Gold


In [0]:
Se realizar√°n validaciones de integridad

In [0]:
# üîÅ Definir nombres de tablas y rutas de proyecto (igual que en la Etapa 0)
PROJECT = "lakehouse_fraude"

# Tablas Delta en el metastore
T_BRONZE = f"{PROJECT}.bronze_transactions"
T_SILVER = f"{PROJECT}.silver_transactions"
T_GOLD   = f"{PROJECT}.gold_fraude_summary"
T_USERS  = f"{PROJECT}.ref_usuarios"

print("Tablas definidas correctamente:")
print(f"  Bronze ‚Üí {T_BRONZE}")
print(f"  Silver ‚Üí {T_SILVER}")
print(f"  Gold   ‚Üí {T_GOLD}")
print(f"  Users  ‚Üí {T_USERS}")


In [0]:
# Importar librer√≠as necesarias
from pyspark.sql import functions as F
from pyspark.sql.types import DoubleType, LongType
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd

# Configuraci√≥n de estilo para visualizaciones
sns.set_style("whitegrid")
plt.rcParams['figure.figsize'] = (12, 6)


In [0]:
# Cargar Silver
df_silver = spark.table(T_SILVER)
print("Filas en Silver:", df_silver.count())
display(df_silver.limit(5))


In [0]:
# Resumen general de fraudes

df_gold_summary = df_silver.groupBy("Status").agg(
    F.count("*").alias("total_transacciones"),
    F.round(F.avg("Amount"), 2).alias("monto_promedio"),
    F.round(F.sum("Amount"), 2).alias("monto_total"),
    F.round(F.min("Amount"), 2).alias("monto_minimo"),
    F.round(F.max("Amount"), 2).alias("monto_maximo"),
    F.round(F.stddev("Amount"), 2).alias("desviacion_estandar")
).orderBy(F.desc("total_transacciones"))

display(df_gold_summary)


In [0]:
# Calcular porcentaje de fraudes
total_records = df_silver.count()
df_gold_summary = df_gold_summary.withColumn(
    "porcentaje_total",
    F.round((F.col("total_transacciones") / total_records) * 100, 2)
)

In [0]:
# Guardar en Delta Lake
df_gold_summary.write \
    .format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .saveAsTable("lakehouse_fraude.gold_fraude_summary")

print("‚úÖ Tabla gold_fraude_summary creada exitosamente")

In [0]:
# An√°lisis por categor√≠a monto
df_gold_categoria = df_silver.groupBy("MontoCategoria", "Status").agg(
    F.count("*").alias("cantidad_transacciones"),
    F.round(F.avg("Amount"), 2).alias("monto_promedio"),
    F.round(F.sum("Amount"), 2).alias("monto_total")
).orderBy("MontoCategoria", F.desc("cantidad_transacciones"))

display(df_gold_categoria)

# Guardar en Delta Lake
df_gold_categoria.write \
    .format("delta") \
    .mode("overwrite") \
    .saveAsTable("lakehouse_fraude.gold_categoria_analisis")

print("‚úÖ Tabla gold_categoria_analisis creada exitosamente")

In [0]:
# Se muestra, para cada hora del d√≠a y tipo de transacci√≥n, cu√°ntas operaciones hubo y cu√°l fue el monto promedio.
df_gold_temporal = df_silver.groupBy("HoraAprox", "Status").agg(
    F.count("*").alias("num_transacciones"),
    F.round(F.avg("Amount"), 2).alias("monto_promedio")
).orderBy("HoraAprox", "Status")

display(df_gold_temporal)

# Guardar en Delta Lake
df_gold_temporal.write \
    .format("delta") \
    .mode("overwrite") \
    .saveAsTable("lakehouse_fraude.gold_analisis_temporal")

print("‚úÖ Tabla gold_analisis_temporal creada exitosamente")


In [0]:
# Estad√≠sticas de riesgo
df_gold_riesgo = df_silver.groupBy("Status", "MontoCategoria").agg(
    F.count("*").alias("cantidad"),
    F.round(F.avg("log_Amount"), 4).alias("log_monto_promedio")
).orderBy("Status", F.desc("cantidad"))

display(df_gold_riesgo)

# Guardar en Delta Lake
df_gold_riesgo.write \
    .format("delta") \
    .mode("overwrite") \
    .saveAsTable("lakehouse_fraude.gold_riesgo_estadisticas")

print("‚úÖ Tabla gold_riesgo_estadisticas creada exitosamente")


In [0]:
# Validaci√≥n de integridad de datos

# Leer datos desde Delta Lake
df_gold_check = spark.table("lakehouse_fraude.gold_fraude_summary")

# Validaci√≥n 1: Verificar que no hay valores nulos en columnas clave
print("\n1Ô∏è‚É£ Verificando valores nulos...")
null_counts = df_gold_check.select([
    F.sum(F.when(F.col(c).isNull(), 1).otherwise(0)).alias(c) 
    for c in df_gold_check.columns
])
display(null_counts)

In [0]:
# Validaci√≥n 2: Verificar suma de porcentajes = 100%
suma_porcentajes = df_gold_check.agg(
    F.round(F.sum("porcentaje_total"), 2).alias("suma_porcentajes")
).collect()[0]["suma_porcentajes"]

if suma_porcentajes == 100.0:
    print(f"‚úÖ Validaci√≥n correcta: suma de porcentajes = {suma_porcentajes}%")
else:
    print(f"‚ö†Ô∏è Advertencia: suma de porcentajes = {suma_porcentajes}% (esperado: 100%)")


In [0]:
# Validaci√≥n 3: Verificar coherencia de datos

df_validation = df_gold_check.select(
    "Status",
    "monto_minimo",
    "monto_promedio",
    "monto_maximo"
).collect()

for row in df_validation:
    if row.monto_minimo <= row.monto_promedio <= row.monto_maximo:
        print(f"‚úÖ {row.Status}: Coherencia correcta (min ‚â§ promedio ‚â§ max)")
    else:
        print(f"‚ùå {row.Status}: Incoherencia detectada")

In [0]:
# Validaci√≥n 4: Comparar totales Silver vs Gold
print("\n4Ô∏è‚É£ Comparando totales Silver vs Gold...")
total_silver = df_silver.count()
total_gold = df_gold_check.agg(F.sum("total_transacciones")).collect()[0][0]

if total_silver == total_gold:
    print(f"‚úÖ Totales coinciden: Silver = {total_silver:,}, Gold = {total_gold:,}")
else:
    print(f"‚ö†Ô∏è Discrepancia: Silver = {total_silver:,}, Gold = {total_gold:,}")

print("\n‚úÖ Validaci√≥n de integridad completada")

## Visualizaciones

In [0]:
# Convertir datos a Pandas para visualizaci√≥n
pdf_summary = df_gold_summary.toPandas()
pdf_categoria = df_gold_categoria.toPandas()
pdf_temporal = df_gold_temporal.toPandas()

In [0]:
print("\nüìä Gr√°fico 1: Distribuci√≥n de Transacciones")

plt.figure(figsize=(12, 5))

# Subgr√°fico 1: Cantidad de transacciones
plt.subplot(1, 2, 1)
colors = ['#ff6b6b', '#51cf66']
plt.bar(pdf_summary['Status'], pdf_summary['total_transacciones'], color=colors)
plt.title('Cantidad de Transacciones por Tipo', fontsize=14, fontweight='bold')
plt.xlabel('Tipo de Transacci√≥n', fontsize=12)
plt.ylabel('N√∫mero de Transacciones', fontsize=12)
plt.xticks(rotation=0)

# Agregar valores en las barras
for i, v in enumerate(pdf_summary['total_transacciones']):
    plt.text(i, v + max(pdf_summary['total_transacciones'])*0.02, 
             f'{v:,}', ha='center', va='bottom', fontweight='bold')

# Subgr√°fico 2: Porcentaje
plt.subplot(1, 2, 2)
plt.pie(pdf_summary['porcentaje_total'], 
        labels=pdf_summary['Status'],
        autopct='%1.2f%%',
        colors=colors,
        startangle=90,
        textprops={'fontsize': 12, 'fontweight': 'bold'})
plt.title('Distribuci√≥n Porcentual', fontsize=14, fontweight='bold')

plt.tight_layout()
plt.show()


In [0]:
print("\nüìä Gr√°fico 2: An√°lisis de Montos")

plt.figure(figsize=(14, 5))

# Subgr√°fico 1: Montos promedio
plt.subplot(1, 3, 1)
plt.bar(pdf_summary['Status'], pdf_summary['monto_promedio'], color=colors)
plt.title('Monto Promedio por Tipo', fontsize=14, fontweight='bold')
plt.xlabel('Tipo de Transacci√≥n', fontsize=12)
plt.ylabel('Monto Promedio ($)', fontsize=12)
for i, v in enumerate(pdf_summary['monto_promedio']):
    plt.text(i, v + max(pdf_summary['monto_promedio'])*0.02, 
             f'${v:.2f}', ha='center', va='bottom', fontweight='bold')


In [0]:
# Subgr√°fico 2: Monto total
plt.subplot(1, 3, 2)
plt.bar(pdf_summary['Status'], pdf_summary['monto_total'], color=colors)
plt.title('Monto Total Acumulado', fontsize=14, fontweight='bold')
plt.xlabel('Tipo de Transacci√≥n', fontsize=12)
plt.ylabel('Monto Total ($)', fontsize=12)
for i, v in enumerate(pdf_summary['monto_total']):
    plt.text(i, v + max(pdf_summary['monto_total'])*0.02, 
             f'${v:,.0f}', ha='center', va='bottom', fontweight='bold')


In [0]:
# Subgr√°fico 3: Rangos (min-max)
plt.subplot(1, 3, 3)
x = range(len(pdf_summary))
plt.bar(x, pdf_summary['monto_maximo'], color=colors, alpha=0.6, label='M√°ximo')
plt.bar(x, pdf_summary['monto_minimo'], color='gray', alpha=0.8, label='M√≠nimo')
plt.xticks(x, pdf_summary['Status'])
plt.title('Rango de Montos', fontsize=14, fontweight='bold')
plt.xlabel('Tipo de Transacci√≥n', fontsize=12)
plt.ylabel('Monto ($)', fontsize=12)
plt.legend()

plt.tight_layout()
plt.show()

In [0]:
print("\nüìä Gr√°fico 3: Transacciones por Categor√≠a de Monto")

# Pivot para mejor visualizaci√≥n
pdf_cat_pivot = pdf_categoria.pivot(
    index='MontoCategoria', 
    columns='Status', 
    values='cantidad_transacciones'
).fillna(0)

plt.figure(figsize=(12, 6))
pdf_cat_pivot.plot(kind='bar', color=['#ff6b6b', '#51cf66'], width=0.8)
plt.title('Distribuci√≥n de Transacciones por Categor√≠a de Monto', 
          fontsize=14, fontweight='bold')
plt.xlabel('Categor√≠a de Monto', fontsize=12)
plt.ylabel('N√∫mero de Transacciones', fontsize=12)
plt.legend(title='Tipo', fontsize=10)
plt.xticks(rotation=0)
plt.grid(axis='y', alpha=0.3)

plt.tight_layout()
plt.show()


In [0]:
print("\nüìä Gr√°fico 4: Patr√≥n Temporal de Transacciones")

# Separar por tipo
pdf_temp_fraude = pdf_temporal[pdf_temporal['Status'] == 'Fraude']
pdf_temp_legitima = pdf_temporal[pdf_temporal['Status'] == 'Leg√≠tima']

plt.figure(figsize=(14, 5))

# Subgr√°fico 1: N√∫mero de transacciones
plt.subplot(1, 2, 1)
plt.plot(pdf_temp_legitima['HoraAprox'], 
         pdf_temp_legitima['num_transacciones'], 
         marker='o', linewidth=2, color='#51cf66', label='Leg√≠timas')
plt.plot(pdf_temp_fraude['HoraAprox'], 
         pdf_temp_fraude['num_transacciones'], 
         marker='s', linewidth=2, color='#ff6b6b', label='Fraudes')
plt.title('N√∫mero de Transacciones por Hora', fontsize=14, fontweight='bold')
plt.xlabel('Hora del D√≠a', fontsize=12)
plt.ylabel('N√∫mero de Transacciones', fontsize=12)
plt.legend()
plt.grid(True, alpha=0.3)


In [0]:
# Subgr√°fico 2: Monto promedio
plt.subplot(1, 2, 2)
plt.plot(pdf_temp_legitima['HoraAprox'], 
         pdf_temp_legitima['monto_promedio'], 
         marker='o', linewidth=2, color='#51cf66', label='Leg√≠timas')
plt.plot(pdf_temp_fraude['HoraAprox'], 
         pdf_temp_fraude['monto_promedio'], 
         marker='s', linewidth=2, color='#ff6b6b', label='Fraudes')
plt.title('Monto Promedio por Hora', fontsize=14, fontweight='bold')
plt.xlabel('Hora del D√≠a', fontsize=12)
plt.ylabel('Monto Promedio ($)', fontsize=12)
plt.legend()
plt.grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

# Conclusiones

In [0]:
# Extraer datos clave
row_fraude = pdf_summary[pdf_summary['Status'] == 'Fraude'].iloc[0]
row_legitima = pdf_summary[pdf_summary['Status'] == 'Leg√≠tima'].iloc[0]

print("\nüî¥ TRANSACCIONES FRAUDULENTAS:")
print(f"   ‚Ä¢ Total: {row_fraude['total_transacciones']:,} ({row_fraude['porcentaje_total']}% del total)")
print(f"   ‚Ä¢ Monto promedio: ${row_fraude['monto_promedio']:.2f}")
print(f"   ‚Ä¢ Monto total: ${row_fraude['monto_total']:,.2f}")
print(f"   ‚Ä¢ Rango: ${row_fraude['monto_minimo']:.2f} - ${row_fraude['monto_maximo']:.2f}")

print("\nüü¢ TRANSACCIONES LEG√çTIMAS:")
print(f"   ‚Ä¢ Total: {row_legitima['total_transacciones']:,} ({row_legitima['porcentaje_total']}% del total)")
print(f"   ‚Ä¢ Monto promedio: ${row_legitima['monto_promedio']:.2f}")
print(f"   ‚Ä¢ Monto total: ${row_legitima['monto_total']:,.2f}")
print(f"   ‚Ä¢ Rango: ${row_legitima['monto_minimo']:.2f} - ${row_legitima['monto_maximo']:.2f}")

print("\nüìä CONCLUSIONES PRINCIPALES:")
print(f"""
1. DESBALANCE DE CLASES:
   - Las transacciones fraudulentas representan solo el {row_fraude['porcentaje_total']}% del total
   - Esto confirma un dataset altamente desbalanceado, t√≠pico en detecci√≥n de fraudes
   
2. DIFERENCIAS EN MONTOS:
   - El monto promedio de fraudes (${row_fraude['monto_promedio']:.2f}) es {
       'mayor' if row_fraude['monto_promedio'] > row_legitima['monto_promedio'] else 'menor'
   } que el de transacciones leg√≠timas (${row_legitima['monto_promedio']:.2f})
   - Diferencia: ${abs(row_fraude['monto_promedio'] - row_legitima['monto_promedio']):.2f}
   
3. DISTRIBUCI√ìN POR CATEGOR√çA:
   - La mayor√≠a de transacciones se concentran en montos medios
   - Las transacciones de alto valor requieren mayor monitoreo
   
4. PATRONES TEMPORALES:
   - Se observan variaciones en el volumen de transacciones a lo largo del d√≠a
   - Esto puede ayudar a identificar horarios de mayor riesgo
""")

print("\n" + "=" * 60)
print("‚úÖ AN√ÅLISIS DE CAPA GOLD COMPLETADO")
print("=" * 60)
print("\nüìÅ Tablas Delta Lake creadas:")
print("   ‚Ä¢ lakehouse_fraude.gold_fraude_summary")
print("   ‚Ä¢ lakehouse_fraude.gold_categoria_analisis")
print("   ‚Ä¢ lakehouse_fraude.gold_analisis_temporal")
print("   ‚Ä¢ lakehouse_fraude.gold_riesgo_estadisticas")
