In [1]:
import random

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
from imblearn.over_sampling import SMOTE, RandomOverSampler
from imblearn.under_sampling import RandomUnderSampler
from sklearn.metrics import f1_score
from sklearn.metrics import accuracy_score, precision_score, recall_score
from sklearn.model_selection import train_test_split


In [None]:
# Load the raw dataset and discard every row that has at least one missing value.
df = pd.read_csv('data.csv').dropna()

# Persist the cleaned rows (without the index column) for the later cells.
df.to_csv('cleaned_file.csv', index=False)

In [None]:
# Pin the NumPy and PyTorch global RNGs to a fixed seed so reruns are repeatable.
seed = 0
np.random.seed(seed)
torch.manual_seed(seed)
def prepare_data(df):
    """Turn a DataFrame into train/validation/test tensors (60% / 20% / 20%).

    The first column of ``df`` is used as the label and all remaining columns
    as features. Labels equal to -1 are mapped to 0 so they can be used with
    ``nn.BCELoss``.

    Args:
        df: DataFrame whose first column is the label and whose other columns
            are numeric features. It is not modified.

    Returns:
        Tuple ``(X_train, y_train, X_val, y_val, X_test, y_test)`` of
        ``float32`` tensors; every ``y_*`` tensor has shape ``(n, 1)``.
    """
    # Set random seed for reproducibility
    seed = 0
    torch.manual_seed(seed)
    np.random.seed(seed)

    # Separate features and labels
    X = df.iloc[:, 1:].values  # Features (all columns except the first one)
    # Copy the labels: the array returned for a single column can be a view of
    # the DataFrame's own storage (or read-only under pandas Copy-on-Write), so
    # relabelling it in place would corrupt the caller's frame or raise.
    y = df.iloc[:, 0].to_numpy(copy=True)   # Labels (the first column)

    # Convert -1 to 0 for PyTorch's BCELoss
    y[y == -1] = 0
    y_tensor = torch.tensor(y, dtype=torch.float32).view(-1, 1)
    X_tensor = torch.tensor(X, dtype=torch.float32)

    # Split the dataset into training, validation, and test sets:
    # 40% is held out first, then halved into validation and test.
    X_train, X_temp, y_train, y_temp = train_test_split(X_tensor, y_tensor, test_size=0.4, random_state=42)
    X_val, X_test, y_val, y_test = train_test_split(X_temp, y_temp, test_size=0.5, random_state=43)

    return X_train, y_train, X_val, y_val, X_test, y_test

In [2]:
def process_data(X, y, method="none"):
    """Optionally rebalance a training set and return it as float32 tensors.

    Args:
        X: Feature tensor.
        y: Label tensor; it is flattened before being handed to a sampler.
        method: ``"none"`` (pass-through), ``"smote"``, ``"random"``
            (random over-sampling) or ``"undersample"`` (random
            under-sampling).

    Returns:
        ``(X_out, y_out)`` where ``y_out`` has shape ``(n, 1)``.

    Raises:
        ValueError: If ``method`` is not one of the four names above.
    """
    # Seed every RNG once; the samplers additionally get an explicit random_state.
    seed = 42
    torch.manual_seed(seed)
    if torch.cuda.is_available():
        torch.cuda.manual_seed(seed)
        torch.cuda.manual_seed_all(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False
    np.random.seed(seed)

    if method not in ("none", "smote", "random", "undersample"):
        raise ValueError(f"Invalid method: {method}")

    if method == "none":
        features_np, labels_np = X.numpy(), y.numpy().ravel()
    else:
        # Lambdas defer the class lookup until the chosen sampler is actually built.
        sampler_factories = {
            "smote": lambda: SMOTE(random_state=42),
            "random": lambda: RandomOverSampler(random_state=42),
            "undersample": lambda: RandomUnderSampler(random_state=42),
        }
        sampler = sampler_factories[method]()
        features_np, labels_np = sampler.fit_resample(X.numpy(), y.numpy().ravel())

    features = torch.tensor(features_np, dtype=torch.float32)
    labels = torch.tensor(labels_np, dtype=torch.float32).view(-1, 1)
    return features, labels

In [4]:
class MultiLayerPerceptron(nn.Module):
    """Binary classifier: input -> 128 -> 64 -> 1 with ReLU hidden layers.

    The forward pass ends in a sigmoid, so the output is a probability in
    (0, 1) and pairs with ``nn.BCELoss``.

    Defined at module level (not inside ``get_model``) so that every model
    shares one class object: a class created inside a function is a fresh type
    on each call and cannot be pickled.
    """

    def __init__(self, input_dim):
        super().__init__()
        self.fc1 = nn.Linear(input_dim, 128)
        self.fc2 = nn.Linear(128, 64)
        self.fc3 = nn.Linear(64, 1)

    def forward(self, x):
        x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        x = torch.sigmoid(self.fc3(x))
        return x


def get_model(input_dim):
    """Build a freshly initialised ``MultiLayerPerceptron``.

    Args:
        input_dim: Number of input features.

    Returns:
        A new, untrained ``MultiLayerPerceptron`` instance.
    """
    return MultiLayerPerceptron(input_dim)

In [None]:
def train(X_train, y_train, X_val, y_val, epochs=1000, val_interval=100):
    """Train the MLP with full-batch Adam and restore the best validation checkpoint.

    Every epoch performs one optimisation step on the whole training set and
    records the training loss and per-class F1. Every ``val_interval`` epochs
    the validation loss and F1 are recorded; the weights with the highest
    average of positive- and negative-class validation F1 are kept.

    Args:
        X_train, y_train: Training features and ``(n, 1)`` labels in {0, 1}.
        X_val, y_val: Validation features and labels.
        epochs: Number of full-batch optimisation steps.
        val_interval: Validate after every this many epochs (default 100).

    Returns:
        ``(model, metrics)``. ``model`` carries the best validation weights, or
        the final-epoch weights if no validation score ever exceeded 0 (for
        example when ``epochs < val_interval``). ``metrics`` maps
        ``train_losses`` / ``train_f1s`` / ``train_f1s_pos`` / ``train_f1s_neg``
        (one entry per epoch) and ``val_losses`` / ``val_f1s`` /
        ``val_f1s_pos`` / ``val_f1s_neg`` (one entry per validation) to lists.

    Raises:
        ValueError: If ``val_interval`` is smaller than 1.
    """
    if val_interval < 1:
        raise ValueError(f"val_interval must be >= 1, got {val_interval}")

    # Set random seed for reproducibility
    seed = 42
    torch.manual_seed(seed)
    if torch.cuda.is_available():
        torch.cuda.manual_seed(seed)
        torch.cuda.manual_seed_all(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False
    np.random.seed(seed)

    input_dim = X_train.shape[1]
    model = get_model(input_dim)
    optimizer = optim.Adam(model.parameters(), lr=0.0001, weight_decay=0.0001)
    criterion = nn.BCELoss()

    train_f1s = []
    val_f1s = []
    best_val_f1 = 0.0
    train_losses = []
    val_losses = []
    best_model_weights = None
    train_f1s_pos = []
    train_f1s_neg = []
    val_f1s_pos = []
    val_f1s_neg = []

    # The label arrays never change, so convert them once instead of per epoch.
    y_train_np = y_train.numpy()
    y_val_np = y_val.numpy()

    for epoch in range(epochs):
        model.train()
        optimizer.zero_grad()
        outputs = model(X_train)
        loss = criterion(outputs, y_train)
        train_losses.append(loss.item())
        loss.backward()
        optimizer.step()

        # Score the post-step weights; no autograd graph is needed for metrics.
        with torch.no_grad():
            train_preds = (model(X_train) > 0.5).float().numpy()
        train_f1_pos = f1_score(y_train_np, train_preds, pos_label=1)
        train_f1_neg = f1_score(y_train_np, train_preds, pos_label=0)
        train_f1s_pos.append(train_f1_pos)
        train_f1s_neg.append(train_f1_neg)
        train_f1s.append((train_f1_pos + train_f1_neg) / 2.0)

        if (epoch + 1) % val_interval == 0:
            model.eval()
            with torch.no_grad():
                val_outputs = model(X_val)
                val_losses.append(criterion(val_outputs, y_val).item())
                val_preds = (val_outputs > 0.5).float().numpy()

            val_f1_pos = f1_score(y_val_np, val_preds, pos_label=1)
            val_f1_neg = f1_score(y_val_np, val_preds, pos_label=0)
            val_f1s_pos.append(val_f1_pos)
            val_f1s_neg.append(val_f1_neg)

            val_f1_avg = (val_f1_pos + val_f1_neg) / 2.0
            val_f1s.append(val_f1_avg)

            # Model selection based on the average of positive and negative F1-scores
            if val_f1_avg > best_val_f1:
                best_val_f1 = val_f1_avg
                # state_dict() tensors share storage with the live parameters,
                # which the optimizer keeps updating in place; clone them so
                # the snapshot really preserves this epoch's weights.
                best_model_weights = {
                    name: tensor.detach().clone()
                    for name, tensor in model.state_dict().items()
                }

            model.train()

    # No snapshot exists if validation never ran or never scored above 0;
    # keep the final weights in that case instead of crashing on None.
    if best_model_weights is not None:
        model.load_state_dict(best_model_weights)

    metrics = {
        "train_f1s": train_f1s,
        "val_f1s": val_f1s,
        "train_losses": train_losses,
        "val_losses": val_losses,
        "train_f1s_pos": train_f1s_pos,
        "train_f1s_neg": train_f1s_neg,
        "val_f1s_pos": val_f1s_pos,
        "val_f1s_neg": val_f1s_neg,
    }
    return model, metrics

In [6]:
def evaluate(model, X_test, y_test, criterion=None):
    """Score a trained model on held-out data at a 0.5 decision threshold.

    Relies on ``f1_score``, ``recall_score``, ``precision_score`` and
    ``accuracy_score`` from ``sklearn.metrics`` being imported at notebook
    level.

    Args:
        model: Trained network whose output is a probability per sample.
        X_test: Feature tensor.
        y_test: Label tensor with values in {0, 1}.
        criterion: Loss used for ``test_loss``. Defaults to ``nn.BCELoss()``,
            the loss ``train`` optimises.

    Returns:
        Dict with ``test_f1_pos``, ``test_f1_neg``, ``test_recall_pos``,
        ``test_recall_neg``, ``test_precision_pos``, ``test_precision_neg``,
        ``test_accuracy`` and ``test_loss`` (a Python float).
    """
    # Set random seed for reproducibility
    # NOTE(review): nothing below draws random numbers; the re-seeding is kept
    # only so the function's side effects stay the same for existing callers.
    seed = 42
    torch.manual_seed(seed)
    if torch.cuda.is_available():
        torch.cuda.manual_seed(seed)
        torch.cuda.manual_seed_all(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False
    np.random.seed(seed)

    if criterion is None:
        criterion = nn.BCELoss()

    model.eval()
    with torch.no_grad():
        test_outputs = model(X_test)
        test_preds = (test_outputs > 0.5).float()
        test_loss = criterion(test_outputs, y_test)

    # Convert once; every sklearn metric below reuses the same two arrays.
    y_true = y_test.numpy()
    y_pred = test_preds.numpy()

    metrics = {
        "test_f1_pos": f1_score(y_true, y_pred, pos_label=1),
        "test_f1_neg": f1_score(y_true, y_pred, pos_label=0),
        "test_recall_pos": recall_score(y_true, y_pred, pos_label=1),
        "test_recall_neg": recall_score(y_true, y_pred, pos_label=0),
        "test_precision_pos": precision_score(y_true, y_pred, pos_label=1),
        "test_precision_neg": precision_score(y_true, y_pred, pos_label=0),
        "test_accuracy": accuracy_score(y_true, y_pred),
        "test_loss": test_loss.item()
    }
    return metrics

In [None]:
# Reload the cleaned dataset written by the cleaning cell and split it into
# training / validation / test tensors (prepare_data holds out 40%, then halves it).
df = pd.read_csv('cleaned_file.csv')
X_train, y_train, X_val, y_val, X_test, y_test = prepare_data(df)


In [None]:
# Set random seed for reproducibility
seed = 42
torch.manual_seed(seed)
if torch.cuda.is_available():
    torch.cuda.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False
np.random.seed(seed)

# Baseline experiment: train on the training split as-is (no resampling).
method = "none"
X_train_none, y_train_none = process_data(X_train, y_train, method=method)

model_none, train_metrics_none = train(X_train_none, y_train_none, X_val, y_val)

# evaluate() takes the loss as an argument, but no `criterion` exists at
# notebook scope (train() only creates one locally), so build the same loss here.
criterion = nn.BCELoss()
test_metrics_none = evaluate(model_none, X_test, y_test, criterion)

print("Test F1-Score (Positive):", test_metrics_none["test_f1_pos"])
print("Test F1-Score (Negative):", test_metrics_none["test_f1_neg"])
print("Test Loss:", test_metrics_none["test_loss"])
# 1000 is the number of epochs train() ran with (its default).
# NOTE(review): visualize_results is not defined in the visible part of this
# notebook and plot_losses is defined in a later cell; both must already have
# been executed for the next two lines to work.
visualize_results(1000, train_metrics_none["train_f1s"], train_metrics_none["val_f1s"])
plot_losses(1000, train_metrics_none["train_losses"], train_metrics_none["val_losses"])

In [None]:
# Set random seed for reproducibility
seed = 42
torch.manual_seed(seed)
if torch.cuda.is_available():
    torch.cuda.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False
np.random.seed(seed)

# Experiment: randomly under-sample the training split before training.
method = "undersample"
X_train_under, y_train_under = process_data(X_train, y_train, method=method)

model_under, train_metrics_under = train(X_train_under, y_train_under, X_val, y_val)

# evaluate() takes the loss as an argument, but no `criterion` exists at
# notebook scope (train() only creates one locally), so build the same loss here.
criterion = nn.BCELoss()
test_metrics_under = evaluate(model_under, X_test, y_test, criterion)

# Report this experiment's own results (the *_under variables); the bare
# `test_metrics` / `train_metrics` names are never assigned in this notebook.
print("Test F1-Score (Positive):", test_metrics_under["test_f1_pos"])
print("Test F1-Score (Negative):", test_metrics_under["test_f1_neg"])
print("Test Loss:", test_metrics_under["test_loss"])
# 1000 is the number of epochs train() ran with (its default).
# NOTE(review): visualize_results is not defined in the visible part of this
# notebook and plot_losses is defined in a later cell; both must already have
# been executed for the next two lines to work.
visualize_results(1000, train_metrics_under["train_f1s"], train_metrics_under["val_f1s"])
plot_losses(1000, train_metrics_under["train_losses"], train_metrics_under["val_losses"])

In [None]:
# Set random seed for reproducibility
seed = 42
torch.manual_seed(seed)
if torch.cuda.is_available():
    torch.cuda.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False
np.random.seed(seed)

# Experiment: randomly over-sample the training split before training.
method = "random"
X_train_random, y_train_random = process_data(X_train, y_train, method=method)

model_random, train_metrics_random = train(X_train_random, y_train_random, X_val, y_val)

# evaluate() takes the loss as an argument, but no `criterion` exists at
# notebook scope (train() only creates one locally), so build the same loss here.
criterion = nn.BCELoss()
test_metrics_random = evaluate(model_random, X_test, y_test, criterion)

# Report this experiment's own results (the *_random variables); the bare
# `test_metrics` / `train_metrics` names are never assigned in this notebook.
print("Test F1-Score (Positive):", test_metrics_random["test_f1_pos"])
print("Test F1-Score (Negative):", test_metrics_random["test_f1_neg"])
print("Test Loss:", test_metrics_random["test_loss"])
# 1000 is the number of epochs train() ran with (its default).
# NOTE(review): visualize_results is not defined in the visible part of this
# notebook and plot_losses is defined in a later cell; both must already have
# been executed for the next two lines to work.
visualize_results(1000, train_metrics_random["train_f1s"], train_metrics_random["val_f1s"])
plot_losses(1000, train_metrics_random["train_losses"], train_metrics_random["val_losses"])

In [None]:
# Set random seed for reproducibility
seed = 42
torch.manual_seed(seed)
if torch.cuda.is_available():
    torch.cuda.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False
np.random.seed(seed)

# Experiment: over-sample the training split with SMOTE before training.
method = "smote"
X_train_smote, y_train_smote = process_data(X_train, y_train, method=method)

model_smote, train_metrics_smote = train(X_train_smote, y_train_smote, X_val, y_val)

# evaluate() takes the loss as an argument, but no `criterion` exists at
# notebook scope (train() only creates one locally), so build the same loss here.
criterion = nn.BCELoss()
test_metrics_smote = evaluate(model_smote, X_test, y_test, criterion)

# Report this experiment's own results (the *_smote variables); the bare
# `test_metrics` / `train_metrics` names are never assigned in this notebook.
print("Test F1-Score (Positive):", test_metrics_smote["test_f1_pos"])
print("Test F1-Score (Negative):", test_metrics_smote["test_f1_neg"])
print("Test Loss:", test_metrics_smote["test_loss"])
# 1000 is the number of epochs train() ran with (its default).
# NOTE(review): visualize_results is not defined in the visible part of this
# notebook and plot_losses is defined in a later cell; both must already have
# been executed for the next two lines to work.
visualize_results(1000, train_metrics_smote["train_f1s"], train_metrics_smote["val_f1s"])
plot_losses(1000, train_metrics_smote["train_losses"], train_metrics_smote["val_losses"])

In [7]:
import pandas as pd

def generate_comparison_table_with_avg(*evaluation_results):
    """Tabulate per-class and class-averaged test metrics, one row per method.

    Args:
        *evaluation_results: Exactly four metric dicts (with the ``test_*``
            keys read below), given in the order none, smote, random,
            undersample.

    Returns:
        DataFrame with a ``Method`` column followed by the F1, recall and
        precision columns (positive, negative, average) and ``Accuracy``.

    Raises:
        AssertionError: If the number of result dicts is not four.
    """
    method_names = ("none", "smote", "random", "undersample")

    # One result dict is required per method name.
    assert len(evaluation_results) == len(method_names), "Provide evaluation results for each method."

    table_rows = []
    for position, result in enumerate(evaluation_results):
        recall_pos = result["test_recall_pos"]
        recall_neg = result["test_recall_neg"]
        precision_pos = result["test_precision_pos"]
        precision_neg = result["test_precision_neg"]
        f1_pos = result["test_f1_pos"]
        f1_neg = result["test_f1_neg"]

        # Each "Avg" column is the unweighted mean of the two per-class scores.
        table_rows.append({
            "Method": method_names[position],
            "F1 (Positive)": f1_pos,
            "F1 (Negative)": f1_neg,
            "Avg F1": (f1_pos + f1_neg) / 2,
            "Recall (Positive)": recall_pos,
            "Recall (Negative)": recall_neg,
            "Avg Recall": (recall_pos + recall_neg) / 2,
            "Precision (Positive)": precision_pos,
            "Precision (Negative)": precision_neg,
            "Avg Precision": (precision_pos + precision_neg) / 2,
            "Accuracy": result["test_accuracy"],
        })

    return pd.DataFrame(table_rows)

def save_comparison_table_to_csv(filename, *evaluation_results):
    """Build the method comparison table and write it to ``filename`` as CSV.

    Args:
        filename: Destination path of the CSV file (written without the index).
        *evaluation_results: Metric dicts forwarded unchanged to
            ``generate_comparison_table_with_avg``.

    Returns:
        A confirmation message naming the file that was written.
    """
    comparison_table = generate_comparison_table_with_avg(*evaluation_results)
    comparison_table.to_csv(filename, index=False)
    return f"Comparison table saved to {filename}"


In [None]:
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

# Read the saved comparison table back from disk.
df = pd.read_csv('comparison_table.csv')

# White-grid seaborn theme for the chart below.
sns.set_style("whitegrid")

# One bar per resampling method, bar height = class-averaged F1.
plt.figure(figsize=(10, 6))
sns.barplot(data=df, x='Method', y='Avg F1', palette="Blues_d")
plt.title('Average F1 Score by Method')
plt.show()

In [None]:
def plot_losses(epochs, train_losses, val_losses, val_interval=100):
    """Plot training and validation loss against the epoch on a log-scaled y-axis.

    Args:
        epochs: Number of training epochs; ``train_losses`` must hold one
            value per epoch.
        train_losses: Training loss recorded after every epoch.
        val_losses: Validation losses recorded every ``val_interval`` epochs.
        val_interval: Spacing, in epochs, between validation measurements
            (default 100, matching ``train``).
    """
    # Set random seed for reproducibility
    # NOTE(review): plotting draws no random numbers; the re-seeding is kept
    # only so the function's side effects stay the same for existing callers.
    seed = 42
    torch.manual_seed(seed)
    if torch.cuda.is_available():
        torch.cuda.manual_seed(seed)
        torch.cuda.manual_seed_all(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False
    np.random.seed(seed)

    # train() validates when (epoch + 1) % val_interval == 0, i.e. at the
    # 0-based epochs val_interval - 1, 2 * val_interval - 1, ... Deriving the
    # positions from len(val_losses) keeps x and y the same length even when
    # `epochs` is not a multiple of the interval.
    val_epochs = [val_interval * (i + 1) - 1 for i in range(len(val_losses))]

    # Plotting
    plt.figure(figsize=(10, 6))
    plt.plot(range(epochs), train_losses, label='Training Loss')
    plt.plot(val_epochs, val_losses, label='Validation Loss')
    plt.xlabel('Epoch')
    plt.ylabel('Log Loss')
    plt.yscale('log')  # Set y-axis scale to logarithmic
    plt.title('Log Loss vs. Epoch')
    plt.legend()
    plt.show()