# Análisis de `payments_gold` (bigdata_db)

Este notebook:

- Se conecta a **MariaDB** y lee datos desde `bigdata_db.payments_gold`.
- Hace **EDA** (calidad, distribuciones, series temporales, cortes por categorías).
- Entrena y evalúa **modelos baseline** (scikit-learn) para predecir `is_fraud`.
- Genera **insights accionables**.

> Recomendación: **no hardcodear credenciales**. Este notebook usa variables de entorno o pide el password por prompt.


In [None]:
# Si ejecutás esto en un entorno limpio, instalá dependencias (descomentá):
!pip install -U pandas numpy matplotlib seaborn sqlalchemy pymysql scikit-learn scipy

import os
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns

from sqlalchemy import create_engine, text

from sklearn.model_selection import train_test_split
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder
from sklearn.impute import SimpleImputer
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier
from sklearn.dummy import DummyClassifier
from sklearn.metrics import (
    roc_auc_score, average_precision_score,
    classification_report, confusion_matrix,
    RocCurveDisplay, PrecisionRecallDisplay
)

sns.set_theme(style='whitegrid')
plt.rcParams['figure.figsize'] = (10, 5)


## 1) Conexión a MariaDB

Define estas variables de entorno (recomendado):

- `MARIADB_HOST` (ej: `172.28.0.10`)
- `MARIADB_PORT` (ej: `3306`)
- `MARIADB_DB` (ej: `bigdata_db`)
- `MARIADB_USER` (ej: `bigdata_user`)
- `MARIADB_PASSWORD`

Si `MARIADB_PASSWORD` no está seteada, el notebook la pide por prompt.


In [None]:
from getpass import getpass

MARIADB_HOST = os.getenv('MARIADB_HOST', '172.28.0.10')
MARIADB_PORT = os.getenv('MARIADB_PORT', '3306')
MARIADB_DB   = os.getenv('MARIADB_DB', 'bigdata_db')
MARIADB_USER = os.getenv('MARIADB_USER', 'bigdata_user')
MARIADB_PASSWORD = os.getenv('MARIADB_PASSWORD')

if not MARIADB_PASSWORD:
    MARIADB_PASSWORD = getpass('MariaDB password: ')

# MariaDB suele funcionar perfecto con dialecto mysql + driver PyMySQL desde Python
engine = create_engine(
    f"mysql+pymysql://{MARIADB_USER}:{MARIADB_PASSWORD}@{MARIADB_HOST}:{MARIADB_PORT}/{MARIADB_DB}",
    pool_pre_ping=True,
)

with engine.connect() as conn:
    version = conn.execute(text('SELECT VERSION()')).scalar()

print('✅ Conectado. VERSION():', version)


## 2) Cargar datos desde `payments_gold`

Tips:
- Si la tabla es grande, empezá con un `LIMIT`.
- Para modelos/EDA robustos, podés traer una ventana temporal (ej. últimos N días).


In [None]:
TABLE = 'payments_gold'

# Ajustá esto según tamaño
LIMIT = int(os.getenv('PAYMENTS_GOLD_LIMIT', '200000'))  # 200k por defecto

with engine.connect() as conn:
    total_rows = conn.execute(text(f"SELECT COUNT(*) FROM {TABLE}")).scalar()

print('Total filas en la tabla:', total_rows)

query = f"SELECT * FROM {TABLE} ORDER BY event_ts DESC LIMIT {LIMIT}"

df = pd.read_sql(query, engine)
print('Filas cargadas:', len(df))
print('Columnas:', list(df.columns))

df.head(3)


## 3) Normalización rápida de tipos

Esperado (por tu pipeline Silver→Gold):
- `event_ts`: timestamp
- `amount`: numérico
- `is_fraud`: booleano (a veces viene como 0/1)


In [None]:
# event_ts
if 'event_ts' in df.columns:
    df['event_ts'] = pd.to_datetime(df['event_ts'], errors='coerce', utc=True)

# amount
if 'amount' in df.columns:
    df['amount'] = pd.to_numeric(df['amount'], errors='coerce')

# is_fraud
if 'is_fraud' in df.columns:
    # Convertir bytes a int si es necesario
    if df['is_fraud'].dtype == object and all(isinstance(x, bytes) for x in df['is_fraud']):
        df['is_fraud'] = df['is_fraud'].apply(lambda x: int.from_bytes(x, 'big'))
    
    # Convertir a booleano
    df['is_fraud'] = df['is_fraud'].astype('float').fillna(0).astype(int).astype(bool)

# tx_count_window
if 'tx_count_window' in df.columns:
    df['tx_count_window'] = pd.to_numeric(df['tx_count_window'], errors='coerce')

# risk_score
if 'risk_score' in df.columns:
    df['risk_score'] = pd.to_numeric(df['risk_score'], errors='coerce')

(df.dtypes)


## 4) Calidad de datos (missing, duplicados, rangos)

In [None]:
# Missingness
missing = (df.isna().mean().sort_values(ascending=False) * 100).to_frame('missing_%')
missing.head(20)


In [None]:
# Duplicados por event_id (si existe)
if 'event_id' in df.columns:
    dup = df['event_id'].duplicated().mean() * 100
    print(f"Duplicados event_id: {dup:.3f}%")
else:
    print('No existe event_id en el dataset cargado')


In [None]:
# Rangos básicos
cols_numeric = [c for c in ['amount','tx_count_window','risk_score'] if c in df.columns]
if cols_numeric:
    display(df[cols_numeric].describe(percentiles=[.01,.05,.5,.95,.99]).T)


## 5) Distribuciones principales

In [None]:
if 'amount' in df.columns:
    sns.histplot(df['amount'].dropna(), bins=50)
    plt.title('Distribución de amount')
    plt.show()

if 'tx_count_window' in df.columns:
    sns.histplot(df['tx_count_window'].dropna(), bins=50)
    plt.title('Distribución de tx_count_window')
    plt.show()


## 6) Análisis temporal (volumen + tasa de fraude)

In [None]:
if 'event_ts' in df.columns:
    df_time = df.dropna(subset=['event_ts']).copy()
    df_time['date'] = df_time['event_ts'].dt.floor('D')

    daily = df_time.groupby('date').agg(
        n=('event_ts','size'),
        fraud_rate=('is_fraud', 'mean') if 'is_fraud' in df_time.columns else ('event_ts','size')
    ).reset_index()

    ax = daily.plot(x='date', y='n', kind='line', title='Volumen diario')
    ax.set_xlabel('date'); ax.set_ylabel('n')
    plt.show()

    if 'is_fraud' in df_time.columns:
        ax = daily.plot(x='date', y='fraud_rate', kind='line', title='Tasa de fraude diaria')
        ax.set_xlabel('date'); ax.set_ylabel('fraud_rate')
        plt.show()


## 7) Cortes por categorías (top N)

In [None]:
def top_rate(df_in, group_col, target_col='is_fraud', min_count=50, top_n=20):
    if group_col not in df_in.columns or target_col not in df_in.columns:
        return None
    g = df_in.groupby(group_col).agg(
        n=(target_col, 'size'),
        fraud_rate=(target_col, 'mean'),
        amount_mean=('amount','mean') if 'amount' in df_in.columns else (target_col, 'size')
    ).reset_index()
    g = g[g['n'] >= min_count].sort_values('fraud_rate', ascending=False).head(top_n)
    return g

for col in ['merchant_id','payment_method','currency','device_id','ip_address','fraud_reason']:
    if col in df.columns and 'is_fraud' in df.columns:
        t = top_rate(df, col, min_count=100, top_n=15)
        if t is not None and len(t):
            display(t)


## 8) Modelo: baselines para predecir `is_fraud`

Este bloque:
- arma features numéricas + categóricas
- hace split temporal si existe `event_ts`, para evitar leakage
- entrena 3 modelos: `Dummy`, `LogisticRegression`, `RandomForest`
- reporta ROC-AUC y PR-AUC (más útil en clases desbalanceadas)


In [None]:
# Elegimos target
if 'is_fraud' not in df.columns:
    raise RuntimeError('La tabla no tiene columna is_fraud. No puedo entrenar un modelo supervisado.')

# Features candidatas (ajustá según tu tabla real)
feature_candidates = [
    'amount', 'tx_count_window', 'risk_score', 'hour', 'dow',
    'merchant_id', 'payment_method', 'currency', 'device_id', 'ip_address', 'status', 'user_id'
]
features = [c for c in feature_candidates if c in df.columns]
print('Features usadas:', features)

data = df.dropna(subset=['is_fraud']).copy()

# si no existen hour/dow, los calculamos desde event_ts
if 'event_ts' in data.columns:
    if 'hour' not in data.columns:
        data['hour'] = pd.to_datetime(data['event_ts'], utc=True, errors='coerce').dt.hour
    if 'dow' not in data.columns:
        data['dow'] = pd.to_datetime(data['event_ts'], utc=True, errors='coerce').dt.dayofweek

X = data[features]
y = data['is_fraud'].astype(int)

# Split temporal (recomendado) si hay timestamp
if 'event_ts' in data.columns:
    order = data['event_ts'].astype('int64')
    idx = np.argsort(order.values)
    cutoff = int(0.8 * len(idx))
    train_idx, test_idx = idx[:cutoff], idx[cutoff:]
    X_train, X_test = X.iloc[train_idx], X.iloc[test_idx]
    y_train, y_test = y.iloc[train_idx], y.iloc[test_idx]
    print('Split temporal: train=', len(train_idx), 'test=', len(test_idx))
else:
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42, stratify=y)
    print('Split random: train=', len(X_train), 'test=', len(X_test))

num_cols = [c for c in X_train.columns if X_train[c].dtype != 'object']
cat_cols = [c for c in X_train.columns if X_train[c].dtype == 'object']
print('Num:', num_cols)
print('Cat:', cat_cols)

from sklearn.preprocessing import StandardScaler  # Importar StandardScaler
preprocess = ColumnTransformer(
    transformers=[
        ('num', Pipeline(steps=[
            ('imputer', SimpleImputer(strategy='median')),
            ('scaler', StandardScaler()),  # Escalar las características numéricas
        ]), num_cols),
        ('cat', Pipeline(steps=[
            ('imputer', SimpleImputer(strategy='most_frequent')),
            ('ohe', OneHotEncoder(handle_unknown='ignore')),
        ]), cat_cols),
    ],
    remainder='drop'
)

models = {
    'dummy_most_frequent': DummyClassifier(strategy='most_frequent'),
    'logreg': LogisticRegression(max_iter=1000, class_weight='balanced', n_jobs=None, solver='liblinear'),  # Aumentar max_iter y cambiar solver
    'rf': RandomForestClassifier(
        n_estimators=300,
        max_depth=None,
        min_samples_leaf=5,
        class_weight='balanced_subsample',
        random_state=42,
        n_jobs=-1,
    )
}

results = []
for name, clf in models.items():
    pipe = Pipeline(steps=[('prep', preprocess), ('clf', clf)])
    pipe.fit(X_train, y_train)

    # scores/probas
    if hasattr(pipe.named_steps['clf'], 'predict_proba'):
        proba = pipe.predict_proba(X_test)[:, 1]
    else:
        # fallback (dummy)
        proba = pipe.predict_proba(X_test)[:, 1]

    roc = roc_auc_score(y_test, proba)
    ap = average_precision_score(y_test, proba)
    results.append((name, roc, ap))

    print('===', name, '===')
    print('ROC-AUC:', roc)
    print('PR-AUC :', ap)

    RocCurveDisplay.from_predictions(y_test, proba)
    plt.title(f'ROC - {name}')
    plt.show()

    PrecisionRecallDisplay.from_predictions(y_test, proba)
    plt.title(f'PR - {name}')
    plt.show()

results_df = pd.DataFrame(results, columns=['model', 'roc_auc', 'pr_auc']).sort_values('pr_auc', ascending=False)
results_df


## 9) Insights rápidos (operativos)

- Top merchants por tasa de fraude (con mínimo de eventos)
- IPs/Devices con mayor tasa
- Relación entre `tx_count_window` y fraude


In [None]:
if 'is_fraud' in df.columns:
    def show_top(col, min_count=10, top_n=5):
        if col not in df.columns:
            return
        t = (
            df.groupby(col)
              .agg(n=('is_fraud', 'size'), fraud_rate=('is_fraud', 'mean'))
              .reset_index()
        )
        t = t[t['n'] >= min_count].sort_values('fraud_rate', ascending=False).head(top_n)
        if not t.empty:
            display(t)
        else:
            print(f"No hay suficientes datos para {col} con min_count={min_count}")

    print('Top user_id por tasa de fraude:')
    show_top('user_id', min_count=10, top_n=5)

    print('Top fraud_reason por tasa de fraude:')
    show_top('fraud_reason', min_count=10, top_n=5)

if 'tx_count_window' in df.columns and 'is_fraud' in df.columns:
    tmp = df[['tx_count_window', 'is_fraud']].dropna().copy()
    if not tmp.empty:
        tmp['tx_count_window'] = tmp['tx_count_window'].astype(int)
        g = tmp.groupby('tx_count_window').agg(n=('is_fraud', 'size'), fraud_rate=('is_fraud', 'mean')).reset_index()
        g = g[g['n'] >= 2]
        if not g.empty:
            sns.lineplot(data=g, x='tx_count_window', y='fraud_rate')
            plt.title('Fraud rate vs tx_count_window (bins exactos)')
            plt.show()
        else:
            print("No hay suficientes datos para generar la gráfica de fraud rate vs tx_count_window.")
    else:
        print("No hay suficientes datos en 'tx_count_window' o 'is_fraud'.")
