In [None]:
import os
import random
import glob
import pandas as pd
import numpy as np
import torch
import cv2
from tqdm.notebook import tqdm
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.optim import AdamW
from torch.utils.data import Dataset,DataLoader, TensorDataset
from PIL import Image
from torchvision import transforms
import matplotlib.pyplot as plt
import matplotlib
import plotly.express as px
import plotly.graph_objects as go
import optuna
from sklearn.metrics import f1_score,accuracy_score, recall_score, confusion_matrix,precision_score
from torch.utils.data import random_split
import re
from collections import defaultdict

In [None]:
seed = 42
os.environ['PYTHONHASHSEED'] = str(seed)
g = torch.Generator()
g.manual_seed(42)
random.seed(seed)
np.random.seed(seed)
torch.manual_seed(seed)
if torch.cuda.is_available():
    torch.cuda.manual_seed_all(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False

In [None]:
class AttentionBlock(nn.Module):
    def __init__(self, F_g, F_l, F_int):
        super(AttentionBlock, self).__init__()
        self.W_g = nn.Sequential(
            nn.Conv2d(F_g, F_int, kernel_size=1, stride=1, padding=0, bias=True),
            nn.BatchNorm2d(F_int)
        )
        self.W_x = nn.Sequential(
            nn.Conv2d(F_l, F_int, kernel_size=1, stride=1, padding=0, bias=True),
            nn.BatchNorm2d(F_int)
        )
        self.psi = nn.Sequential(
            nn.Conv2d(F_int, 1, kernel_size=1, stride=1, padding=0, bias=True),
            nn.BatchNorm2d(1),
            nn.Sigmoid()
        )
        self.relu = nn.ReLU(inplace=True)

    def forward(self, g, x):
        # g: gating signa
        # x: encoder's skip connection
        g1 = self.W_g(g)
        x1 = self.W_x(x)
        psi = self.relu(g1 + x1)
        psi = self.psi(psi)
        return x * psi  # skip connection transform

In [None]:
class UNetAutoencoder(nn.Module):
    def __init__(self, in_channels=1, out_channels=1, features=[32, 64, 128, 256]):
        super(UNetAutoencoder, self).__init__()
        self.encoder_layers = nn.ModuleList()
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2)
        
        # Encoder: ReLU 
        for feature in features:
            self.encoder_layers.append(
                nn.Sequential(
                    nn.Conv2d(in_channels, feature, kernel_size=3, padding=1),
                    nn.BatchNorm2d(feature),
                    nn.ReLU(inplace=True),
                    nn.Conv2d(feature, feature, kernel_size=3, padding=1),
                    nn.BatchNorm2d(feature),
                    nn.ReLU(inplace=True)
                )
            )
            in_channels = feature

        # Bottleneck: ReLU 
        self.bottleneck = nn.Sequential(
            nn.Conv2d(features[-1], features[-1]*2, kernel_size=3, padding=1),
            nn.BatchNorm2d(features[-1]*2),
            nn.ReLU(inplace=True),
            nn.Conv2d(features[-1]*2, features[-1]*2, kernel_size=3, padding=1),
            nn.BatchNorm2d(features[-1]*2),
            nn.ReLU(inplace=True)
        )

        # Decoder: ReLU + Attention 
        self.upconvs = nn.ModuleList()
        self.decoder_layers = nn.ModuleList()
        self.attention_blocks = nn.ModuleList()

        for feature in reversed(features):
            up_in_channels = features[-1]*2 if feature == features[-1] else feature*2
            self.upconvs.append(
                nn.ConvTranspose2d(up_in_channels, feature, kernel_size=2, stride=2)
            )
            self.attention_blocks.append(
                AttentionBlock(F_g=feature, F_l=feature, F_int=feature // 2)
            )
            self.decoder_layers.append(
                nn.Sequential(
                    nn.Conv2d(feature * 2, feature, kernel_size=3, padding=1),
                    nn.BatchNorm2d(feature),
                    nn.ReLU(inplace=True),
                    nn.Conv2d(feature, feature, kernel_size=3, padding=1),
                    nn.BatchNorm2d(feature),
                    nn.ReLU(inplace=True)
                )
            )

        self.final_conv = nn.Conv2d(features[0], out_channels, kernel_size=1)
        self.activation = nn.Sigmoid()

    def forward(self, x):
        skip_connections = []
        for encoder in self.encoder_layers:
            x = encoder(x)
            skip_connections.append(x)
            x = self.pool(x)
        
        x = self.bottleneck(x)
        skip_connections = skip_connections[::-1]

        for idx in range(len(self.upconvs)):
            x = self.upconvs[idx](x)
            skip_connection = skip_connections[idx]
            if x.shape != skip_connection.shape:
                diffY = skip_connection.shape[2] - x.shape[2]
                diffX = skip_connection.shape[3] - x.shape[3]
                x = F.pad(x, [diffX // 2, diffX - diffX // 2, diffY // 2, diffY - diffY // 2])

            skip_connection = self.attention_blocks[idx](g=x, x=skip_connection)
            x = torch.cat((skip_connection, x), dim=1)
            x = self.decoder_layers[idx](x)

        x = self.final_conv(x)
        return self.activation(x)

In [None]:
def get_single_channel_transform(channel_index, size=(200, 200), train=False):
    if train:
        return transforms.Compose([
            transforms.Lambda(lambda x: x[channel_index, :, :].unsqueeze(0)), 
            transforms.ToPILImage(),
            transforms.Resize(size),
            transforms.ColorJitter(brightness=(0.5, 1.5)),
            transforms.ToTensor()
        ])
    else:
        return transforms.Compose([
            transforms.Lambda(lambda x: x[channel_index, :, :].unsqueeze(0)),
            transforms.ToPILImage(),
            transforms.Resize(size),
            transforms.ToTensor()
        ])

In [None]:
class TensorClassificationDataset(Dataset):
    def __init__(self, tensor_list, label_list):
        self.tensors = tensor_list
        self.labels = label_list

    def __len__(self):
        return len(self.tensors)

    def __getitem__(self, idx):
        return self.tensors[idx], self.labels[idx]


In [None]:
class CustomDataset(Dataset):
    def __init__(self, data_tensors, labels, days, sample_ids):

        self.data_tensors = data_tensors
        self.labels = labels
        self.days = days
        self.sample_ids = sample_ids

    def __len__(self):
        return len(self.data_tensors)

    def __getitem__(self, idx):
        x = self.data_tensors[idx]
        y = self.labels[idx]
        d = self.days[idx]
        s = self.sample_ids[idx]
        return x, y, d, s

In [None]:
channel_index= 1
target_width, target_height = 200,200

### Control Group

In [None]:
folder_path = .
csv_files = glob.glob(os.path.join(folder_path, '*.csv'))

tensor_list = []

for csv_file in tqdm(csv_files, desc='Processing CSV files'):
    df = pd.read_csv(csv_file)
    min_x = df['File X'].min()
    min_y = df['File Y'].min()
    df['x_norm'] = df['File X'] - min_x
    df['y_norm'] = df['File Y'] - min_y
    width = int(df['x_norm'].max() + 1)
    height = int(df['y_norm'].max() + 1)
    num_channels = df.shape[1] - 2
    image_array = np.zeros((height, width, num_channels), dtype=np.float32)
    for _, row in df.iterrows():
        x = int(row['x_norm'])
        y = int(row['y_norm'])
        channels = row.iloc[2:2+num_channels].values.astype(np.float32)
        image_array[y, x, :] = channels
    resized_image_array = cv2.resize(image_array, (target_width, target_height))
    tensor_image = torch.tensor(resized_image_array).permute(2, 0, 1)
    tensor_list.append(tensor_image)
    
dataset_tensor = torch.stack(tensor_list)


In [None]:
folder_path = .
tensor_list = []
label_list = []

for label_str in ['0', '1']:  
    label_folder = os.path.join(folder_path, label_str)
    csv_files = glob.glob(os.path.join(label_folder, '*.csv'))
    label = int(label_str)

    for csv_file in tqdm(csv_files, desc=f'Processing label {label}'):
        df = pd.read_csv(csv_file)

        min_x = df['File X'].min()
        min_y = df['File Y'].min()
        df['x_norm'] = df['File X'] - min_x
        df['y_norm'] = df['File Y'] - min_y

        width = int(df['x_norm'].max() + 1)
        height = int(df['y_norm'].max() + 1)

        num_channels = df.shape[1] - 2
        image_array = np.zeros((height, width, num_channels), dtype=np.float32)

        for _, row in df.iterrows():
            x = int(row['x_norm'])
            y = int(row['y_norm'])
            channels = row.iloc[2:2+num_channels].values.astype(np.float32)
            image_array[y, x, :] = channels

        resized = cv2.resize(image_array, (target_width, target_height))
        tensor_image = torch.tensor(resized).permute(2, 0, 1)

        tensor_list.append(tensor_image)
        label_list.append(label)


validset_tensor = TensorClassificationDataset(tensor_list, label_list)
val_loader = DataLoader(validset_tensor, batch_size=32, shuffle=False)


In [None]:
# Hyperparameters
train_epochs = 200
batch_size = 32

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
criterion = nn.MSELoss()

In [None]:
transform_test = get_single_channel_transform(channel_index, size=(target_width,target_height), train=False)
test_images = []
test_labels = []
for img, label in tqdm(zip(tensor_list, label_list), total=len(tensor_list)):
    treat_img = transform_test(img)  
    test_images.append(treat_img)
    test_labels.append(label)

test_data = torch.stack(test_images) 
test_targets = torch.tensor(test_labels)  

test_dataset = TensorDataset(test_data, test_targets)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=True, generator=g)

In [None]:
def objective(trial):
    learning_rate = trial.suggest_float("lr", 1e-5, 1e-2, log=True)
    weight_decay = trial.suggest_float("weight_decay", 1e-6, 1e-2, log=True)

    transform_train = get_single_channel_transform(channel_index, size=(target_width,target_height), train=False)
    train_images = []
    for img in tqdm(dataset_tensor, total=len(dataset_tensor)):
        treat_img = transform_test(img)  
        train_images.append(treat_img)
    
    train_data = torch.stack(train_images) 
    
    val_ratio = 0.2
    val_size = int(len(train_data) * val_ratio)
    train_size = len(train_data) - val_size
    train_dataset, val_dataset = random_split(
        TensorDataset(train_data, train_data),
        [train_size, val_size],
        generator=torch.Generator().manual_seed(42)
    )
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, generator=g)
    val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)

    model = UNetAutoencoder(in_channels=1, out_channels=1, features=[16, 32, 64, 128]).to(device)
    optimizer = AdamW(model.parameters(), lr=learning_rate, weight_decay=weight_decay)

    best_val_loss = float("inf")
    patience_counter = 0
    improvement_threshold = 0.00005
    early_stopping_patience = 10

    for epoch in range(train_epochs):
        model.train()
        total_loss = 0.0
        progress = tqdm(train_loader, desc=f"[Trial {trial.number}] Epoch {epoch+1}/{train_epochs}", leave=False)
        for inputs, targets in progress:
            inputs, targets = inputs.to(device), targets.to(device)
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, targets)
            loss.backward()
            optimizer.step()
            total_loss += loss.item() * inputs.size(0)
        avg_train_loss = total_loss / len(train_loader.dataset)

        model.eval()
        val_loss = 0.0
        with torch.no_grad():
            for val_inputs, val_targets in val_loader:
                val_inputs, val_targets = val_inputs.to(device), val_targets.to(device)
                val_outputs = model(val_inputs)
                loss = criterion(val_outputs, val_targets)
                val_loss += loss.item() * val_inputs.size(0)
        avg_val_loss = val_loss / len(val_loader.dataset)

        tqdm.write(f"[Trial {trial.number}] Epoch {epoch+1} Train Loss: {avg_train_loss:.6f} | Val Loss: {avg_val_loss:.6f}")

        if best_val_loss - avg_val_loss > improvement_threshold:
            best_val_loss = avg_val_loss
            patience_counter = 0
        else:
            patience_counter += 1
            if patience_counter >= early_stopping_patience:
                break

    model.eval()
    all_anomaly_scores = []
    all_true_labels = []

    with torch.no_grad():
        for test_batch in tqdm(test_loader, desc=f"[Trial {trial.number}] Evaluating", leave=False):
            test_image_tensor, test_label = test_batch
            test_image_tensor = test_image_tensor.to(device)
            reconstructed = model(test_image_tensor)
            batch_scores = ((reconstructed - test_image_tensor) ** 2).view(test_image_tensor.size(0), -1).mean(dim=1)
            all_anomaly_scores.extend(batch_scores.cpu().numpy())
            all_true_labels.extend(test_label.cpu().numpy())

    # ----- 5. Threshold sweep -----
    anomaly_scores = np.array(all_anomaly_scores)
    true_labels = np.array(all_true_labels).astype(int)
    
    best_acc = 0
    best_sen = 0
    best_spe = 0
    best_threshold = None
    thresholds = np.linspace(anomaly_scores.min(), anomaly_scores.max(), 1000)
    for threshold in thresholds:
        pred_labels = (anomaly_scores > threshold).astype(int)
    
        acc = accuracy_score(true_labels, pred_labels)
        sen = recall_score(true_labels, pred_labels, pos_label=1)
        pre = precision_score(true_labels, pred_labels, pos_label=1, zero_division=0)
    
        tn, fp, fn, tp = confusion_matrix(true_labels, pred_labels).ravel()
        spe = tn / (tn + fp + 1e-8)
    
        if acc > best_acc:
            best_acc = acc
            best_sen = sen
            best_spe = spe
            best_threshold = threshold
    
    tqdm.write(f"[Trial {trial.number}] Best Accuracy   : {best_acc:.4f}")
    tqdm.write(f"[Trial {trial.number}] Best Sensitivity: {best_sen:.4f}")
    tqdm.write(f"[Trial {trial.number}] Best Specificity: {best_spe:.4f}")
    tqdm.write(f"[Trial {trial.number}] Best Threshold  : {best_threshold:.6f}")
    
    return best_acc

In [None]:
study = optuna.create_study(direction='maximize')
study.optimize(objective, n_trials=30)

print(f" Best trial: {study.best_trial.number}")
print(f"  ACC: {study.best_value:.4f}")
print(f"  lr: {study.best_params['lr']}")
print(f"  weight_decay: {study.best_params['weight_decay']}")
