# Import Libraries

In [1]:
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, confusion_matrix, f1_score, precision_score, recall_score
import torch
import torch.nn as nn
import torch.optim as optim

from sdv.single_table import CTGANSynthesizer
from sdv.metadata import SingleTableMetadata

import warnings 
warnings.filterwarnings('ignore') 

In [2]:
# Prefer the GPU when PyTorch can see one; otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')
print(f"Using device: {device}")

# ---------------------------
# 1. Load the Dataset
# ---------------------------
# Read the parquet file and replace its index with a fresh 0..n-1 range.
# No shuffling happens here: rows keep the order stored in the file.
df_full = pd.read_parquet('../BIS_data/Final_BIS_Data.parquet').reset_index(drop=True)
df_full.info()

Using device: cuda
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 2138275 entries, 0 to 2138274
Data columns (total 6 columns):
 #   Column                  Dtype  
---  ------                  -----  
 0   pca_0                   float64
 1   pca_1                   float64
 2   pca_2                   float64
 3   pca_3                   float64
 4   pca_4                   float64
 5   laundering_schema_type  int64  
dtypes: float64(5), int64(1)
memory usage: 97.9 MB


# 2. Split Data

In [3]:
# ---------------------------
# 2. Split Data into Train and Test Sets BEFORE Oversampling
# ---------------------------
# Label column vs. the remaining feature columns.
y = df_full['laundering_schema_type']
X = df_full.drop(columns=['laundering_schema_type'])

# Stratified 80/20 split so the held-out set keeps the original class ratio.
X_train, X_test, y_train, y_test = train_test_split(
    X,
    y,
    test_size=0.2,
    stratify=y,
    random_state=42,
)

# Partition the training features by label (1 -> fraud rows, 0 -> non-fraud rows).
fraud_train = X_train.loc[y_train == 1]
nonfraud_train = X_train.loc[y_train == 0]

# Synthetic fraud rows needed so that both classes end up the same size.
n_fraud = fraud_train.shape[0]
n_nonfraud = nonfraud_train.shape[0]
n_to_generate = n_nonfraud - n_fraud

print(f"Generating {n_to_generate} synthetic fraud samples using CTGAN.")

Generating 1489380 synthetic fraud samples using CTGAN.


# 3. CTGAN

## 3.1 Default CTGAN

In [4]:
# ---------------------------
# 3. Fit CTGAN on the Training-Set Fraud Rows Only
# ---------------------------
# Infer the column metadata from the minority-class training rows.
metadata = SingleTableMetadata()
metadata.detect_from_dataframe(data=fraud_train)

# Only `cuda` and `verbose` are set; every other CTGAN hyperparameter is left
# at the library default.
synthesizer = CTGANSynthesizer(metadata, verbose=True, cuda=True)
synthesizer.fit(data=fraud_train)

# Draw exactly as many synthetic rows as are needed to balance the classes,
# then preview the first rows as the cell output.
synthetic_data = synthesizer.sample(n_to_generate)
synthetic_data.head()

Gen. (-2.21) | Discrim. (0.00): 100%|████████████████████████████████████████████████| 300/300 [14:36<00:00,  2.92s/it]


Unnamed: 0,pca_0,pca_1,pca_2,pca_3,pca_4
0,1.648796,1.312458,-0.331564,-0.862923,0.907045
1,2.041407,-0.874377,0.080091,0.436251,0.752192
2,0.933969,-0.00552,-0.539325,-0.454675,0.507787
3,1.173809,1.779171,-0.485998,-0.406481,1.234283
4,1.034956,0.105055,1.111455,-0.647845,1.430754


In [5]:
from sdv.evaluation.single_table import evaluate_quality, run_diagnostic

# Diagnostic report for the synthetic rows against the real fraud rows
# (its printed output covers data validity and data structure).
diagnostic = run_diagnostic(fraud_train, synthetic_data, metadata)

# Quality report for the same pair of tables
# (its printed output covers column shapes and column pair trends).
quality_report = evaluate_quality(
    real_data=fraud_train,
    synthetic_data=synthetic_data,
    metadata=metadata,
)

Generating report ...

(1/2) Evaluating Data Validity: |██████████████████████████████████████████████████████| 5/5 [00:00<00:00, 104.23it/s]|
Data Validity Score: 100.0%

(2/2) Evaluating Data Structure: |█████████████████████████████████████████████████████| 1/1 [00:00<00:00, 501.23it/s]|
Data Structure Score: 100.0%

Overall Score (Average): 100.0%

Generating report ...

(1/2) Evaluating Column Shapes: |███████████████████████████████████████████████████████| 5/5 [00:00<00:00,  6.52it/s]|
Column Shapes Score: 92.93%

(2/2) Evaluating Column Pair Trends: |████████████████████████████████████████████████| 10/10 [00:02<00:00,  4.74it/s]|
Column Pair Trends Score: 95.51%

Overall Score (Average): 94.22%



In [6]:
# Stack real non-fraud rows, real fraud rows and synthetic fraud rows, in that order.
X_train_ctgan = pd.concat([nonfraud_train, fraud_train, synthetic_data])

# Labels follow the same order: 0 for the non-fraud rows, 1 for every real or
# synthetic fraud row.
y_train_ctgan = np.concatenate([
    np.zeros(n_nonfraud),
    np.ones(n_fraud + n_to_generate),
])

# New frame with the label appended as the last column (features are left untouched).
Xy_train_ctgan = X_train_ctgan.assign(laundering_schema_type=y_train_ctgan)

In [7]:
# Shuffle every row with a fixed seed so the three stacked groups are interleaved.
# The frame is reassigned to itself, so re-running this cell shuffles the already
# shuffled frame again.
Xy_train_ctgan = Xy_train_ctgan.sample(frac=1, random_state=42)

In [8]:
# Xy_train_ctgan.to_csv("BIS_CTGAN_Full.csv",index=False)

In [5]:
# Xy_train_ctgan = pd.read_csv("BIS_CTGAN_Full.csv")

In [9]:
# Separate features and labels for the CTGAN-balanced training data.
X_train = Xy_train_ctgan.drop('laundering_schema_type', axis=1).values
y_train = Xy_train_ctgan['laundering_schema_type'].values

# Convert the training arrays to PyTorch tensors on the selected device.
X_train = torch.tensor(X_train, dtype=torch.float).to(device)
y_train = torch.tensor(y_train, dtype=torch.long).to(device)

# For the test set, keep the original (imbalanced) distribution.
# X_test / y_test are rebound from pandas objects to tensors here, so convert them
# only while they are not tensors yet. Without this guard a second run of the cell
# (or any later cell that repeats the conversion) fails on `.values`.
if not torch.is_tensor(X_test):
    X_test = torch.tensor(np.asarray(X_test), dtype=torch.float).to(device)
if not torch.is_tensor(y_test):
    y_test = torch.tensor(np.asarray(y_test), dtype=torch.long).to(device)

## 3.2 Finetune CTGAN

In [None]:
# Detect the column metadata again from the fraud training rows.
metadata = SingleTableMetadata()
metadata.detect_from_dataframe(data=fraud_train)

# CTGAN settings overridden for this run; everything else stays at the library default.
ctgan_overrides = {"epochs": 5000, "batch_size": 100, "verbose": True}
custom_synthesizer = CTGANSynthesizer(metadata, **ctgan_overrides)

custom_synthesizer.fit(data=fraud_train)

In [None]:
# Sample n_to_generate synthetic fraud rows from the tuned CTGAN
# (n_to_generate is the gap between the non-fraud and fraud training counts).
synthetic_data_customized = custom_synthesizer.sample(num_rows=n_to_generate)
# Preview the first rows as the cell output.
synthetic_data_customized.head()

In [None]:
from sdv.evaluation.single_table import evaluate_quality

# Quality report comparing the tuned CTGAN's samples with the real fraud rows.
quality_report_custom = evaluate_quality(
    real_data=fraud_train,
    synthetic_data=synthetic_data_customized,
    metadata=metadata,
)

In [None]:
# Stack real non-fraud rows, real fraud rows and tuned-CTGAN synthetic rows, in that order.
X_train_ctgan = pd.concat([nonfraud_train, fraud_train, synthetic_data_customized])

# 0 for the non-fraud rows, 1 for every real or synthetic fraud row.
y_train_ctgan = np.concatenate([
    np.zeros(n_nonfraud),
    np.ones(n_fraud + n_to_generate),
])

# Append the label as a 'Class' column, then shuffle all rows with a fixed seed.
Xy_train_ctgan = (
    X_train_ctgan
    .assign(Class=y_train_ctgan)
    .sample(frac=1, random_state=42)
)

In [None]:
# Separate features and labels for the CTGAN-balanced training data.
X_train = Xy_train_ctgan.drop('Class', axis=1).values
y_train = Xy_train_ctgan['Class'].values

# Convert the training arrays to PyTorch tensors on the selected device.
X_train = torch.tensor(X_train, dtype=torch.float).to(device)
y_train = torch.tensor(y_train, dtype=torch.long).to(device)

# For the test set, keep the original (imbalanced) distribution.
# X_test / y_test may already be tensors (the default-CTGAN section performs the
# same conversion, and this cell may be re-run). Calling `.values` on a tensor
# does not return an array, so convert only when they are not tensors yet.
if not torch.is_tensor(X_test):
    X_test = torch.tensor(np.asarray(X_test), dtype=torch.float).to(device)
if not torch.is_tensor(y_test):
    y_test = torch.tensor(np.asarray(y_test), dtype=torch.long).to(device)

# 4. Define Model

In [7]:
# ---------------------------
# 4. Define the MLP Model
# ---------------------------
class ResidualBlock(nn.Module):
    """Width-preserving residual unit.

    The inner stack is Linear -> BatchNorm1d -> ReLU -> Linear -> BatchNorm1d,
    all with ``hidden_dim`` features. Its output is added to the block input
    and passed through a final ReLU.

    Args:
        hidden_dim: Number of input and output features of the block.
    """

    def __init__(self, hidden_dim):
        super().__init__()
        layers = [
            nn.Linear(hidden_dim, hidden_dim),
            nn.BatchNorm1d(hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.BatchNorm1d(hidden_dim),
        ]
        # Attribute names and layer order define the state_dict keys used by
        # saved checkpoints, so they are kept as-is.
        self.block = nn.Sequential(*layers)
        self.relu = nn.ReLU()

    def forward(self, x):
        """Return ``relu(block(x) + x)`` (identity skip connection)."""
        return self.relu(self.block(x) + x)

class FraudResNet(nn.Module):
    """Fully connected residual classifier.

    Layout: an input projection (Linear -> BatchNorm1d -> ReLU) from
    ``input_dim`` to ``hidden_dim``, then ``num_blocks`` ``ResidualBlock``
    modules of width ``hidden_dim``, then a Linear layer producing
    ``output_dim`` raw scores (no softmax is applied here).

    Args:
        input_dim: Number of input features.
        hidden_dim: Width of the projection and of every residual block.
        output_dim: Number of output scores.
        num_blocks: How many residual blocks to stack (default 3).
    """

    def __init__(self, input_dim, hidden_dim, output_dim, num_blocks=3):
        super().__init__()

        # Project the raw features up to the hidden width.
        self.input_layer = nn.Sequential(
            nn.Linear(input_dim, hidden_dim),
            nn.BatchNorm1d(hidden_dim),
            nn.ReLU(),
        )

        # Stack of identical-width residual blocks.
        stacked_blocks = []
        for _ in range(num_blocks):
            stacked_blocks.append(ResidualBlock(hidden_dim))
        self.res_blocks = nn.Sequential(*stacked_blocks)

        # Map the hidden representation to one score per class.
        self.output_layer = nn.Linear(hidden_dim, output_dim)

    def forward(self, x):
        """Run projection, residual stack and output layer; return the scores."""
        hidden = self.input_layer(x)
        hidden = self.res_blocks(hidden)
        return self.output_layer(hidden)

# Network hyperparameters; the input width is read from the training tensor.
input_dim = X_train.shape[1]
hidden_dim = 32
output_dim = 2  # one output score per class
num_blocks = 3  # residual blocks between the input and output layers

# Build the network, then move its parameters to the selected device.
model = FraudResNet(
    input_dim=input_dim,
    hidden_dim=hidden_dim,
    output_dim=output_dim,
    num_blocks=num_blocks,
)
model = model.to(device)

### Default

In [8]:
# ---------------------------
# 5. Setup Loss, Optimizer, and Training Parameters
# ---------------------------
# Checkpoint bookkeeping: where the best weights go and the best loss seen so far.
best_model_path = "./Model_Weight/ResNet_best_CTGAN_BIS.pt"
best_loss = float('inf')
num_epochs = 200

# The training classes were balanced with synthetic rows, so plain (unweighted)
# cross-entropy is used.
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(params=model.parameters(), lr=1e-3)

In [9]:
# ---------------------------
# 6. Training Loop with Checkpointing (Saving Best Model by Training Loss)
# ---------------------------
# Full-batch training: each epoch is one forward/backward pass over all of
# X_train followed by a single optimizer step.
for epoch in range(num_epochs):
    model.train()
    optimizer.zero_grad()

    outputs = model(X_train)
    loss = criterion(outputs, y_train)
    loss.backward()
    optimizer.step()

    # Read the scalar loss once and reuse it for the comparison and the log line.
    epoch_loss = loss.item()

    # Checkpoint whenever the training loss reaches a new minimum. The loss was
    # computed before this epoch's step; the saved weights are the ones after it.
    if epoch_loss < best_loss:
        best_loss = epoch_loss
        torch.save(model.state_dict(), best_model_path)

    # Log every fifth epoch.
    if (epoch + 1) % 5 == 0:
        print(f"Epoch [{epoch + 1}/{num_epochs}], Loss: {epoch_loss:.4f}")

Epoch [5/200], Loss: 0.6139
Epoch [10/200], Loss: 0.5124
Epoch [15/200], Loss: 0.4690
Epoch [20/200], Loss: 0.4386
Epoch [25/200], Loss: 0.4183
Epoch [30/200], Loss: 0.4047
Epoch [35/200], Loss: 0.3950
Epoch [40/200], Loss: 0.3878
Epoch [45/200], Loss: 0.3819
Epoch [50/200], Loss: 0.3772
Epoch [55/200], Loss: 0.3732
Epoch [60/200], Loss: 0.3698
Epoch [65/200], Loss: 0.3669
Epoch [70/200], Loss: 0.3644
Epoch [75/200], Loss: 0.3622
Epoch [80/200], Loss: 0.3602
Epoch [85/200], Loss: 0.3584
Epoch [90/200], Loss: 0.3568
Epoch [95/200], Loss: 0.3554
Epoch [100/200], Loss: 0.3540
Epoch [105/200], Loss: 0.3528
Epoch [110/200], Loss: 0.3517
Epoch [115/200], Loss: 0.3506
Epoch [120/200], Loss: 0.3497
Epoch [125/200], Loss: 0.3487
Epoch [130/200], Loss: 0.3479
Epoch [135/200], Loss: 0.3471
Epoch [140/200], Loss: 0.3464
Epoch [145/200], Loss: 0.3457
Epoch [150/200], Loss: 0.3450
Epoch [155/200], Loss: 0.3444
Epoch [160/200], Loss: 0.3438
Epoch [165/200], Loss: 0.3433
Epoch [170/200], Loss: 0.3427


In [10]:
# ---------------------------
# 7. Evaluation on the Test Set (with the Original Imbalanced Distribution)
# ---------------------------
model.eval()
with torch.no_grad():
    test_outputs = model(X_test)
    # Predicted class = index of the largest score in each row.
    predicted = torch.max(test_outputs, dim=1).indices

# Move predictions and labels to host memory for scikit-learn.
y_pred = predicted.cpu().numpy()
y_true = y_test.cpu().numpy()

print("\nEvaluation on SetA (Test Data):")
print(classification_report(y_true, y_pred))
print(confusion_matrix(y_true, y_pred))
for metric_label, metric_fn in (
    ("F1-score:", f1_score),
    ("Precision-score:", precision_score),
    ("Recall-score:", recall_score),
):
    print(metric_label, metric_fn(y_true, y_pred))


Evaluation on SetA (Test Data):
              precision    recall  f1-score   support

           0       0.98      0.87      0.92    400000
           1       0.29      0.79      0.43     27655

    accuracy                           0.86    427655
   macro avg       0.64      0.83      0.67    427655
weighted avg       0.94      0.86      0.89    427655

[[346588  53412]
 [  5695  21960]]
F1-score: 0.4262960194900366
Precision-score: 0.2913548797962108
Recall-score: 0.7940697884650154


### 4.1 Fine Tune CTGAN

In [None]:
# ---------------------------
# 5. Setup Loss, Optimizer, and Training Parameters
# ---------------------------
# Start this experiment from freshly initialised weights. Without re-creating the
# network, `model` would still hold the parameters learned in the "Default" run,
# and this run would continue that training instead of being an independent
# comparison.
model = FraudResNet(input_dim, hidden_dim, output_dim, num_blocks).to(device)

# Since the training data is now balanced, we can use the default weights.
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

num_epochs = 200
best_loss = float('inf')
# Separate checkpoint file so the "Default" run's best weights are not overwritten.
best_model_path = "./Model_Weight/ResNet_best_CTGAN_FT_BIS.pt"

In [None]:
# ---------------------------
# 6. Training Loop with Checkpointing (Saving Best Model by Training Loss)
# ---------------------------
# Full-batch training: each epoch is one forward/backward pass over all of
# X_train followed by a single optimizer step.
for epoch in range(num_epochs):
    model.train()
    optimizer.zero_grad()

    outputs = model(X_train)
    loss = criterion(outputs, y_train)
    loss.backward()
    optimizer.step()

    # Read the scalar loss once and reuse it for the comparison and the log line.
    epoch_loss = loss.item()

    # Checkpoint whenever the training loss reaches a new minimum. The loss was
    # computed before this epoch's step; the saved weights are the ones after it.
    if epoch_loss < best_loss:
        best_loss = epoch_loss
        torch.save(model.state_dict(), best_model_path)

    # Log every fifth epoch.
    if (epoch + 1) % 5 == 0:
        print(f"Epoch [{epoch + 1}/{num_epochs}], Loss: {epoch_loss:.4f}")

In [None]:
# ---------------------------
# 7. Evaluation on the Test Set (with the Original Imbalanced Distribution)
# ---------------------------
model.eval()
with torch.no_grad():
    test_outputs = model(X_test)
    # Predicted class = index of the largest score in each row.
    predicted = torch.max(test_outputs, dim=1).indices

# Move predictions and labels to host memory for scikit-learn.
y_pred = predicted.cpu().numpy()
y_true = y_test.cpu().numpy()

print("\nEvaluation on SetA (Test Data):")
print(classification_report(y_true, y_pred))
print(confusion_matrix(y_true, y_pred))
for metric_label, metric_fn in (
    ("F1-score:", f1_score),
    ("Precision-score:", precision_score),
    ("Recall-score:", recall_score),
):
    print(metric_label, metric_fn(y_true, y_pred))

# 5. Evaluate on Unseen Data (Without Fine-tuning)

### Default

In [None]:
# Without Fine-tuning
# ---------------------------
# 8. Load the Best Model into a New Instance and Test on Unseen Data (e.g., SetB)
# ---------------------------
# Name the "Default" checkpoint explicitly. `best_model_path` is rebound by the
# tuned-CTGAN setup cell, so by the time this cell runs it may no longer point at
# the Default run's weights.
default_model_path = "./Model_Weight/ResNet_best_CTGAN_BIS.pt"
print(default_model_path)

# load_state_dict needs exactly the layer shapes of the saved network, so rebuild
# it from the hyperparameters used for training instead of hard-coded sizes.
# NOTE(review): assumes input_dim / hidden_dim / output_dim / num_blocks still hold
# the values from the model-definition cell; confirm they were not rebound.
loaded_model = FraudResNet(input_dim, hidden_dim, output_dim, num_blocks)
# Inference below runs on the CPU; map_location lets a checkpoint saved from a
# GPU-resident model be loaded there.
loaded_model.load_state_dict(torch.load(default_model_path, map_location="cpu"))
loaded_model.eval()

# Assume you have an unseen dataset (SetB) with the same attributes.
# NOTE(review): this path and the 'Class' label name look inherited from a
# credit-card notebook, while the training data here is labelled
# 'laundering_schema_type'. Confirm the intended file and label column.
new_df = pd.read_csv("Fraud_dataset/Creditcard/GlobalUnseen.csv")

if 'Class' in new_df.columns:
    X_new = new_df.drop('Class', axis=1).values
    y_new = new_df['Class'].values
else:
    X_new = new_df.values
    y_new = None

# Fail early with a readable message if the file's feature count does not match
# the network's first layer.
expected_features = loaded_model.input_layer[0].in_features
if X_new.shape[1] != expected_features:
    raise ValueError(
        f"Unseen data has {X_new.shape[1]} feature columns "
        f"but the model expects {expected_features}."
    )

X_new = torch.tensor(X_new, dtype=torch.float)
if y_new is not None:
    y_new = torch.tensor(y_new, dtype=torch.long)

with torch.no_grad():
    new_outputs = loaded_model(X_new)
    # Predicted class = index of the largest score in each row.
    _, new_predicted = torch.max(new_outputs, 1)
    new_predictions = new_predicted.numpy()

if y_new is not None:
    # Convert y_new tensor to numpy array for metric calculations.
    y_new_np = y_new.numpy()
    print("\nEvaluation on Unseen Data (SetB):")
    print(classification_report(y_new_np, new_predictions))
    print(confusion_matrix(y_new_np, new_predictions))
    print("F1-score:", f1_score(y_new_np, new_predictions))
    print("Precision-score:", precision_score(y_new_np, new_predictions))
    print("Recall-score:", recall_score(y_new_np, new_predictions))
else:
    # No labels available: just show the raw predictions.
    print("\nPredictions on Unseen Data (SetB):")
    print(new_predictions)

### 5.1 CTGAN FT

In [None]:
# Without Fine-tuning
# ---------------------------
# 8. Load the Best Model into a New Instance and Test on Unseen Data (e.g., SetB)
# ---------------------------
# Name the tuned-CTGAN checkpoint explicitly so this cell does not depend on
# whichever value `best_model_path` was last bound to.
ctgan_ft_model_path = "./Model_Weight/ResNet_best_CTGAN_FT_BIS.pt"
print(ctgan_ft_model_path)

# load_state_dict needs exactly the layer shapes of the saved network, so rebuild
# it from the hyperparameters used for training instead of hard-coded sizes.
# NOTE(review): assumes input_dim / hidden_dim / output_dim / num_blocks still hold
# the values from the model-definition cell; confirm they were not rebound.
loaded_model = FraudResNet(input_dim, hidden_dim, output_dim, num_blocks)
# Inference below runs on the CPU; map_location lets a checkpoint saved from a
# GPU-resident model be loaded there.
loaded_model.load_state_dict(torch.load(ctgan_ft_model_path, map_location="cpu"))
loaded_model.eval()

# Assume you have an unseen dataset (SetB) with the same attributes.
# NOTE(review): this path and the 'Class' label name look inherited from a
# credit-card notebook, while the training data here is labelled
# 'laundering_schema_type'. Confirm the intended file and label column.
new_df = pd.read_csv("Fraud_dataset/Creditcard/GlobalUnseen.csv")

if 'Class' in new_df.columns:
    X_new = new_df.drop('Class', axis=1).values
    y_new = new_df['Class'].values
else:
    X_new = new_df.values
    y_new = None

# Fail early with a readable message if the file's feature count does not match
# the network's first layer.
expected_features = loaded_model.input_layer[0].in_features
if X_new.shape[1] != expected_features:
    raise ValueError(
        f"Unseen data has {X_new.shape[1]} feature columns "
        f"but the model expects {expected_features}."
    )

X_new = torch.tensor(X_new, dtype=torch.float)
if y_new is not None:
    y_new = torch.tensor(y_new, dtype=torch.long)

with torch.no_grad():
    new_outputs = loaded_model(X_new)
    # Predicted class = index of the largest score in each row.
    _, new_predicted = torch.max(new_outputs, 1)
    new_predictions = new_predicted.numpy()

if y_new is not None:
    # Convert y_new tensor to numpy array for metric calculations.
    y_new_np = y_new.numpy()
    print("\nEvaluation on Unseen Data (SetB):")
    print(classification_report(y_new_np, new_predictions))
    print(confusion_matrix(y_new_np, new_predictions))
    print("F1-score:", f1_score(y_new_np, new_predictions))
    print("Precision-score:", precision_score(y_new_np, new_predictions))
    print("Recall-score:", recall_score(y_new_np, new_predictions))
else:
    # No labels available: just show the raw predictions.
    print("\nPredictions on Unseen Data (SetB):")
    print(new_predictions)

# 6. Evaluate on Unseen Data (With Fine-tuning)

In [None]:
# Unseen data set and the checkpoint that will be fine-tuned on it.
new_df = pd.read_csv("Fraud_dataset/Creditcard/GlobalUnseen.csv")
best_model_path = "./Model_Final/best_model_ctgan_final_all.pt"

# Label vector and feature matrix as NumPy arrays.
y_new = new_df['Class'].values
X_new = new_df.drop(columns='Class').values

# Stratified 50/50 split: one half is used for fine-tuning, the other half for
# evaluating the fine-tuned model.
X_new_train, X_new_test, y_new_train, y_new_test = train_test_split(
    X_new,
    y_new,
    test_size=0.5,
    stratify=y_new,
    random_state=42,
)

# Convert to PyTorch tensors: float features, long labels (not moved to `device`).
X_new_train, X_new_test = (
    torch.tensor(part, dtype=torch.float) for part in (X_new_train, X_new_test)
)
y_new_train, y_new_test = (
    torch.tensor(part, dtype=torch.long) for part in (y_new_train, y_new_test)
)

# ---------------------------
# 2. Load the Pre-Trained Model from SetA
# ---------------------------
# Create a new model instance with the same architecture.
class FraudMLP(nn.Module):
    """Plain feed-forward classifier with two hidden stages.

    Each hidden stage is Linear -> ReLU -> BatchNorm1d with ``hidden_dim``
    features; a final Linear layer maps to ``output_dim`` raw scores.

    Args:
        input_dim: Number of input features.
        hidden_dim: Width of both hidden stages.
        output_dim: Number of output scores.
    """

    def __init__(self, input_dim, hidden_dim, output_dim):
        super().__init__()
        layers = []
        stage_in = input_dim
        # Two identical hidden stages; only the first one sees the raw input width.
        for _ in range(2):
            layers.extend([
                nn.Linear(stage_in, hidden_dim),
                nn.ReLU(),
                nn.BatchNorm1d(hidden_dim),
            ])
            stage_in = hidden_dim
        layers.append(nn.Linear(hidden_dim, output_dim))
        # The attribute name and layer positions define the state_dict keys of
        # saved checkpoints, so they are kept as-is.
        self.model = nn.Sequential(*layers)

    def forward(self, x):
        """Return the raw class scores for ``x``."""
        scores = self.model(x)
        return scores

# Architecture of the checkpoint being fine-tuned; the input width comes from the data.
# NOTE(review): this rebinds input_dim / hidden_dim, which the ResNet cells also
# use. Re-running those cells after this one would pick up these values.
input_dim = X_new_train.shape[1]
hidden_dim = 64
output_dim = 2  # Two neurons for two classes

# NOTE(review): this untrained instance is not used below and it replaces whatever
# `model` referred to before. Kept so the cell's bindings stay as they were;
# consider removing it.
model = FraudMLP(input_dim, hidden_dim, output_dim)

loaded_model = FraudMLP(input_dim, hidden_dim, output_dim)
# The model and the fine-tuning tensors in this cell stay on the CPU;
# map_location lets a checkpoint saved from a GPU-resident model be loaded there.
loaded_model.load_state_dict(torch.load(best_model_path, map_location="cpu"))

# It's often useful to set the model in train mode during fine-tuning.
loaded_model.train()

# ---------------------------
# 3. Fine-Tuning on Unseen Data (SetB)
# ---------------------------
# NOTE(review): 1e-3 is the same learning rate the ResNet cells use, not a reduced
# one. Lower it here if a gentler fine-tuning step is intended.
finetune_optimizer = optim.Adam(loaded_model.parameters(), lr=1e-3)
finetune_criterion = nn.CrossEntropyLoss()

finetune_epochs = 300  # Adjust as needed

print("\n--- Fine-Tuning on Unseen Data (SetB) ---")
# Full-batch fine-tuning: one optimizer step per epoch over all of X_new_train.
for epoch in range(finetune_epochs):
    finetune_optimizer.zero_grad()
    outputs = loaded_model(X_new_train)
    loss = finetune_criterion(outputs, y_new_train)
    loss.backward()
    finetune_optimizer.step()

    # Log every fifth epoch.
    if (epoch + 1) % 5 == 0:
        print(f"Fine-Tuning Epoch [{epoch + 1}/{finetune_epochs}], Loss: {loss.item():.4f}")


In [None]:
# ---------------------------
# 4. Evaluate the Fine-Tuned Model on SetB Test Data
# ---------------------------
loaded_model.eval()
with torch.no_grad():
    test_outputs = loaded_model(X_new_test)
    # Predicted class = index of the largest score in each row.
    predicted = torch.max(test_outputs, dim=1).indices

# NumPy views of predictions and labels for scikit-learn.
y_pred = predicted.numpy()
y_true = y_new_test.numpy()

print("\nEvaluation on Fine-Tuned Unseen Data (SetB Test):")
print(classification_report(y_true, y_pred))
print(confusion_matrix(y_true, y_pred))
for metric_label, metric_fn in (
    ("F1-score:", f1_score),
    ("Precision-score:", precision_score),
    ("Recall-score:", recall_score),
):
    print(metric_label, metric_fn(y_true, y_pred))