In [1]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns

In [2]:
# Carga de datos
df_cc = pd.read_csv('credit_card_transactions-ibm_v2.csv')
df_users = pd.read_csv('sd254_users.csv')
# df_cards = pd.read_csv('sd254_cards.csv')
# df_cc = pd.read_csv("User0_credit_card_transactions.csv")
df_users["User"] = range(0, len(df_users)) # Crear columna User en df_users para index

In [3]:
#Limpieza de datos de Transacciones
df_cc['Amount'] = df_cc['Amount'].str.replace('$', '').astype(float) #dolares a float
df_cc['Hour'] = df_cc['Time'].str.split(':').str[0].astype(int)   #hora:min a hora int
df_cc.drop(columns=['Time'], inplace=True)
df_cc['Is Fraud?'] = df_cc["Is Fraud?"].map({"No": False, "Yes": True})

chip_mapping = {
    "Swipe Transaction": 0,
    "Chip Transaction": 1,
    "Online Transaction": 2
}

df_cc["Use Chip"] =  df_cc['Use Chip'].replace(chip_mapping).astype(int) # chip como int
df_users['Per Capita Income - Zipcode'] = df_users['Per Capita Income - Zipcode'].str.replace('$', '').astype(int)
df_users['Yearly Income - Person'] = df_users['Yearly Income - Person'].str.replace('$', '').astype(int)
df_users['Total Debt'] = df_users['Total Debt'].str.replace('$', '').astype(float)

df_gigante = df_cc.merge(df_users, how="inner", on="User")   #merge de df_cc y df_users

  df_cc["Use Chip"] =  df_cc['Use Chip'].replace(chip_mapping).astype(int) # chip como int


In [4]:
# elimino datos redundantes/innecesatios
X = df_gigante.drop(columns=['Is Fraud?', 'User', 'Address', 'Latitude', 'Longitude', 'Errors?', 'Person', 'Apartment', 'City', 'State', 'Merchant City', 'Merchant State', 'Year', 'Day', 'Current Age', 'Gender', 'Birth Month'])
y = df_gigante['Is Fraud?']

# Primero separamos los datos de entrenamiento y test
from sklearn.model_selection import train_test_split
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=0, stratify=y)

In [5]:
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import Pipeline
from sklearn.impute import SimpleImputer
from sklearn.ensemble import IsolationForest
from sklearn.svm import OneClassSVM
from sklearn.metrics import classification_report, confusion_matrix

In [6]:
c = y.sum()/X_train.shape[0]

clf_pipeline = Pipeline([
    ('imputer', SimpleImputer(strategy='mean')),
    ('scaler', StandardScaler()),
    ('classifier', IsolationForest(contamination=c, random_state=42))
])
# toma ~3 min
clf_pipeline.fit(X_train)
isofor_pred = clf_pipeline.predict(X_test)
isofor_pred[isofor_pred == 1] = 0
isofor_pred[isofor_pred == -1] = 1

# 0 es normal, -1 es fraude

In [7]:
print("IsolationForest:")
print(classification_report(y_test, isofor_pred))
print(confusion_matrix(y_test, isofor_pred))

IsolationForest:
              precision    recall  f1-score   support

       False       1.00      1.00      1.00   7307143
        True       0.00      0.00      0.00      8927

    accuracy                           1.00   7316070
   macro avg       0.50      0.50      0.50   7316070
weighted avg       1.00      1.00      1.00   7316070

[[7294412   12731]
 [   8898      29]]


In [8]:
clf_pipeline = Pipeline([
    ('imputer', SimpleImputer(strategy='mean')),
    ('scaler', StandardScaler()),
    ('classifier', OneClassSVM(kernel='rbf', nu=0.01))
])

clf_pipeline.fit(X_train)
svm_pred = clf_pipeline.predict(X_test)
# 0 es normal, -1 es fraude
svm_pred[svm_pred == 1] = 0
svm_pred[svm_pred == -1] = 1

In [9]:
print("OneClassSVM:")
print(classification_report(y_test, svm_pred))
print(confusion_matrix(y_test, svm_pred))

OneClassSVM:
              precision    recall  f1-score   support

       False       1.00      0.98      0.99      5981
        True       0.01      0.25      0.03         8

    accuracy                           0.98      5989
   macro avg       0.51      0.61      0.51      5989
weighted avg       1.00      0.98      0.99      5989

[[5844  137]
 [   6    2]]


In [7]:
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, TensorDataset
import torch.optim as optim

X_train_tensor = torch.tensor(X_train.to_numpy(), dtype=torch.float32)
y_train_tensor = torch.tensor(y_train.to_numpy(), dtype=torch.uint8)
X_test_tensor = torch.tensor(X_test.to_numpy(), dtype=torch.float32)
y_test_tensor = torch.tensor(y_test.to_numpy(), dtype=torch.uint8)

train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
test_dataset = TensorDataset(X_test_tensor, y_test_tensor)

train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=32)

In [12]:
class MLP(nn.Module):
    def __init__(self, input_size=X_train.shape[1], hidden_size=round(X_train.shape[1]/2), num_classes=2):
        super(MLP, self).__init__()
        # input se configura solo
        # capa oculta del input de 28*28 a 64 neuronas
        self.capa_oculta = nn.Linear(input_size, hidden_size)
        
        # capa de output de 64 a 10
        self.capa_output = nn.Linear(hidden_size, num_classes)
        
        # funcion de activación
        self.relu = nn.ReLU()
    
    def forward(self, x):
        # Pasamos x por la capa oculta
        salida_capa_oculta = self.capa_oculta(x)
        
        # Aplicamos la función de activación a la salida de la capa oculta
        salida_fun_activacion_capa_oculta = self.relu(salida_capa_oculta)
        
        # Pasamos x por la capa de salida
        output = self.capa_output(salida_fun_activacion_capa_oculta)
        
        return output

In [13]:
def train_model(model, train_loader, val_loader, criterion, optimizer, epochs, device='cpu'):
    """
    Función general para entrenar modelos en PyTorch, con seguimiento de pérdidas y métricas.
    
    Args:
        model: Modelo de PyTorch a entrenar.
        train_loader: DataLoader para los datos de entrenamiento.
        val_loader: DataLoader para los datos de validación.
        criterion: Función de pérdida.
        optimizer: Optimizador para el modelo.
        epochs: Número de épocas a entrenar.
        device: CPU o GPU.
        
    Returns:
        metrics: Diccionario con las pérdidas y métricas de validación.
    """
    model.to(device)
    train_losses = []
    val_losses = []
    val_accuracies = []
    
    for epoch in range(epochs):
        # Modo de entrenamiento
        model.train()
        total_train_loss = 0
        
        for X_batch, y_batch in train_loader:
            X_batch, y_batch = X_batch.to(device), y_batch.to(device)
            optimizer.zero_grad()
            outputs = model(X_batch)
            loss = criterion(outputs, y_batch)
            loss.backward()
            optimizer.step()
            total_train_loss += loss.item()
        
        # Promediar la pérdida de entrenamiento
        avg_train_loss = total_train_loss / len(train_loader)
        train_losses.append(avg_train_loss)
        
        # Modo de evaluación
        model.eval()
        total_val_loss = 0
        correct = 0
        total = 0
        with torch.no_grad():
            for X_val_batch, y_val_batch in val_loader:
                X_val_batch, y_val_batch = X_val_batch.to(device), y_val_batch.to(device)
                val_outputs = model(X_val_batch)
                val_loss = criterion(val_outputs, y_val_batch)
                total_val_loss += val_loss.item()
                
                _, predicted = torch.max(val_outputs, 1)
                correct += (predicted == y_val_batch).sum().item()
                total += y_val_batch.size(0)
        
        avg_val_loss = total_val_loss / len(val_loader)
        val_losses.append(avg_val_loss)
        
        val_accuracy = correct / total
        val_accuracies.append(val_accuracy)
        
        print(f"Epoch {epoch+1}/{epochs}, Training Loss: {avg_train_loss:.4f}, Validation Loss: {avg_val_loss:.4f}, Validation Accuracy: {val_accuracy:.4f}")
    
    # Graficar las pérdidas
    plt.figure(figsize=(10, 6))
    plt.plot(range(1, epochs+1), train_losses, label='Training Loss')
    plt.plot(range(1, epochs+1), val_losses, label='Validation Loss', linestyle='--')
    plt.xlabel('Epochs')
    plt.ylabel('Loss')
    plt.title('Loss Evolution During Training')
    plt.legend()
    plt.grid()
    plt.show()
    
    return {'train_loss': train_losses, 'val_loss': val_losses, 'val_accuracy': val_accuracies}

In [16]:
mlp_torch = MLP()

optimizer = optim.Adam(mlp_torch.parameters(), lr=0.001)
criterion = nn.CrossEntropyLoss()

# entrenamos el modelo con la funcion dada
metrics_mlp = train_model(
    model=mlp_torch,
    train_loader=train_loader,
    val_loader=test_loader, #no es ideal, pero separar mas los datos deja poquitos fraudes
    criterion=nn.CrossEntropyLoss(),
    optimizer=optim.Adam(mlp_torch.parameters(), lr=0.001),
    epochs=5
)

Epoch 1/5, Training Loss: nan, Validation Loss: nan, Validation Accuracy: 0.9988


KeyboardInterrupt: 

In [None]:
# Evaluar el modelo en el conjunto de prueba
def evaluate_model(model, data_loader, criterion, device='cpu'):
    model.eval()
    model.to(device)
    data_loss = 0
    correct = 0
    total = 0
    
    with torch.no_grad():
        for X_batch, y_batch in data_loader:
            X_batch, y_batch = X_batch.to(device), y_batch.to(device)
            outputs = model(X_batch)
            loss = criterion(outputs, y_batch)
            data_loss += loss.item()
            
            _, predicted = torch.max(outputs, 1)
            correct += (predicted == y_batch).sum().item()
            total += y_batch.size(0)
    
    accuracy = correct / total
    print(f"Data Accuracy: {accuracy:.4f}")
    
    avg_loss = data_loss / len(data_loader)
    print(f"Data Loss: {avg_loss:.4f}")
    
    return {'loss': avg_loss, 'accuracy': accuracy}

In [None]:
criterion = nn.CrossEntropyLoss()

print("Red Neuronal")
_ = evaluate_model(mlp_torch, test_loader, criterion)