In [0]:
# --- Fase 2: Carga de Datos ---
# (Inicio de 01_EDA_y_Limpieza)

# Nombre completo de la tabla que creaste en Unity Catalog
table_name = "workspace.default.creditcard"

# Cargamos los datos en un Spark DataFrame
try:
    df = spark.read.table(table_name)
    
    # Mostremos las primeras 5 filas para verificar
    print(f"Datos cargados exitosamente desde la tabla: {table_name}")
    df.show(5)

    # Imprimamos el esquema que Databricks infirió
    print("Esquema (Schema) del DataFrame:")
    df.printSchema()

except Exception as e:
    print(f"Error al leer la tabla: {e}")
    print("Verifica que el nombre del catálogo y la tabla ('workspace.default.creditcard') sean correctos.")

Datos cargados exitosamente desde la tabla: workspace.default.creditcard
+--------+-------------------+------------------+------------------+------------------+-------------------+------------------+------------------+-------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+-------------------+------------------+-----------------+--------------------+-------------------+--------------------+------------------+--------------------+------------------+------------------+------------------+-------------------+-------------------+------+-----+
|    Time|                 V1|                V2|                V3|                V4|                 V5|                V6|                V7|                 V8|                V9|               V10|               V11|               V12|               V13|               V14|               V15|                V16|               V17|              V18|    

In [0]:
from pyspark.sql.functions import col, count

# 1. Agrupamos por la columna 'Class' y contamos las ocurrencias
print("Distribución de Clases (0 = Legítima, 1 = Fraude):")
class_distribution = df.groupBy("Class").count()
class_distribution.show()

# 2. Calculamos los conteos totales y de fraude para el porcentaje
try:
    total_count = df.count()
    fraud_count = df.filter(col("Class") == 1).count()
    legit_count = df.filter(col("Class") == 0).count()

    # 3. Calculamos el porcentaje
    percent_fraud = (fraud_count / total_count) * 100

    print(f"\n--- Resumen del Desbalance ---")
    print(f"Total de transacciones: {total_count:,}")
    print(f"Transacciones legítimas (0): {legit_count:,}")
    print(f"Transacciones fraudulentas (1): {fraud_count}")
    print(f"\nPorcentaje de Fraude: {percent_fraud:.4f}%")

except Exception as e:
    print(f"Error al calcular el desbalance: {e}")
    print("Asegúrate de que la columna 'Class' exista en tu DataFrame.")

Distribución de Clases (0 = Legítima, 1 = Fraude):
+-----+------+
|Class| count|
+-----+------+
|    0|284315|
|    1|   492|
+-----+------+


--- Resumen del Desbalance ---
Total de transacciones: 284,807
Transacciones legítimas (0): 284,315
Transacciones fraudulentas (1): 492

Porcentaje de Fraude: 0.1727%


In [0]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.sql.functions import col

# 1. Definir la lista de columnas de características.
# Excluimos 'Time' (débil) y 'Class' (nuestra etiqueta a predecir).
feature_columns = [c for c in df.columns if c not in ['Time', 'Class']]

# Imprimimos para verificar (debería ser V1...V28 y Amount)
print(f"Columnas de características a ensamblar: {feature_columns}")
print(f"Total: {len(feature_columns)} características") # Deberían ser 29

# 2. Configurar el VectorAssembler
# Entrada: Las 29 columnas
# Salida: Una nueva columna llamada "unscaled_features"
assembler = VectorAssembler(
    inputCols=feature_columns,
    outputCol="unscaled_features"
)

# 3. Configurar el StandardScaler
# Entrada: "unscaled_features"
# Salida: "features" (el nombre estándar que espera pyspark.ml)
scaler = StandardScaler(
    inputCol="unscaled_features",
    outputCol="features",
    withStd=True,  # Escalar a desviación estándar 1
    withMean=True  # Centrar datos en media 0
)

# 4. Definir el Pipeline
# Un pipeline encadena estas etapas en orden
preprocessing_pipeline = Pipeline(stages=[assembler, scaler])

# 5. "Entrenar" el pipeline
# El pipeline "aprende" las estadísticas (media, std) de los datos
pipeline_model = preprocessing_pipeline.fit(df)

# 6. Transformar los datos
# Aplicamos la transformación a nuestro DataFrame
df_processed = pipeline_model.transform(df)

# 7. Seleccionamos solo las columnas que necesitamos para el modelado
# 'features' (el vector escalado) y 'Class' (la etiqueta)
df_final = df_processed.select("features", "Class")

print("\n--- ¡Preprocesamiento Completo! ---")
print("Datos listos para el modelado (columna 'features' y 'Class'):")
df_final.show(5, truncate=False)

Columnas de características a ensamblar: ['V1', 'V2', 'V3', 'V4', 'V5', 'V6', 'V7', 'V8', 'V9', 'V10', 'V11', 'V12', 'V13', 'V14', 'V15', 'V16', 'V17', 'V18', 'V19', 'V20', 'V21', 'V22', 'V23', 'V24', 'V25', 'V26', 'V27', 'V28', 'Amount']
Total: 29 características

--- ¡Preprocesamiento Completo! ---
Datos listos para el modelado (columna 'features' y 'Class'):
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----+
|features                                     

In [0]:
# --- Fase 3: División de Datos y Prevención de Fuga ---

# 1. Definir las fracciones de muestreo para la división estratificada
# Queremos 80% para entrenamiento y 20% para prueba
fractions = {
    0: 0.8,  # 80% de la clase 0 (legítima)
    1: 0.8   # 80% de la clase 1 (fraude)
}

# 2. Crear el conjunto de entrenamiento (train_data)
# Usamos sampleBy para tomar el 80% de cada clase
train_data = df_final.sampleBy("Class", fractions, seed=42)

# 3. Crear el conjunto de prueba (test_data)
# Usamos subtract() para obtener todo lo que NO estaba en train_data
test_data = df_final.subtract(train_data)

# 4. Verificar la división
print("--- División Estratificada Completa ---")
total_count = df_final.count()
train_count = train_data.count()
test_count = test_data.count()

print(f"Total original: {total_count}")
print(f"Total en Entrenamiento (Train): {train_count} (~{train_count/total_count:.2%})")
print(f"Total en Prueba (Test): {test_count} (~{test_count/total_count:.2%})")

# 5. Verificación más importante: el desbalance en el Test Set
print("\n--- Verificación del Test Set (¡Debe estar desbalanceado!) ---")
test_data.groupBy("Class").count().show()

print("\n--- Verificación del Train Set (¡Aún desbalanceado!) ---")
train_data.groupBy("Class").count().show()

--- División Estratificada Completa ---
Total original: 284807
Total en Entrenamiento (Train): 227813 (~79.99%)
Total en Prueba (Test): 54319 (~19.07%)

--- Verificación del Test Set (¡Debe estar desbalanceado!) ---
+-----+-----+
|Class|count|
+-----+-----+
|    0|54237|
|    1|   82|
+-----+-----+


--- Verificación del Train Set (¡Aún desbalanceado!) ---
+-----+------+
|Class| count|
+-----+------+
|    0|227409|
|    1|   404|
+-----+------+



In [0]:
# --- Fase 3: Aplicación de SMOTE (Solo en Train) ---

# 1. Instalar la biblioteca (requerido en Databricks)
# La celda puede tardar un momento en ejecutar esto
%pip install imbalanced-learn

# 2. Importar bibliotecas necesarias
from imblearn.over_sampling import SMOTE
import numpy as np
import pandas as pd
from pyspark.ml.linalg import Vectors

print("\nIniciando el proceso de SMOTE...")
print("Paso 1/5: Convirtiendo el 'train_data' de Spark a Pandas...")

# 3. Exportar a Pandas (¡El cuello de botella!)
try:
    train_pd = train_data.toPandas()
except Exception as e:
    print(f"Error al convertir a Pandas: {e}")
    print("Esto puede fallar si el driver node no tiene suficiente memoria.")
    raise e

print(f"Paso 2/5: Datos convertidos. {len(train_pd)} filas en Pandas.")

# 4. Preparar los datos para SMOTE (formato NumPy)
# 'y' son las etiquetas
y_train_pd = train_pd['Class']

# 'X' son las características. Debemos convertir la columna 'features'
# (que contiene Vectores de Spark) a un array 2D de NumPy.
X_train_pd = np.stack(train_pd['features'].apply(lambda x: x.toArray()))

print(f"Paso 3/5: Datos de 'features' (X) y 'Class' (y) separados.")
print(f"Forma de X_train (antes de SMOTE): {X_train_pd.shape}")
print(f"Forma de y_train (antes de SMOTE): {y_train_pd.shape}")

# 5. Configurar y aplicar SMOTE
# Usamos k_neighbors=5 (default). Esto está bien porque tenemos > 5 fraudes.
smote = SMOTE(random_state=42)
print("Paso 4/5: Aplicando SMOTE... (Esto puede tardar un momento)")
X_resampled, y_resampled = smote.fit_resample(X_train_pd, y_train_pd)

print(f"Paso 5/5: ¡SMOTE completado!")
print(f"--- Verificación Post-SMOTE (en NumPy) ---")
print(f"Forma de X (balanceado): {X_resampled.shape}")
print(f"Conteo de clases en 'y' (balanceado):")
print(pd.Series(y_resampled).value_counts())

# 6. Convertir los datos balanceados (NumPy) de nuevo a un Spark DataFrame
print("\nConvirtiendo datos balanceados de vuelta a Spark DataFrame...")

# Creamos una lista de tuplas (Vectors.dense(features), Class)
data_tuples = [
    (Vectors.dense(row), int(y)) for row, y in zip(X_resampled, y_resampled)
]

# Creamos el DataFrame 'train_balanced_data'
train_balanced_data = spark.createDataFrame(
    data_tuples, 
    ["features", "Class"] # Mantenemos los nombres de columna
)

print("\n--- ¡Conversión a Spark DF Completa! ---")
print("Verificación del DataFrame 'train_balanced_data' (en Spark):")
train_balanced_data.groupBy("Class").count().show()

[43mNote: you may need to restart the kernel using %restart_python or dbutils.library.restartPython() to use updated packages.[0m

Iniciando el proceso de SMOTE...
Paso 1/5: Convirtiendo el 'train_data' de Spark a Pandas...
Paso 2/5: Datos convertidos. 227813 filas en Pandas.
Paso 3/5: Datos de 'features' (X) y 'Class' (y) separados.
Forma de X_train (antes de SMOTE): (227813, 29)
Forma de y_train (antes de SMOTE): (227813,)
Paso 4/5: Aplicando SMOTE... (Esto puede tardar un momento)
Paso 5/5: ¡SMOTE completado!
--- Verificación Post-SMOTE (en NumPy) ---
Forma de X (balanceado): (454818, 29)
Conteo de clases en 'y' (balanceado):
Class
0    227409
1    227409
Name: count, dtype: int64

Convirtiendo datos balanceados de vuelta a Spark DataFrame...

--- ¡Conversión a Spark DF Completa! ---
Verificación del DataFrame 'train_balanced_data' (en Spark):
+-----+------+
|Class| count|
+-----+------+
|    0|227409|
|    1|227409|
+-----+------+



In [0]:
# --- Fase 4: Entrenamiento de Modelos y Seguimiento ---

import mlflow
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator

# --- 1. Configurar los Evaluadores ---
# Debemos decirle a Spark CÓMO medir el rendimiento.

# Evaluador principal: AUC-PR (perfecto para desbalance)
# Usamos BinaryClassificationEvaluator para esto.
evaluator_pr = BinaryClassificationEvaluator(
    labelCol="Class",
    rawPredictionCol="rawPrediction",
    metricName="areaUnderPR"  # ¡La métrica clave!
)

# Evaluadores secundarios: Precision y Recall
# Usamos MulticlassClassificationEvaluator (funciona para binario)
evaluator_precision = MulticlassClassificationEvaluator(
    labelCol="Class",
    predictionCol="prediction",
    metricName="precisionByLabel",
    metricLabel=1 # Nos importa la precisión de la clase 1 (fraude)
)

evaluator_recall = MulticlassClassificationEvaluator(
    labelCol="Class",
    predictionCol="prediction",
    metricName="recallByLabel",
    metricLabel=1 # Nos importa el recall de la clase 1 (fraude)
)

print("Evaluadores listos (AUC-PR, Precision, Recall).")

# --- 2. Iniciar el Experimento de MLflow ---
# Nombramos el experimento para agrupar todas nuestras ejecuciones
mlflow.set_experiment("/fraud_detection_project")

# --- 3. Ejecución del Modelo: Regresión Logística ---
print("Iniciando ejecución (Run) de Regresión Logística...")

try:
    # Usamos 'with' para que MLflow inicie y termine el registro automáticamente
    with mlflow.start_run(run_name="Logistic Regression"):

        # a. Definir el modelo y los parámetros
        lr = LogisticRegression(featuresCol="features", labelCol="Class", regParam=0.1)
        
        # b. Registrar Parámetros
        mlflow.log_param("model_type", "LogisticRegression")
        mlflow.log_param("regParam", 0.1)
        mlflow.log_param("notes", "Baseline model with SMOTE data")

        # c. Entrenar el modelo
        # ¡Usamos los datos balanceados por SMOTE!
        model = lr.fit(train_balanced_data)

        # d. Generar Predicciones
        # ¡Evaluamos en los datos de prueba desbalanceados!
        predictions = model.transform(test_data)
        
        # e. Calcular y Registrar Métricas
        auc_pr = evaluator_pr.evaluate(predictions)
        precision = evaluator_precision.evaluate(predictions)
        recall = evaluator_recall.evaluate(predictions)
        
        mlflow.log_metric("AUC_PR", auc_pr)
        mlflow.log_metric("Precision", precision)
        mlflow.log_metric("Recall", recall)

        # f. Registrar el Modelo
        mlflow.spark.log_model(
            model, 
            "model", 
            dfs_tmpdir="/Volumes/workspace/default/mlflow_staging"
        )
        
        print("\n--- ¡Ejecución de Regresión Logística Completa! ---")
        print(f"  Resultados (en test_data):")
        print(f"  AUC-PR: {auc_pr:.4f}")
        print(f"  Precision (Clase 1): {precision:.4f}")
        print(f"  Recall (Clase 1): {recall:.4f}")

except Exception as e:
    print(f"Error durante la ejecución de MLflow: {e}")

Evaluadores listos (AUC-PR, Precision, Recall).
Iniciando ejecución (Run) de Regresión Logística...





--- ¡Ejecución de Regresión Logística Completa! ---
  Resultados (en test_data):
  AUC-PR: 0.6320
  Precision (Clase 1): 0.4012
  Recall (Clase 1): 0.8171


In [0]:
# --- Fase 4: Ejecución del Modelo 2 (Random Forest - CORREGIDO) ---

from pyspark.ml.classification import RandomForestClassifier
import pandas as pd
import numpy as np

print("Iniciando ejecución (Run) de Random Forest...")

try:
    with mlflow.start_run(run_name="Random Forest"):

        # a. Definir el modelo y los parámetros
        rf = RandomForestClassifier(
            featuresCol="features", 
            labelCol="Class",
            numTrees=100,
            maxDepth=5,
            seed=42
        )
        
        # b. Registrar Parámetros
        mlflow.log_param("model_type", "RandomForestClassifier")
        mlflow.log_param("numTrees", 100)
        mlflow.log_param("maxDepth", 5)
        mlflow.log_param("notes", "Modelo de ensamble con SMOTE data")

        # c. Entrenar el modelo
        model = rf.fit(train_balanced_data)

        # d. Generar Predicciones
        predictions = model.transform(test_data)
        
        # e. Calcular y Registrar Métricas
        auc_pr = evaluator_pr.evaluate(predictions)
        precision = evaluator_precision.evaluate(predictions)
        recall = evaluator_recall.evaluate(predictions)
        
        mlflow.log_metric("AUC_PR", auc_pr)
        mlflow.log_metric("Precision", precision)
        mlflow.log_metric("Recall", recall)
        
        # f. Registrar el Modelo (con Firma y el tmpdir)
        mlflow.spark.log_model(
            model, 
            "model", 
            dfs_tmpdir="/Volumes/workspace/default/mlflow_staging",
        )
        
        print("\n--- ¡Ejecución de Random Forest Completa! ---")
        print(f"  Resultados (en test_data):")
        print(f"  AUC-PR: {auc_pr:.4f}")
        print(f"  Precision (Clase 1): {precision:.4f}")
        print(f"  Recall (Clase 1): {recall:.4f}")

except Exception as e:
    print(f"Error durante la ejecución de MLflow: {e}")

Iniciando ejecución (Run) de Random Forest...





--- ¡Ejecución de Random Forest Completa! ---
  Resultados (en test_data):
  AUC-PR: 0.6932
  Precision (Clase 1): 0.2632
  Recall (Clase 1): 0.8537
