## Dependencias

In [None]:
import findspark
findspark.init()

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml import Pipeline

from glob import glob 
from functools import reduce 

# Create the Spark session for the notebook (getOrCreate reuses an existing one if present).
spark = (
    SparkSession.builder
    .appName("ML Pipeline")
    .getOrCreate()
)

## Lectura de datos 

In [None]:
# Raw CSV header -> short snake_case name used in the rest of the notebook (insertion order is preserved).
column_map = {
    'User': 'id_user',
    'Year': 'year',
    'Month': 'month',
    'Use Chip': 'txn_type',
    'Amount': 'amount',
    'Merchant State': 'state',
    'Is Fraud?': 'is_fraud',
}
cols = list(column_map.keys())
names = list(column_map.values())

# All columns are read as strings (no schema inference); they are cast in the next cell.
# The path assumes the docker-compose kernel is in use, which is why 'work/' must be prepended.
df = (
    spark.read
    .csv('work/data/txn*.csv', inferSchema=False, header=True)
    .select(*cols)
    .toDF(*names)
)
df.printSchema()

### Filtros y cálculos

In [None]:
# Two-letter codes of the 50 US states; rows whose merchant state is not listed are dropped below.
us_states = """
    AL AK AZ AR CA CO CT DE FL GA
    HI ID IL IN IA KS KY LA ME MD
    MA MI MN MS MO MT NE NV NH NJ
    NM NY NC ND OH OK OR PA RI SC
    SD TN TX UT VT VA WA WV WI WY
""".split()

df = (
    df
    .withColumn('month', F.col('month').cast('int'))
    .withColumn('year', F.col('year').cast('int'))
    # Fold year and month into one sortable YYYYMM integer, e.g. 2019 * 100 + 7 = 201907.
    .withColumn('month', F.col('year') * 100 + F.col('month'))
    .drop('year')
    .filter(F.col('state').isin(us_states))
    # Binary label: 1 when the raw flag is 'Yes', 0 otherwise.
    .withColumn('is_fraud', F.when(F.col('is_fraud') == 'Yes', 1).otherwise(0))
    # Strip the '$' sign before casting the amount to double.
    .withColumn('amount', F.regexp_replace('amount', '\\$', '').cast('double'))
)
df.printSchema()

## Ingeniería de características en retrospectiva para detectar patrones anómalos 

In [None]:
# Register the cleaned DataFrame so the Spark SQL feature query can refer to it as `transactions`.
df.createOrReplaceTempView(name='transactions')

In [None]:
# Feature: ratio between the current transaction amount and the user's average
# transaction amount over the 6 previous monthly rows (current month excluded,
# so only past information is used).
#
# NOTE(review): the window uses ROWS, i.e. the 6 previous months in which the
# user had at least one transaction, not 6 calendar months. If a user has gaps
# in activity the window spans more than 6 calendar months — confirm intent.
query = """
WITH userMonthlyAgg AS (
    -- Per-user, per-month totals.
    SELECT
        id_user,
        month,
        SUM(amount) AS monto,
        COUNT(*)    AS num_txn
    FROM transactions
    GROUP BY id_user, month
),
userMonthlyAggWithWindow AS (
    -- Rolling totals over the 6 preceding monthly rows. No ORDER BY here:
    -- sorting a CTE that is joined afterwards only costs a shuffle.
    SELECT
        id_user,
        month,
        SUM(monto) OVER (
            PARTITION BY id_user
            ORDER BY month
            ROWS BETWEEN 6 PRECEDING AND 1 PRECEDING
        ) AS sum_last_6_months,
        SUM(num_txn) OVER (
            PARTITION BY id_user
            ORDER BY month
            ROWS BETWEEN 6 PRECEDING AND 1 PRECEDING
        ) AS num_txn_last_6_months,
        ROW_NUMBER() OVER (
            PARTITION BY id_user
            ORDER BY month
        ) AS row_num
    FROM userMonthlyAgg
)
SELECT
    t.id_user,
    t.month,
    t.amount / (w.sum_last_6_months / w.num_txn_last_6_months) AS c_ratio_amount_vs_avg_last_6_months,
    t.amount   AS c_amount,
    t.state    AS d_state,
    t.txn_type AS d_txn_type,
    t.is_fraud
FROM userMonthlyAggWithWindow AS w
INNER JOIN transactions AS t
    ON  w.id_user = t.id_user
    AND w.month   = t.month
-- Require 6 full previous monthly rows so the rolling average is fully populated.
WHERE w.row_num > 6
  -- A zero historical total would divide by zero (NULL, or an error under ANSI
  -- mode); VectorAssembler rejects NULL features, so such rows are dropped here.
  AND w.sum_last_6_months <> 0
  AND t.amount IS NOT NULL
"""
tad = spark.sql(query)


## Definición de variables 

In [None]:
# Split the feature table's columns by prefix: 'c_' = numeric features, 'd_' = categorical features.
varc, vard = [], []
for col_name in tad.columns:
    if col_name.startswith('c_'):
        varc.append(col_name)
    elif col_name.startswith('d_'):
        vard.append(col_name)
(len(varc), len(vard), varc, vard)

## Partición 

In [None]:
# Random 80/20 train/validation split with a fixed seed.
split_weights = [0.8, 0.2]
train, valid = tad.randomSplit(split_weights, seed=42)

In [None]:
# Peek at the first rows of the training split (feature vectors are built later, in the pipeline).
train.show(n=5)

## Preparar los datos para el modelo

In [None]:
# Categorical ('d_') columns are label-encoded with StringIndexer and then one-hot
# encoded; numeric ('c_') columns go to the VectorAssembler unchanged.
# handleInvalid="keep" maps categories not seen while fitting on `train` (e.g. a
# state that only appears in `valid`) to an extra index instead of raising when
# the fitted pipeline transforms the validation split.
indexers = [
    StringIndexer(inputCol=col_name, outputCol=f"{col_name}_index", handleInvalid="keep")
    for col_name in vard
]

encoders = [
    OneHotEncoder(inputCol=f"{col_name}_index", outputCol=f"{col_name}_encoded")
    for col_name in vard
]

# Numeric features first, then the encoded categorical ones. The lists are derived
# from varc/vard so new c_/d_ columns in the feature query are picked up automatically.
feature_cols = varc + [f"{col_name}_encoded" for col_name in vard]
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")

In [None]:
# Logistic regression that predicts `is_fraud` from the assembled feature vector.
log_reg = LogisticRegression(labelCol="is_fraud", featuresCol="features")


In [None]:
# Indexing -> one-hot encoding -> vector assembly -> classifier, fitted on the training split.
pipeline = Pipeline(stages=[*indexers, *encoders, assembler, log_reg])
model = pipeline.fit(train)

In [None]:
# Score the validation split and show the first rows of predictions next to the true label.
predictions = model.transform(valid)
shown_cols = ["id_user", "month", "features", "is_fraud", "prediction", "probability"]
predictions.select(*shown_cols).show()


In [None]:
# ROC-AUC needs a continuous score to rank rows; the thresholded 0/1 `prediction`
# column collapses the ROC curve to a single operating point. LogisticRegression
# writes its raw margins to `rawPrediction`, which the evaluator ranks by the
# positive-class component.
evaluator = BinaryClassificationEvaluator(
    labelCol="is_fraud",
    rawPredictionCol="rawPrediction",
    metricName="areaUnderROC",
)
roc_auc = evaluator.evaluate(predictions)
print(f"ROC-AUC Score: {roc_auc:.4f}")