# Pipeline para Inferencia Batch en Detección de Fraude

## **Descripción**
Este pipeline está diseñado para aplicar un modelo preentrenado de detección de fraudes sobre un conjunto de datos transaccionales. Aprovecha PySpark y MLlib para cargar el modelo, generar predicciones y procesar los resultados de manera eficiente. El flujo se ejecuta en **Databricks Community Edition**, aprovechando su entorno escalable.

---

## **Objetivo**
Aplicar un modelo predictivo para clasificar transacciones de tarjetas de crédito como fraudulentas o no fraudulentas, priorizando aquellas de alto riesgo basándose en las probabilidades calculadas.

---

## **Configuraciones Requeridas**
### **Rutas de Entrada y Salida**
- **Datos de entrada:** Tabla Delta con características procesadas en `/FileStore/tables/output_delta_table_datapipe_feature_eng_to_inf`.
- **Modelo preentrenado:** Ruta del modelo en `/dbfs/tmp/fraud_detection_cv_model`.
- **Resultados de inferencia:** CSV con transacciones clasificadas en `/FileStore/tables/scored_transactions_results_batch`.
- **Etiquetas verdaderas procesadas:** CSV con etiquetas convertidas en `/FileStore/tables/etiquetas_verdaderas_processed`.

---

## **Notas**
- Este pipeline está optimizado para manejar grandes volúmenes de datos mediante procesamiento distribuido.
- Las rutas configuradas deben ajustarse según el entorno en que se ejecute.
- Los resultados permiten priorizar transacciones para investigaciones adicionales y detección temprana de fraudes.


In [0]:
!pip install mlflow
!pip install pytest

You should consider upgrading via the '/local_disk0/.ephemeral_nfs/envs/pythonEnv-869b5e00-d5b8-446b-936e-d2e934fbed84/bin/python -m pip install --upgrade pip' command.[0m
You should consider upgrading via the '/local_disk0/.ephemeral_nfs/envs/pythonEnv-869b5e00-d5b8-446b-936e-d2e934fbed84/bin/python -m pip install --upgrade pip' command.[0m


# Just for explore

In [0]:
from pyspark.sql import SparkSession

# Crear una sesión de Spark
spark = SparkSession.builder \
    .appName("Read Delta Table") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()

# Ruta del archivo Delta
delta_path = "/FileStore/tables/output_delta_table_datapipe_feature_eng_to_inf"

# Leer el archivo Delta en un DataFrame
df = spark.read.format("delta").load(delta_path)

# Mostrar las primeras filas del DataFrame
df.show(20, truncate=False)

+--------------+-------------+-----------------------+------------------+-----------------+----------------+------------+--------+-----------------+--------------------+------------------+-----------------------+-----------+-----------+
|transaction_id|customer_id  |timestamp              |amount            |merchant_category|merchant_country|card_present|is_fraud|timestamp_seconds|transaction_velocity|amount_velocity   |merchant_category_count|hour_of_day|day_of_week|
+--------------+-------------+-----------------------+------------------+-----------------+----------------+------------+--------+-----------------+--------------------+------------------+-----------------------+-----------+-----------+
|977424602cfd  |CUST_00000001|2024-08-19 09:48:07.744|193.68567404382148|entertainment    |US              |false       |false   |1724060887       |1                   |193.68567404382148|1                      |9          |2          |
|14538cde185f  |CUST_00000001|2024-10-03 07:05:48.7 

In [0]:
df.describe().show()

+-------+--------------+-------------+------------------+-----------------+----------------+-------------------+--------------------+------------------+-----------------------+------------------+------------------+
|summary|transaction_id|  customer_id|            amount|merchant_category|merchant_country|  timestamp_seconds|transaction_velocity|   amount_velocity|merchant_category_count|       hour_of_day|       day_of_week|
+-------+--------------+-------------+------------------+-----------------+----------------+-------------------+--------------------+------------------+-----------------------+------------------+------------------+
|  count|        199782|       199782|            199782|           199782|          199782|             199782|              199782|            199782|                 199782|            199782|            199782|
|   mean|      Infinity|         null|122.62476445443475|             null|            null|1.728391050563289E9|  1.1507743440349982|140.870

In [0]:
df.printSchema

Out[79]: <bound method DataFrame.printSchema of DataFrame[transaction_id: string, customer_id: string, timestamp: timestamp, amount: double, merchant_category: string, merchant_country: string, card_present: boolean, is_fraud: boolean, timestamp_seconds: bigint, transaction_velocity: bigint, amount_velocity: double, merchant_category_count: bigint, hour_of_day: int, day_of_week: int]>

# 1. Configuración Inicial: Logging y Sesión de Spark

In [0]:
import logging
from pyspark.sql import SparkSession

# Configuración del logger
def setup_logging():
    logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
    return logging.getLogger(__name__)

logger = setup_logging()

# Crear una sesión de Spark
def create_spark_session(app_name="Batch Inference Pipeline"):
    try:
        spark = SparkSession.builder \
            .appName(app_name) \
            .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
            .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
            .getOrCreate()
        logger.info("Spark session created successfully.")
        return spark
    except Exception as e:
        logger.error("Failed to create Spark session: %s", str(e))
        raise


## Load Model

### **Propósito:**
Cargar un modelo de regresión logística previamente entrenado desde una ruta especificada.

### **Proceso:**
1. Se utiliza la función `LogisticRegressionModel.load()` para cargar el modelo desde la ubicación proporcionada.
2. Se verifica que la carga sea exitosa y se registra en el logger.

### **Output:**
Un objeto `LogisticRegressionModel` listo para su uso en tareas de inferencia.

In [0]:
from pyspark.ml.classification import LogisticRegressionModel

# Cargar el modelo entrenado
def load_model(model_path):
    try:
        logger.info("Loading model from: %s", model_path)
        model = LogisticRegressionModel.load(model_path)
        logger.info("Model loaded successfully.")
        return model
    except Exception as e:
        logger.error("Failed to load model: %s", str(e))
        raise


## Load Data

### **Propósito:**
Cargar datos desde un archivo en formato Delta para realizar tareas de inferencia.

### **Proceso:**
1. Se utiliza el método `spark.read.format("delta").load()` para leer los datos desde la ruta especificada.
2. Se registra en el logger la cantidad de registros cargados para verificar la operación.

### **Output:**
Un DataFrame de PySpark con los datos cargados desde el archivo Delta, listo para procesar.


In [0]:
# Cargar nuevos datos para inferencia
def load_data(spark, data_path):
    try:
        logger.info("Loading data from: %s", data_path)
        df = spark.read.format("delta").load(data_path)
        logger.info("Data loaded successfully. Record count: %d", df.count())
        return df
    except Exception as e:
        logger.error("Failed to load data: %s", str(e))
        raise


## Create Features Column

### **Propósito:**
Generar una columna llamada `features` en el DataFrame que combine las columnas seleccionadas para ser utilizadas como entrada en un modelo de machine learning.

### **Proceso:**
1. Se inicializa un `VectorAssembler` con las columnas de entrada especificadas en `feature_columns`.
2. Se aplica el ensamblador al DataFrame, generando una nueva columna llamada `features` (o el nombre especificado en `output_column`).
3. Se registra en el logger la creación exitosa de la columna.

### **Output:**
Un DataFrame de PySpark que incluye la columna adicional `features`, lista para ser utilizada en el entrenamiento o inferencia del modelo.


In [0]:
from pyspark.ml.feature import VectorAssembler

# Crear la columna 'features' en el dataset
def create_features_column(df, feature_columns, output_column="features"):
    try:
        logger.info("Creating features column...")
        assembler = VectorAssembler(inputCols=feature_columns, outputCol=output_column)
        df = assembler.transform(df)
        logger.info("Features column created successfully.")
        return df
    except Exception as e:
        logger.error("Error creating features column: %s", str(e))
        raise

## Run Batch Inference

### **Propósito:**
Realizar inferencia en lote utilizando un modelo previamente entrenado para generar predicciones sobre un conjunto de datos de entrada.

### **Proceso:**
1. Se aplica el modelo a los datos de entrada para generar predicciones.
2. Se seleccionan columnas clave del DataFrame de predicciones:
   - **`transaction_id`**: Identificador único de la transacción.
   - **`customer_id`**: Identificador del cliente asociado a la transacción.
   - **`features`**: Vector de características utilizado como entrada para el modelo.
   - **`probability`**: Vector que contiene las probabilidades de pertenencia a cada clase.
   - **`prediction`**: Clase predicha por el modelo (fraude o no fraude).

### **Output:**
Un DataFrame de PySpark que contiene:
- **`transaction_id`**
- **`customer_id`**
- **`features`**
- **`probability`**
- **`prediction`**


In [0]:
# Realizar inferencia batch
def run_batch_inference(model, df):
    try:
        logger.info("Running batch inference...")
        predictions = model.transform(df)
        logger.info("Inference completed successfully.")
        return predictions.select(
            "transaction_id", "customer_id", "features", "probability", "prediction"
        )
    except Exception as e:
        logger.error("Error during batch inference: %s", str(e))
        raise



# 7. Pipeline Principal

Este es el flujo principal que llama a todas las funciones anteriores.

In [0]:
def main_pipeline():
    try:
        logger.info("Starting Batch Inference Pipeline...")

        # Configuraciones
        data_path = "/FileStore/tables/output_delta_table_datapipe_feature_eng_to_inf"
        model_path = "/dbfs/tmp/fraud_detection_cv_model"

        # Crear sesión de Spark
        spark = create_spark_session()

        # Cargar datos y modelo
        df = load_data(spark, data_path)
        model = load_model(model_path)

        # Crear la columna 'features' si es necesario
        feature_columns = [
            "transaction_velocity",
            "amount_velocity",
            "merchant_category_count",
            "hour_of_day",
            "day_of_week"
        ]
        df = create_features_column(df, feature_columns)

        # Realizar inferencia
        predictions = run_batch_inference(model, df)

        # Mostrar las primeras filas de las predicciones
        predictions.show(truncate=False)

        logger.info("Batch Inference Pipeline completed successfully.")
    except Exception as e:
        logger.error("Pipeline execution failed: %s", str(e))
        raise

if __name__ == "__main__":
    main_pipeline()

+--------------+-------------+-------------------------------------+----------------------------------------+----------+
|transaction_id|customer_id  |features                             |probability                             |prediction|
+--------------+-------------+-------------------------------------+----------------------------------------+----------+
|1043e1069d4c  |CUST_00000006|[1.0,30.36820419587061,1.0,14.0,5.0] |[0.22077749430310337,0.7792225056968967]|1.0       |
|2b77eb73c4c9  |CUST_00000006|[1.0,216.39150504865475,1.0,8.0,7.0] |[0.19487119565425035,0.8051288043457496]|1.0       |
|a3c86b26cd1d  |CUST_00000006|[1.0,171.68692778967915,1.0,21.0,1.0]|[0.18726024566166236,0.8127397543383377]|1.0       |
|5d2e2deaece5  |CUST_00000008|[1.0,123.98424772797799,1.0,17.0,4.0]|[0.2015828235604309,0.7984171764395691] |1.0       |
|ea8d101644b5  |CUST_00000009|[1.0,98.0001131822433,2.0,22.0,6.0]  |[0.48606146537566997,0.51393853462433]  |1.0       |
|0ccebeb21ab3  |CUST_00000009|[2

# Guardar los resultados en un csv

A consecuencia del los tipo de datos complejos, opté por pasarlo a un df de pandas, darle el formato apropiado para el analisis y luego guardarlo como un csv con pyspark 

In [0]:
# Contar los valores 0 y 1 en la columna 'prediction'
#predictions.groupBy("prediction").count().show()

In [0]:
logger.info("Starting Batch Inference Pipeline...")

# Configuraciones
data_path = "/FileStore/tables/output_delta_table_datapipe_feature_eng_to_inf"
model_path = "/dbfs/tmp/fraud_detection_cv_model"

# Crear sesión de Spark
spark = create_spark_session()

# Cargar datos y modelo
df = load_data(spark, data_path)
model = load_model(model_path)

# Crear la columna 'features' si es necesario
feature_columns = [
    "transaction_velocity",
    "amount_velocity",
    "merchant_category_count",
    "hour_of_day",
    "day_of_week"
]
df = create_features_column(df, feature_columns)

# Realizar inferencia
predictions = run_batch_inference(model, df)

# Mostrar las primeras filas de las predicciones
predictions.show(truncate=False)

+--------------+-------------+-------------------------------------+----------------------------------------+----------+
|transaction_id|customer_id  |features                             |probability                             |prediction|
+--------------+-------------+-------------------------------------+----------------------------------------+----------+
|1043e1069d4c  |CUST_00000006|[1.0,30.36820419587061,1.0,14.0,5.0] |[0.22077749430310337,0.7792225056968967]|1.0       |
|2b77eb73c4c9  |CUST_00000006|[1.0,216.39150504865475,1.0,8.0,7.0] |[0.19487119565425035,0.8051288043457496]|1.0       |
|a3c86b26cd1d  |CUST_00000006|[1.0,171.68692778967915,1.0,21.0,1.0]|[0.18726024566166236,0.8127397543383377]|1.0       |
|5d2e2deaece5  |CUST_00000008|[1.0,123.98424772797799,1.0,17.0,4.0]|[0.2015828235604309,0.7984171764395691] |1.0       |
|ea8d101644b5  |CUST_00000009|[1.0,98.0001131822433,2.0,22.0,6.0]  |[0.48606146537566997,0.51393853462433]  |1.0       |
|0ccebeb21ab3  |CUST_00000009|[2

In [0]:
predictions.printSchema

Out[88]: <bound method DataFrame.printSchema of DataFrame[transaction_id: string, customer_id: string, features: vector, probability: vector, prediction: double]>

In [0]:
# Convertir el DataFrame PySpark a Pandas
predictions = predictions.toPandas()

  Unable to convert the field features. If this column is not necessary, you may consider dropping it or converting to primitive type before the conversion.
Direct cause: Unsupported type in conversion to Arrow: VectorUDT()
Attempting non-optimization as 'spark.sql.execution.arrow.pyspark.fallback.enabled' is set to true.
  warn(msg)


In [0]:
# Separar probabilidades en columnas individuales
predictions['prob_0'] = predictions['probability'].apply(lambda x: x[0])  # Probabilidad de no fraude
predictions['prob_1'] = predictions['probability'].apply(lambda x: x[1])  # Probabilidad de fraude

# Seleccionar las columnas relevantes para el análisis
scored_transactions = predictions[['transaction_id', 'customer_id', 'prob_0', 'prob_1', 'prediction']]

# Ordenar por probabilidad de fraude (para priorizar transacciones de alto riesgo)
#scored_transactions = scored_transactions.sort_values(by='prob_1', ascending=False)

# Mostrar las primeras filas del DataFrame procesado
print(scored_transactions.head())

  transaction_id    customer_id    prob_0    prob_1  prediction
0   1043e1069d4c  CUST_00000006  0.220777  0.779223         1.0
1   2b77eb73c4c9  CUST_00000006  0.194871  0.805129         1.0
2   a3c86b26cd1d  CUST_00000006  0.187260  0.812740         1.0
3   5d2e2deaece5  CUST_00000008  0.201583  0.798417         1.0
4   ea8d101644b5  CUST_00000009  0.486061  0.513939         1.0


In [0]:
# Ver el esquema del DataFrame en Pandas
scored_transactions.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 199782 entries, 0 to 199781
Data columns (total 5 columns):
 #   Column          Non-Null Count   Dtype  
---  ------          --------------   -----  
 0   transaction_id  199782 non-null  object 
 1   customer_id     199782 non-null  object 
 2   prob_0          199782 non-null  float64
 3   prob_1          199782 non-null  float64
 4   prediction      199782 non-null  float64
dtypes: float64(3), object(2)
memory usage: 7.6+ MB


In [0]:
# 1. Convertir el DataFrame de Pandas a PySpark
scored_transactions_spark = spark.createDataFrame(scored_transactions)

# 2. Guardar el DataFrame en formato CSV en la ruta de Databricks
output_path = "/FileStore/tables/scored_transactions_results_batch"

scored_transactions_spark.write \
    .format("csv") \
    .mode("overwrite") \
    .option("header", "true") \
    .save(output_path)

print(f"Scored transactions saved successfully to {output_path}")

Scored transactions saved successfully to /FileStore/tables/scored_transactions_results_batch


# Leer, formatear y guardas las etiquetas de los verdaderos positivos

In [0]:
# Ruta del archivo en formato Delta
file_path = "/FileStore/tables/output_delta_table_datapipe_feature_eng_to_inf"

# Leer el archivo en formato Delta
ground_truth = spark.read.format("delta").load(file_path)

# Seleccionar las columnas 'transaction_id' y 'is_fraud'
selected_columns = ground_truth.select("transaction_id", "is_fraud")

# Mostrar las primeras 10 filas
selected_columns.show(10, truncate=False)


+--------------+--------+
|transaction_id|is_fraud|
+--------------+--------+
|977424602cfd  |false   |
|14538cde185f  |false   |
|3ff032257517  |false   |
|78834b5e5ad9  |false   |
|4094d070173f  |false   |
|8f6180810166  |false   |
|e14bb5d34230  |false   |
|89809f754c84  |false   |
|180594f7ec4e  |false   |
|af9cb8cdd1c0  |false   |
+--------------+--------+
only showing top 10 rows



In [0]:
from pyspark.sql.functions import when, col

# Convertir 'is_fraud' de true/false a 1.0/0.0
ground_truth = ground_truth.withColumn(
    "is_fraud",
    when(col("is_fraud") == "true", 1.0).when(col("is_fraud") == "false", 0.0).otherwise(None)
)

# Mostrar las primeras filas para verificar la conversión
ground_truth.select("transaction_id", "is_fraud").show(10, truncate=False)

+--------------+--------+
|transaction_id|is_fraud|
+--------------+--------+
|977424602cfd  |0.0     |
|14538cde185f  |0.0     |
|3ff032257517  |0.0     |
|78834b5e5ad9  |0.0     |
|4094d070173f  |0.0     |
|8f6180810166  |0.0     |
|e14bb5d34230  |0.0     |
|89809f754c84  |0.0     |
|180594f7ec4e  |0.0     |
|af9cb8cdd1c0  |0.0     |
+--------------+--------+
only showing top 10 rows



In [0]:
# Guardar el DataFrame procesado en un archivo CSV
output_path = "/FileStore/tables/etiquetas_verdaderas_processed"

# Guardar el DataFrame con las etiquetas convertidas
ground_truth.select("transaction_id", "is_fraud") \
    .write \
    .format("csv") \
    .mode("overwrite") \
    .option("header", "true") \
    .save(output_path)

print(f"Processed ground truth saved successfully to {output_path}")

Processed ground truth saved successfully to /FileStore/tables/etiquetas_verdaderas_processed


## Descripción de las Pruebas

### **`test_load_model`**
- **Propósito:**  
  Valida que la función puede cargar correctamente un modelo guardado en la ruta especificada.

---

### **`test_load_data`**
- **Propósito:**  
  Comprueba que la función puede cargar datos desde una tabla Delta.
- **Validaciones:**  
  - El DataFrame devuelto tiene el esquema esperado.  
  - El número de registros coincide con el esperado.

---

### **`test_create_features_column`**
- **Propósito:**  
  Asegura que la columna `features` se genere correctamente a partir de las columnas de entrada.

---

### **`test_run_batch_inference`**
- **Propósito:**  
  Valida que la función puede ejecutar inferencias batch con un modelo de regresión logística.
- **Validaciones:**  
  - El DataFrame resultante contiene las columnas `prediction` y `probability`.


**Nota:** Estas pruebas unitarias, debido a limitaciones de tiempo y a la imposibilidad de ejecutarlas directamente en el mismo notebook, se dejaron de forma hipotética (no las ejecuté como tal, pero aún asi las hice por el requerimiento). Según la metodología de pytest, estas pruebas se ejecutan en archivos `.py`. Por esta razón, decidí priorizar otros pasos del proceso.

In [0]:
import pytest

@pytest.fixture(scope="session")
def spark():
    return SparkSession.builder \
        .appName("UnitTest") \
        .master("local[*]") \
        .getOrCreate()

@pytest.fixture
def sample_data(spark):
    schema = StructType([
        StructField("transaction_id", StringType(), True),
        StructField("customer_id", StringType(), True),
        StructField("features", StringType(), True),
        StructField("is_fraud", DoubleType(), True)
    ])
    data = [
        ("T1", "C1", "[1.0, 2.0, 3.0]", 1.0),
        ("T2", "C2", "[2.0, 3.0, 4.0]", 0.0),
        ("T3", "C3", "[3.0, 4.0, 5.0]", 1.0),
    ]
    return spark.createDataFrame(data, schema)

def test_load_model(tmp_path):
    model_path = tmp_path / "model"
    model = LogisticRegressionModel(uid="logistic_regression")
    model.write().overwrite().save(str(model_path))
    loaded_model = load_model(str(model_path))
    assert isinstance(loaded_model, LogisticRegressionModel)

def test_load_data(spark, tmp_path):
    data_path = tmp_path / "data"
    data = [(1, "C1"), (2, "C2")]
    schema = StructType([
        StructField("transaction_id", StringType(), True),
        StructField("customer_id", StringType(), True)
    ])
    df = spark.createDataFrame(data, schema)
    df.write.format("delta").save(str(data_path))
    loaded_df = load_data(spark, str(data_path))
    assert isinstance(loaded_df, DataFrame)
    assert loaded_df.count() == 2

def test_create_features_column(sample_data):
    feature_columns = ["transaction_id", "customer_id"]
    df = create_features_column(sample_data, feature_columns)
    assert "features" in df.columns

def test_run_batch_inference(spark, sample_data):
    model = LogisticRegressionModel(uid="logistic_regression")
    model.setFeaturesCol("features").setLabelCol("is_fraud")
    model.fit(sample_data)
    predictions = run_batch_inference(model, sample_data)
    assert "prediction" in predictions.columns
    assert "probability" in predictions.columns
    assert predictions.count() == sample_data.count()
