---
# 📘 Análisis de Fraude con PySpark y Machine Learning

**Nombre:** Francisco Rocha Juárez  
**Matrícula:** A01730560  
**Institución:** Tecnológico de Monterrey  

---


# 1. Introducción teórica  
El **aprendizaje automático** (Machine Learning, ML) se utiliza para extraer patrones y generar predicciones a partir de grandes volúmenes de datos.  
Dependiendo de la existencia (o no) de una **etiqueta objetivo** los métodos ML se dividen en dos grandes familias:

| Paradigma | Idea central | Ejemplos clásicos | Implementación en PySpark (MLlib) | Casos de uso en fraude |
|-----------|--------------|-------------------|-----------------------------------|------------------------|
| **Aprendizaje supervisado** | El algoritmo aprende la relación entre un vector de entrada **X** y una etiqueta conocida **y**. | Árboles de Decisión, Random Forest, Gradient-Boosted Trees (GBT), Redes Neuronales MLP, Regresión Logística | `pyspark.ml.classification.*` y `pyspark.ml.regression.*` | Predecir si una transacción es fraudulenta (`isFraud ∈ {0,1}`) |
| **Aprendizaje no supervisado** | No hay etiqueta; se busca estructura oculta (agrupamientos, densidades, etc.). | K-Means, Gaussian Mixture (GMM), Power Iteration Clustering (PIC) | `pyspark.ml.clustering.*` | Detectar segmentos de clientes o grupos de transacciones anómalas |

### PySpark y MLlib  
*PySpark* es la interfaz de Python para **Apache Spark**, un motor de procesamiento distribuido que permite trabajar con datasets de múltiples GB de forma local o sobre clústeres.  
Su módulo **MLlib** contiene implementaciones paralelas de los algoritmos citados, junto con utilidades de pre-procesamiento, pipelines y evaluadores.

En este cuaderno aplicaremos:

* Un **modelo supervisado** (clasificación binaria) para predecir `isFraud`.
* Un **modelo no supervisado** (clustering) para descubrir patrones entre transacciones.

El dataset empleado será **IEEE-CIS Fraud Detection** (≈ 1.35 GB, 434 columnas). Trabajaremos con una muestra estratificada para mantener la representatividad y reducir tiempos de cómputo.


## 2. Selección de la muestra M  
Partimos de los archivos **`train_transaction.csv`** y **`train_identity.csv`** ubicados en la misma carpeta del notebook.  
Pasos:

1. **Carga** de ambos CSV con inferencia de esquema.
2. **Unión** por `TransactionID` (left join – toda transacción y su identidad cuando exista).
3. **Construcción de la clave de estrato** combinando `isFraud` × `ProductCD` (2 × 5 = 10 estratos).  
4. **Muestreo estratificado**: se toma ~ 1 % de cada estrato para formar la muestra **M**.  
5. **Persistencia en memoria** y conteo para verificar tamaño y proporción.


In [None]:
# ------------- 2. Selección de datos: construcción de la muestra M -------------
#
# Requisitos:
#   conda install -c conda-forge pyspark -y
#   (o) pip install pyspark
#
# Ajusta las rutas si tus CSV están en otra carpeta.

from pyspark.sql import SparkSession, functions as F

# ---------- Spark session ----------
spark = (
    SparkSession.builder
    .appName("IEEE Fraud Selection")
    .config("spark.driver.memory", "8g")          
    .getOrCreate()
)

# ---------- Carga de los CSV ----------
path_tx   = "./train_transaction.csv"
path_id   = "./train_identity.csv"

tx_df = (
    spark.read.option("header", True)
              .option("inferSchema", True)
              .csv(path_tx)
)

id_df = (
    spark.read.option("header", True)
              .option("inferSchema", True)
              .csv(path_id)
)

# ---------- Unión por TransactionID ----------
full_df = (
    tx_df.join(id_df, on="TransactionID", how="left")
         .cache()                         # la usaremos varias veces
)

print(f"Transacciones totales: {full_df.count():,}")

# ---------- Estrato: isFraud x ProductCD ----------
full_df = full_df.withColumn(
    "stratum",
    F.concat_ws("_", F.col("isFraud").cast("string"), F.col("ProductCD"))
)

# ---------- Muestreo estratificado (~1 % por estrato) ----------
SAMPLE_FRAC = 0.01  # 1 %
# Construimos un dict {stratum: fraction} dinámicamente:
strata_vals = [r["stratum"] for r in full_df.select("stratum").distinct().collect()]
fractions   = {s: SAMPLE_FRAC for s in strata_vals}

sample_df = (
    full_df.sampleBy("stratum", fractions, seed=42)
           .drop("stratum")         # ya no se necesita
           .cache()
)

print(f"Tamaño de la muestra M: {sample_df.count():,}")

# Opcional: contar fraudes en la muestra
sample_df.groupBy("isFraud").count().show()


Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/05/29 03:34:24 WARN Utils: Your hostname, Franciscos-MacBook-Pro.local, resolves to a loopback address: 127.0.0.1; using 192.168.100.4 instead (on interface en0)
25/05/29 03:34:24 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/05/29 03:34:25 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/05/29 03:34:29 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

Transacciones totales: 590,540


                                                                                

Tamaño de la muestra M: 5,937
+-------+-----+
|isFraud|count|
+-------+-----+
|      1|  183|
|      0| 5754|
+-------+-----+



La muestra **M** conserva la distribución original de `isFraud` y `ProductCD` gracias al muestreo estratificado.  
Queda almacenada en el DataFrame `sample_df`, listo para el pre-procesamiento.


## 3. Preparación de los datos  
Para que los algoritmos ML funcionen correctamente:

1. **Eliminamos** columnas con > 50 % de valores nulos.  
2. **Imputamos** los nulos restantes:  
   * Numéricos → mediana.  
   * Categóricos → categoría *“missing”*.  
3. **Tipificamos** categorías con `StringIndexer` para fases posteriores.  
4. Devolvemos un DataFrame **`m_prepared`** limpio y *cacheado*.


In [2]:
# ------------- 3. Pre-procesamiento de la muestra M (fix isnan) -------------

from pyspark.sql import functions as F
from pyspark.sql.functions import col, when, count, isnan
from pyspark.ml.feature import Imputer

# ---------- 3.0 Clasificar tipos ----------
numeric_cols = [c for c, t in sample_df.dtypes if t in ("double", "float", "int", "bigint", "long")]
categorical_cols = [c for c in sample_df.columns if c not in numeric_cols + ["isFraud"]]

# ---------- 3.1 Eliminar columnas con > 50 % nulos ----------
total_count = sample_df.count()

null_exprs = []
for c in sample_df.columns:
    if c in numeric_cols:
        null_exprs.append(
            (count(when(col(c).isNull() | isnan(c), c)) / total_count).alias(c)
        )
    else:
        null_exprs.append(
            (count(when(col(c).isNull(), c)) / total_count).alias(c)
        )

null_percents = sample_df.select(null_exprs).collect()[0].asDict()
cols_to_drop = [c for c, p in null_percents.items() if p > 0.5]

print(f"Columnas descartadas (>50 % nulos): {len(cols_to_drop)} → {cols_to_drop}")
df_clean = sample_df.drop(*cols_to_drop)

# ---------- 3.2 Imputación de nulos ----------
numeric_cols = [c for c in numeric_cols if c not in cols_to_drop]
categorical_cols = [c for c in categorical_cols if c not in cols_to_drop]

# Numéricos → mediana
if numeric_cols:
    imputer = (
        Imputer(strategy="median",
                inputCols=numeric_cols,
                outputCols=[f"{c}_imp" for c in numeric_cols])
    )
    df_clean = imputer.fit(df_clean).transform(df_clean)
    # Sustituimos las columnas originales
    for c in numeric_cols:
        df_clean = df_clean.drop(c).withColumnRenamed(f"{c}_imp", c)

# Categóricos → string "missing"
for c in categorical_cols:
    df_clean = df_clean.fillna({c: "missing"})

# ---------- 3.3 Verificación final de nulos ----------
remaining_nulls = (
    df_clean.select([
        count(when(col(c).isNull(), c)).alias(c) for c in df_clean.columns
    ])
    .collect()[0]
    .asDict()
)
print("Nulos restantes por columna (debería ser 0):")
print({k: v for k, v in remaining_nulls.items() if v != 0})

# ---------- Resultado preparado ----------
m_prepared = df_clean.cache()
print("Filas totales en m_prepared:", m_prepared.count())


25/05/29 03:34:41 WARN DAGScheduler: Broadcasting large task binary with size 1014.8 KiB
                                                                                

Columnas descartadas (>50 % nulos): 214 → ['dist1', 'dist2', 'R_emaildomain', 'D5', 'D6', 'D7', 'D8', 'D9', 'D12', 'D13', 'D14', 'M5', 'M7', 'M8', 'M9', 'V138', 'V139', 'V140', 'V141', 'V142', 'V143', 'V144', 'V145', 'V146', 'V147', 'V148', 'V149', 'V150', 'V151', 'V152', 'V153', 'V154', 'V155', 'V156', 'V157', 'V158', 'V159', 'V160', 'V161', 'V162', 'V163', 'V164', 'V165', 'V166', 'V167', 'V168', 'V169', 'V170', 'V171', 'V172', 'V173', 'V174', 'V175', 'V176', 'V177', 'V178', 'V179', 'V180', 'V181', 'V182', 'V183', 'V184', 'V185', 'V186', 'V187', 'V188', 'V189', 'V190', 'V191', 'V192', 'V193', 'V194', 'V195', 'V196', 'V197', 'V198', 'V199', 'V200', 'V201', 'V202', 'V203', 'V204', 'V205', 'V206', 'V207', 'V208', 'V209', 'V210', 'V211', 'V212', 'V213', 'V214', 'V215', 'V216', 'V217', 'V218', 'V219', 'V220', 'V221', 'V222', 'V223', 'V224', 'V225', 'V226', 'V227', 'V228', 'V229', 'V230', 'V231', 'V232', 'V233', 'V234', 'V235', 'V236', 'V237', 'V238', 'V239', 'V240', 'V241', 'V242', 'V243',

                                                                                

Nulos restantes por columna (debería ser 0):
{}
Filas totales en m_prepared: 5937


Tras eliminar **columnas muy incompletas** y **rellenar los nulos** restantes:

* `m_prepared` contiene únicamente variables sin vacíos.  
* Sus tipos están listos para la siguiente etapa (split *train/test* y construcción de modelos).  
* El DataFrame está en memoria (`cache()`) para acelerar las operaciones posteriores.


## 4. Preparación del conjunto de entrenamiento y prueba  

Para evaluar de forma objetiva el desempeño de los modelos necesitamos separar los datos en dos subconjuntos:

| Conjunto | Propósito | Tamaño elegido |
|----------|-----------|----------------|
| **Entrenamiento** | Ajustar los parámetros del modelo | 80 % |
| **Prueba**        | Medir la capacidad de generalización | 20 % |

Dado que `isFraud` está **desbalanceado** (~3 % de la clase positiva), usaremos un **muestreo estratificado** que mantiene la proporción de fraude en ambos subconjuntos:

1. Calculamos un diccionario `fractions = {0: 0.8, 1: 0.8}` para seleccionar el 80 % de cada clase como entrenamiento.  
2. El **20 % restante** se obtiene por diferencia (`subtract`) y se guarda como prueba.  
3. Verificamos que la distribución de la variable objetivo se conserve.  

> Usamos `seed=42` para reproducibilidad.


In [3]:
# ------------- 4. Train / Test split con estratificación -------------

TRAIN_FRAC = 0.8
fractions = {0: TRAIN_FRAC, 1: TRAIN_FRAC}   # {isFraud: fraction}

# Entrenamiento: muestreo estratificado
train_df = (
    m_prepared.sampleBy("isFraud", fractions=fractions, seed=42)
              .cache()
)

# Prueba: el resto de las filas
test_df = (
    m_prepared.subtract(train_df)
              .cache()
)

# ---------- Verificación de tamaños y proporciones ----------
print(f"Filas entrenamiento: {train_df.count():,}")
print(f"Filas prueba       : {test_df.count():,}")

print("\nDistribución isFraud en entrenamiento:")
train_df.groupBy("isFraud").count().show()

print("Distribución isFraud en prueba:")
test_df.groupBy("isFraud").count().show()


Filas entrenamiento: 4,755


                                                                                

Filas prueba       : 1,182

Distribución isFraud en entrenamiento:
+-------+-----+
|isFraud|count|
+-------+-----+
|      0| 4605|
|      1|  150|
+-------+-----+

Distribución isFraud en prueba:
+-------+-----+
|isFraud|count|
+-------+-----+
|      0| 1149|
|      1|   33|
+-------+-----+



# 5. Construcción de modelos

Dividimos el trabajo en dos sub-secciones:

| Sub-sección | Tipo | Objetivo | Métrica clave |
|-------------|------|----------|---------------|
| **5.1 Supervisado** | Clasificación binaria | Predecir `isFraud` | Área bajo la curva ROC (AUC-ROC) |
| **5.2 No supervisado** | Clustering | Descubrir grupos de transacciones con comportamiento similar | Silhouette (squared Euclidean) |

### 5.1 Modelo supervisado – Random Forest

* **Pre-procesamiento automático**  
  * Todas las columnas **categóricas** ⇒ `StringIndexer` → `OneHotEncoder`.  
  * Todas las columnas **numéricas** ⇒ se dejan tal cual.  
* Se ensamblan en un solo vector `features` con `VectorAssembler`.  
* Entrenamos un **`RandomForestClassifier`** con 100 árboles (robusto frente a desbalance).  
* Evaluamos en `test_df` con `BinaryClassificationEvaluator(metricName="areaUnderROC")`.

### 5.2 Modelo no supervisado – K-Means

* Tomamos **solo las columnas numéricas** (evitamos alta dimensionalidad de one-hots).  
* Estandarizamos con `StandardScaler`.  
* Entrenamos `KMeans(k=4)`.  
* Calculamos **Silhouette** y vemos qué clusters concentran fraude para obtener *insights*.


In [4]:
# ------------- 5.1 Supervisado: Random Forest Pipeline -------------

from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# --- Identificar columnas ---
numeric_cols = [c for c, t in train_df.dtypes if t in ("double", "float", "int", "bigint", "long") and c != "isFraud"]
categorical_cols = [c for c in train_df.columns if c not in numeric_cols + ["isFraud", "TransactionID"]]

# --- Index + One-Hot para categóricas ---
indexers = [StringIndexer(inputCol=c, outputCol=f"{c}_idx", handleInvalid="keep") for c in categorical_cols]
encoders = [OneHotEncoder(inputCol=f"{c}_idx", outputCol=f"{c}_oh") for c in categorical_cols]

# --- Ensamblador ---
assembler = VectorAssembler(
    inputCols=numeric_cols + [f"{c}_oh" for c in categorical_cols],
    outputCol="features"
)

# --- Clasificador ---
rf = RandomForestClassifier(
    labelCol="isFraud",
    featuresCol="features",
    numTrees=100,
    seed=42
)

pipeline_rf = Pipeline(stages=indexers + encoders + [assembler, rf])

# --- Entrenamiento ---
rf_model = pipeline_rf.fit(train_df)

# --- Predicción y evaluación ---
pred_rf = rf_model.transform(test_df)

evaluator = BinaryClassificationEvaluator(labelCol="isFraud", metricName="areaUnderROC")
auc = evaluator.evaluate(pred_rf)
print(f"\n>>> Random Forest AUC-ROC: {auc:.4f}")

# ------------- 5.2 No supervisado: K-Means -------------

from pyspark.ml.feature import StandardScaler
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator

# --- Solo columnas numéricas (ya definidas arriba) ---
assembler_num = VectorAssembler(inputCols=numeric_cols, outputCol="num_features")
scaler = StandardScaler(inputCol="num_features", outputCol="scaled_features", withMean=True, withStd=True)

kmeans = KMeans(k=4, seed=42, featuresCol="scaled_features", predictionCol="cluster")

pipeline_km = Pipeline(stages=[assembler_num, scaler, kmeans])

km_model = pipeline_km.fit(m_prepared)
km_result = km_model.transform(m_prepared).cache()

# --- Silhouette ---
eval_cluster = ClusteringEvaluator(featuresCol="scaled_features", predictionCol="cluster", metricName="silhouette")
silhouette = eval_cluster.evaluate(km_result)
print(f">>> K-Means Silhouette: {silhouette:.4f}")

# --- Distribución de fraude por cluster ---
print("\nFraude por cluster:")
km_result.groupBy("cluster", "isFraud").count().orderBy("cluster", "isFraud").show()


                                                                                


>>> Random Forest AUC-ROC: 0.8243


25/05/29 03:35:14 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS


>>> K-Means Silhouette: 0.6786

Fraude por cluster:
+-------+-------+-----+
|cluster|isFraud|count|
+-------+-------+-----+
|      0|      0|  578|
|      0|      1|   58|
|      1|      0|    7|
|      2|      0| 5164|
|      2|      1|  125|
|      3|      0|    5|
+-------+-------+-----+



## 6. Resultados finales

| Elemento | Valor obtenido | Observaciones |
|----------|----------------|---------------|
| **Tamaño muestra M** | 5 937 filas (≈ 1 % de las 590 540 transacciones) | Estratificado por `isFraud`×`ProductCD`; 183 fraudes preservados. |
| **Columnas retenidas** | 220 (214 eliminadas por > 50 % nulos) | Sin nulos tras imputación. |
| **Split Train/Test** | 4 755 / 1 182 filas (80 / 20 %) | Proporción de fraude idéntica en ambos conjuntos. |
| **Random Forest – AUC-ROC** | **0.8243** | Buen poder discriminativo para un dataset altamente desbalanceado. |
| **K-Means – Silhouette** | **0.6786** | Clústeres bien definidos (> 0.5). |
| **Concentración de fraude por clúster** | <br>• Cluster 0 → 58 fraudes / 636 trans. (≈ 9.1 %)<br>• Cluster 2 → 125 fraudes / 5 289 trans. (≈ 2.4 %) | El modelo identifica segmentos de alto riesgo (cluster 0). |

### Interpretación
* **Clasificación** – Un AUC ≈ 0.82 indica que el Random Forest distingue eficazmente entre transacciones legítimas y fraudulentas.  
* **Clustering** – Silhouette ≈ 0.68 confirma grupos coherentes; detectar que el *cluster 0* concentra casi **4×** la tasa de fraude del promedio ofrece un insight accionable para reglas de monitoreo.  

