In [1]:
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
from sklearn.preprocessing import RobustScaler
from sklearn.model_selection import train_test_split
from sklearn.svm import SVC
import jax.numpy as jnp
from torch.utils.data import DataLoader, TensorDataset

data = pd.read_csv('creditcard.csv')

dataset = data.to_numpy().astype(np.float32)  # Ensure data is float32 for PyTorch

scaler = RobustScaler(with_centering=True, with_scaling=True, quantile_range=(25.0, 75.0), unit_variance=False)
dataset[:, [0, 29]] = scaler.fit_transform(dataset[:, [0, 29]])

In [2]:
data_train, data_test = train_test_split(dataset, test_size=0.3, random_state=42)

In [3]:
# -----------------------------
# 3. Define the Autoencoder Model
# -----------------------------
class Autoencoder(nn.Module):
    def __init__(self):
        super(Autoencoder, self).__init__()
        # Encoder: 31 -> 23 -> 19 -> 17 -> 8 with dropout after first two hidden layers
        self.encoder = nn.Sequential(
            nn.Linear(31, 23),
            nn.Tanh(),
            nn.Dropout(0.1),
            nn.Linear(23, 19),
            nn.Tanh(),
            nn.Dropout(0.2),
            nn.Linear(19, 17),
            nn.Tanh(),
            nn.Linear(17, 8)
        )
        # Decoder: 8 -> 17 -> 19 -> 23 -> 31 with dropout after first two layers
        self.decoder = nn.Sequential(
            nn.Linear(8, 17),
            nn.Tanh(),
            nn.Dropout(0.2),
            nn.Linear(17, 19),
            nn.Tanh(),
            nn.Dropout(0.1),
            nn.Linear(19, 23),
            nn.Tanh(),
            nn.Linear(23, 31)
        )

    def forward(self, x):
        latent = self.encoder(x)
        reconstructed = self.decoder(latent)
        return reconstructed

In [4]:
fraud_data = data_train[data_train[:, -1] == 1]
non_fraud_data = data_train[data_train[:, -1] == 0]

legit_data_limited = non_fraud_data[np.random.choice(non_fraud_data.shape[0], fraud_data.shape[0], replace=False)]

train_dataset_SVM = np.vstack((legit_data_limited, fraud_data))
np.random.shuffle(train_dataset_SVM)
train_labels_SVM = train_dataset_SVM[:, -1]
train_dataset_SVM = train_dataset_SVM[:, :-1]

data_train_SVM, data_test_SVM, labels_train_SVM, labels_test_SVM = train_test_split(train_dataset_SVM, train_labels_SVM, test_size=0.2, random_state=42)
model = SVC(kernel='linear', C=1.0)
model.fit(data_train_SVM, labels_train_SVM)

labels_pred_test = model.predict(data_test_SVM)
labels_pred_train = model.predict(data_train_SVM)
accuracy_train = jnp.mean(labels_pred_train == labels_train_SVM)
accuracy_test = jnp.mean(labels_pred_test == labels_test_SVM)

print(f"Train Accuracy: {accuracy_train:.4f}")
print(f"Test Accuracy: {accuracy_test:.4f}")

Train Accuracy: 0.9473
Test Accuracy: 0.9301


In [5]:
# load the model 
myModel = Autoencoder()

# Load the saved state dictionary
myModel.load_state_dict(torch.load("model/autoencoder/autoencoder.pth", map_location=torch.device('cpu'), weights_only=True))
myModel.eval()

alpha = 0.9

data_train_oversampled = data_train.copy()

while (len(data_train_oversampled[data_train_oversampled[:, -1] == 1]) < len(data_train_oversampled[data_train_oversampled[:, -1] == 0])):
    fraud_data = data_train_oversampled[data_train_oversampled[:, -1] == 1]

    # 2) Generate synthetic data by interpolating between fraud data points
    fraud_data_tensor = torch.tensor(fraud_data, dtype=torch.float32)

    latent_representations = myModel.encoder(fraud_data_tensor)

    noisy_vectors = []
    for _ in range(300):
        # Pick a random fraud vector
        i = np.random.randint(0, 300)
        z_i = latent_representations[i]

        # Optionally pick a second random vector to interpolate
        j = np.random.randint(0, 300)
        z_j = latent_representations[j]
        
        # Interpolation factor
        lambda_ = np.random.rand()
        z_ij = lambda_*z_i + (1 - lambda_)*z_j
        
        # Add Gaussian noise around that interpolation
        noise = torch.normal(mean=0.0, std=alpha, size=(8,))
        z_syn = z_ij + noise  # Both are tensors, so this should work directly

        noisy_vectors.append(z_syn.detach().numpy())

    noisy_vectors = np.array(noisy_vectors)

    # 3) Decode them to get synthetic data in original input space
    X_synthetic = myModel.decoder(torch.tensor(noisy_vectors, dtype=torch.float32)).detach().numpy()
    # remove label
    X_synthetic_to_predict = X_synthetic[:, :-1]
    # 4) Validate with SVM
    y_pred = model.predict(X_synthetic_to_predict)

    X_synthetic[:, -1] = y_pred

    # 5) Add the synthetic data to the training set
    X_synthetic_fraud = X_synthetic[X_synthetic[:, -1] == 1]

    data_train_oversampled = np.vstack((data_train_oversampled, X_synthetic_fraud))

    print("New synthetic data generated", len(X_synthetic_fraud))
    print(f"Number of fraud in the balanced dataset {len(data_train_oversampled[data_train_oversampled[:, -1] == 1])}/{len(data_train_oversampled[data_train_oversampled[:, -1] == 0])}")

# for i in range(len(data_train_oversampled[data_train_oversampled[:, -1] == 1]) - len(data_train_oversampled[data_train_oversampled[:, -1] == 0])):
#     noisy_vectors = []
#     for _ in range(n):
#         # Pick a random fraud vector
#         i = np.random.randint(0, n)
#         z_i = latent_representations[i]

#         # Optionally pick a second random vector to interpolate
#         j = np.random.randint(0, n)
#         z_j = latent_representations[j]
        
#         # Interpolation factor
#         lambda_ = np.random.rand()
#         z_ij = lambda_*z_i + (1 - lambda_)*z_j
        
#         # Add Gaussian noise around that interpolation
#         noise = torch.normal(mean=0.0, std=alpha, size=(latent_dim,))
#         z_syn = z_ij + noise  # Both are tensors, so this should work directly

#         noisy_vectors.append(z_syn.detach().numpy())

#     noisy_vectors = np.array(noisy_vectors)

#     # 3) Decode them to get synthetic data in original input space
#     X_synthetic = myModel.decoder(torch.tensor(noisy_vectors, dtype=torch.float32)).detach().numpy()
#     # remove label
#     X_synthetic_to_predict = X_synthetic[:, :-1]
#     # 4) Validate with SVM
#     y_pred = model.predict(X_synthetic_to_predict)

#     X_synthetic[:, -1] = y_pred

#     # 5) Add the synthetic data to the training set
#     X_synthetic_fraud = X_synthetic[X_synthetic[:, -1] == 1]

#     data_train_oversampled = np.vstack((data_train_oversampled, X_synthetic_fraud))

print(data_train_oversampled.shape)
print("Number of fraud in the balanced dataset", len(data_train_oversampled[data_train_oversampled[:, -1] == 1]))
print("Number of non-fraud in the balanced dataset", len(data_train_oversampled[data_train_oversampled[:, -1] == 0]))

New synthetic data generated 292
Number of fraud in the balanced dataset 648/199008
New synthetic data generated 297
Number of fraud in the balanced dataset 945/199008
New synthetic data generated 297
Number of fraud in the balanced dataset 1242/199008
New synthetic data generated 298
Number of fraud in the balanced dataset 1540/199008
New synthetic data generated 294
Number of fraud in the balanced dataset 1834/199008
New synthetic data generated 296
Number of fraud in the balanced dataset 2130/199008
New synthetic data generated 296
Number of fraud in the balanced dataset 2426/199008
New synthetic data generated 297
Number of fraud in the balanced dataset 2723/199008
New synthetic data generated 295
Number of fraud in the balanced dataset 3018/199008
New synthetic data generated 292
Number of fraud in the balanced dataset 3310/199008
New synthetic data generated 295
Number of fraud in the balanced dataset 3605/199008
New synthetic data generated 292
Number of fraud in the balanced da

In [6]:
class Attention(nn.Module):
    def __init__(self, input_dim):
        super(Attention, self).__init__()
        self.attention_weights = nn.Linear(input_dim, 1, bias=False)  # Learnable attention weights

    def forward(self, lstm_output):
        # Compute attention scores (softmax over time dimension)
        attention_scores = torch.softmax(self.attention_weights(lstm_output), dim=1)  
        weighted_output = lstm_output * attention_scores  # Apply attention
        return torch.sum(weighted_output, dim=1)  # Sum over sequence length (time steps)

In [7]:
class ALSTM(nn.Module):
    def __init__(self, input_dim, hidden_dim):
        super(ALSTM, self).__init__()
        self.lstm = nn.LSTM(input_dim, hidden_dim, batch_first=True)  # Forward LSTM
        self.attention = Attention(hidden_dim)  # Attention Layer
        self.fc = nn.Linear(hidden_dim, 1)  # Fully connected output layer
        self.sigmoid = nn.Sigmoid()  # Sigmoid activation for binary classification

    def forward(self, x):
        lstm_output, _ = self.lstm(x)  # Get LSTM output
        attention_out = self.attention(lstm_output)  # Apply attention
        output = self.fc(attention_out)  # Fully connected layer
        return self.sigmoid(output)  # Sigmoid activation for probability output

In [9]:
# split data_train_oversampled in 20-80 split test-train
data_oversampled_labels = data_train_oversampled[:, -1]
data_train_oversampled = data_train_oversampled[:, :-1]
data_reshaped = data_train_oversampled.reshape(data_train_oversampled.shape[0], 1, data_train_oversampled.shape[1])

# Convert to PyTorch tensors
X_tensor = torch.tensor(data_reshaped, dtype=torch.float32)
y_tensor = torch.tensor(data_oversampled_labels, dtype=torch.float32)

# Split into training and testing sets
X_train, X_test, y_train, y_test = train_test_split(X_tensor, y_tensor, test_size=0.2, random_state=42)

# Create DataLoader for PyTorch
train_dataset = TensorDataset(X_train, y_train)
test_dataset = TensorDataset(X_test, y_test)

train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

In [10]:
input_dim = data_reshaped.shape[2]  # Number of features per transaction
hidden_dim = 128 # da cercare il valore migliore

# Initialize model, loss function, and optimizer
ALSTMmodel = ALSTM(input_dim, hidden_dim)
criterion = nn.BCELoss()  # Binary Cross Entropy for classification
optimizer = torch.optim.Adam(ALSTMmodel.parameters(), lr=0.001)

# Move model to GPU if available
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
ALSTMmodel.to(device)

ALSTM(
  (lstm): LSTM(29, 128, batch_first=True)
  (attention): Attention(
    (attention_weights): Linear(in_features=128, out_features=1, bias=False)
  )
  (fc): Linear(in_features=128, out_features=1, bias=True)
  (sigmoid): Sigmoid()
)

In [15]:
def train_ALSTM(model, train_loader, residual):
    num_epochs = 10
    for epoch in range(num_epochs):
        model.train()
        epoch_loss = 0

        for X_batch, _ in train_loader:
            # transform residual in tensor
            residual = torch.tensor(residual, dtype=torch.float32)
            X_batch, residual = X_batch.to(device), residual.to(device)

            optimizer.zero_grad()  # Reset gradients
            y_pred = model(X_batch).squeeze()
        
            loss = criterion(y_pred, residual)  # Compute loss
            loss.backward()  # Backpropagation
            optimizer.step()  # Update weights

            epoch_loss += loss.item()
    
        print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {epoch_loss:.4f}")
        
    return model

In [16]:
def loss(y, y_hat):
    return 1/2 * np.sum((y - y_hat)**2)

In [17]:
h_x = loss(data_oversampled_labels, 0)

for _ in range(6):

    residual = (data_oversampled_labels - h_x)
    ALSTMmodel = train_ALSTM(ALSTMmodel, train_loader, residual)
    y_pred = ALSTMmodel(train_loader).squeeze().detach().numpy()
    step_length = np.argmin(loss(data_oversampled_labels, h_x + step_length * y_pred))
    h_x += step_length * y_pred
    

ValueError: Using a target size (torch.Size([398196])) that is different to the input size (torch.Size([32])) is deprecated. Please ensure they have the same size.

In [None]:
ALSTMmodel.eval()  # Set model to evaluation mode

correct = 0
total = 0

with torch.no_grad():  # No gradients needed for evaluation
    for X_batch, y_batch in test_loader:
        X_batch, y_batch = X_batch.to(device), y_batch.to(device)
        y_pred = ALSTMmodel(X_batch).squeeze()
        y_pred_labels = (y_pred > 0.5).float()  # Convert probabilities to binary labels
        
        correct += (y_pred_labels == y_batch).sum().item()
        total += y_batch.size(0)

accuracy = correct / total
print(f"Test Accuracy: {accuracy:.4f}")