## Supervised learning

In [None]:
import torch
import torch.nn as nn
from torch.utils.data import DataLoader
import torchvision.models as models
from sklearn.metrics import roc_curve
from scipy.optimize import brentq
from scipy.interpolate import interp1d
from torchvision import datasets, transforms
import timm
import torch.optim as optim
import random
from torch.optim import Adam
import numpy as np
import matplotlib.pyplot as plt
from sklearn.metrics import roc_curve, roc_auc_score

In [None]:

# Fix the torch RNG state on the CPU and on every CUDA device.
torch.manual_seed(42)
torch.cuda.manual_seed_all(42)


# Root folders of the training, validation and test splits.
train_data_dir, val_data_dir, test_data_dir = (
    '/oci/train/',
    '/oci/validation/',
    '/oci/test/',
)

# Preprocessing shared by all three splits: resize to 224x224, convert to a
# tensor, then shift every channel by 0.5 and scale it by 0.5.
transform = transforms.Compose(
    [
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
        transforms.Normalize(mean=(0.5, 0.5, 0.5), std=(0.5, 0.5, 0.5)),
    ]
)

# Build one ImageFolder dataset per split, in train / validation / test order.
train_dataset, val_dataset, test_dataset = [
    datasets.ImageFolder(root=split_dir, transform=transform)
    for split_dir in (train_data_dir, val_data_dir, test_data_dir)
]

# Report how many images each split contains.
print(f"Number of images in the training set: {len(train_dataset)}")
print(f"Number of images in the validation set: {len(val_dataset)}")
print(f"Number of images in the test set: {len(test_dataset)}")

# Wrap the datasets in loaders. Only the training loader shuffles; the
# validation and test loaders keep the dataset order.
batch_size = 8  # Adjust according to your needs
train_loader = DataLoader(
    train_dataset,
    batch_size=batch_size,
    shuffle=True,
    num_workers=4,
)
val_loader = DataLoader(
    val_dataset,
    batch_size=batch_size,
    shuffle=False,
    num_workers=4,
)
test_loader = DataLoader(
    test_dataset,
    batch_size=batch_size,
    shuffle=False,
    num_workers=4,
)

In [None]:
# DenseNet-201 backbone (created through timm) with a small replacement head.
class SimpleDenseNet(nn.Module):
    """timm DenseNet-201 whose classifier is swapped for a two-layer head.

    The head is Linear(in_features, 128) -> ReLU -> Dropout(0.5) ->
    Linear(128, num_classes), where in_features is read from the backbone's
    original classifier.

    Args:
        num_classes: Number of outputs of the final linear layer.
    """

    def __init__(self, num_classes):
        super().__init__()

        # Backbone created with pretrained=True.
        backbone = timm.create_model('densenet201', pretrained=True)

        # Reuse the input width of the stock classifier for the new head.
        feature_dim = backbone.classifier.in_features
        head = nn.Sequential(
            nn.Linear(feature_dim, 128),
            nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(128, num_classes),
        )
        backbone.classifier = head

        self.model = backbone

    def forward(self, x):
        """Run x through the wrapped backbone and return its output."""
        output = self.model(x)
        return output

# Build the network with two outputs (binary classification).
num_classes = 2
model = SimpleDenseNet(num_classes=num_classes)

# Cross-entropy loss, optimised with SGD (learning rate 1e-4, momentum 0.9).
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(params=model.parameters(), momentum=0.9, lr=1e-4)

In [None]:

# Train with early stopping on the validation loss.
#
# Expects `model`, `train_loader`, `val_loader`, `criterion` and `optimizer`
# to be defined already.
#
# When the loop finishes, `model` holds the weights of the epoch with the
# lowest validation loss and `threshold` holds the EER threshold measured with
# those same weights, so the pair is consistent when used for testing.

num_epochs = 100
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

# Early-stopping bookkeeping
best_val_loss = float('inf')
best_epoch = 0
early_stopping_patience = 2
patience_counter = 0
best_state = None       # CPU copy of the weights from the best epoch so far
best_threshold = None   # EER threshold measured at the best epoch so far

for epoch in range(num_epochs):
    # ---- Training pass ----
    model.train()
    for inputs, labels in train_loader:
        inputs, labels = inputs.to(device), labels.to(device)

        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

    # ---- Validation pass ----
    model.eval()
    val_loss = 0.0
    all_labels = []
    all_scores = []

    with torch.no_grad():
        for inputs, labels in val_loader:
            inputs, labels = inputs.to(device), labels.to(device)

            outputs = model(inputs)
            loss = criterion(outputs, labels)
            val_loss += loss.item()

            # Score = softmax probability of class index 1
            scores = torch.nn.functional.softmax(outputs, dim=1)[:, 1]
            all_labels.extend(labels.cpu().numpy())
            all_scores.extend(scores.cpu().numpy())

    # Mean of the per-batch losses
    val_loss /= len(val_loader)

    # A NaN loss makes the ROC statistics meaningless; skip this epoch.
    if np.isnan(val_loss):
        print("Error: NaN value encountered in validation loss. Skipping this epoch.")
        continue  # Skip to the next epoch

    # ---- EER and its threshold for this epoch ----
    fpr, tpr, thresholds = roc_curve(all_labels, all_scores, pos_label=1)

    # Build the interpolator once instead of on every brentq evaluation.
    roc_interp = interp1d(fpr, tpr)
    # EER is the FPR at which FPR == FNR, i.e. where 1 - x - TPR(x) == 0.
    eer = brentq(lambda x: 1. - x - roc_interp(x), 0., 1.)

    # Pick the ROC point where FPR and FNR (= 1 - TPR) are closest.
    # Index 0 is skipped: roc_curve prepends a sentinel threshold there that
    # is not an actual score and would classify every sample as negative.
    eer_index = 1 + np.nanargmin(np.abs(fpr[1:] - (1. - tpr[1:])))
    threshold = thresholds[eer_index]

    print(f"Epoch {epoch + 1}/{num_epochs}, Validation EER: {eer * 100:.2f}%")
    print(f"Validation EER Threshold: {threshold:.4f}")

    # ---- Early stopping check ----
    if val_loss < best_val_loss:
        best_val_loss = val_loss
        best_epoch = epoch
        patience_counter = 0
        # Snapshot the weights and threshold together so they can be
        # restored as a matching pair after the loop.
        best_threshold = threshold
        best_state = {
            name: tensor.detach().cpu().clone()
            for name, tensor in model.state_dict().items()
        }
    else:
        patience_counter += 1
        if patience_counter >= early_stopping_patience:
            print("Validation loss didn't improve for {} epochs. Early stopping...".format(early_stopping_patience))
            break

if best_state is None:
    # Every epoch was skipped, so there is no usable model or threshold.
    raise RuntimeError("No epoch produced a finite validation loss; cannot select a model or EER threshold.")

# Roll back to the best epoch so later cells use matching weights/threshold.
model.load_state_dict(best_state)
threshold = best_threshold
print(f"Restored best model from epoch {best_epoch + 1} (validation loss: {best_val_loss:.4f}, EER threshold: {threshold:.4f})")


In [None]:
# Evaluate on the test set: HTER at the validation EER threshold, plus AUC.
#
# Expects `model`, `test_loader` and `threshold` to be defined already.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)
# Make inference independent of whichever cell ran last: turn off dropout
# (and any other train-only behaviour) explicitly.
model.eval()

test_labels = []
test_scores = []

with torch.no_grad():
    for inputs, labels in test_loader:
        inputs, labels = inputs.to(device), labels.to(device)

        outputs = model(inputs)
        # Score = softmax probability of class index 1
        scores = torch.nn.functional.softmax(outputs, dim=1)[:, 1]

        test_labels.extend(labels.cpu().numpy())
        test_scores.extend(scores.cpu().numpy())

# Calculate the HTER on the testing set using the EER threshold
threshold_test = threshold  # Use the EER threshold from the validation set for testing
# `>=` matches how roc_curve defines its thresholds (score >= threshold is
# predicted positive), so the threshold means the same thing here.
predicted_labels_test = [1 if score >= threshold_test else 0 for score in test_scores]

# False acceptance: predicted 1 while the label is 0.
# False rejection: predicted 0 while the label is 1.
false_acceptance_test = sum(
    1 for predicted, actual in zip(predicted_labels_test, test_labels)
    if predicted == 1 and actual == 0
)
false_rejection_test = sum(
    1 for predicted, actual in zip(predicted_labels_test, test_labels)
    if predicted == 0 and actual == 1
)

total_samples_test = len(test_labels)
num_negatives_test = sum(1 for actual in test_labels if actual == 0)
num_positives_test = sum(1 for actual in test_labels if actual == 1)

# Each rate is normalised by the size of its own class; a rate is undefined
# (NaN) if that class is absent from the test set.
far_test = false_acceptance_test / num_negatives_test if num_negatives_test else float('nan')
frr_test = false_rejection_test / num_positives_test if num_positives_test else float('nan')

# HTER is the mean of FAR and FRR, reported as a percentage.
hter_test = ((far_test + frr_test) / 2) * 100
print(f"HTER using EER threshold: {hter_test:.2f}%")

# Calculate AUC on the test set
auc_test = roc_auc_score(test_labels, test_scores)
print(f"Area Under the ROC Curve (AUC) on the test set: {auc_test * 100:.2f}%")