In [3]:
!nvidia-smi
#!pip install imbalanced-learn
!pip install lime

Tue Dec 10 20:10:43 2024       
+-----------------------------------------------------------------------------------------+
| NVIDIA-SMI 560.94                 Driver Version: 560.94         CUDA Version: 12.6     |
|-----------------------------------------+------------------------+----------------------+
| GPU  Name                  Driver-Model | Bus-Id          Disp.A | Volatile Uncorr. ECC |
| Fan  Temp   Perf          Pwr:Usage/Cap |           Memory-Usage | GPU-Util  Compute M. |
|                                         |                        |               MIG M. |
|   0  NVIDIA GeForce RTX 3050      WDDM  |   00000000:01:00.0  On |                  N/A |
| 41%   25C    P8             10W /  130W |    1545MiB /   8192MiB |      0%      Default |
|                                         |                        |                  N/A |
+-----------------------------------------+------------------------+----------------------+
                                                

In [None]:
# Import necessary libraries
import os
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import datasets, transforms, models
from torch.utils.data import DataLoader
from sklearn.model_selection import train_test_split
from torch.cuda.amp import GradScaler, autocast

# Check if GPU is available
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

# Define data directories
data_dir = r"F:\Peter\ML_exp\PROCESS-V1\Spectrograms"  # Replace with the path to the spectrograms
healthy_dir = os.path.join(data_dir, "Healthy")
not_healthy_dir = os.path.join(data_dir, "Not Healthy")

# Define transformations for the training and validation datasets
transform = transforms.Compose([
    transforms.RandomResizedCrop((128, 128)),
    transforms.RandomHorizontalFlip(),
    transforms.RandomRotation(15),
    transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.1),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5])
])


# Prepare datasets and split into train/validation sets
dataset = datasets.ImageFolder(data_dir, transform=transform)
class_names = dataset.classes  # ['Healthy', 'Not Healthy']

train_size = int(0.8 * len(dataset))
val_size = len(dataset) - train_size
train_dataset, val_dataset = torch.utils.data.random_split(dataset, [train_size, val_size])

train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True, num_workers=2)
val_loader = DataLoader(val_dataset, batch_size=16, shuffle=False, num_workers=2)

# Define the model (using a pre-trained ResNet)
model = models.resnet18(weights=models.ResNet18_Weights.DEFAULT)  # Updated weights usage
num_ftrs = model.fc.in_features
model.fc = nn.Linear(num_ftrs, len(class_names))  # 2 classes: Healthy and Not Healthy
model = model.to(device)

# Define the loss function and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Mixed precision training setup
scaler = GradScaler()

# Function to calculate metrics
def calculate_metrics(y_true, y_pred, average='binary'):
    f1 = f1_score(y_true, y_pred, average=average)
    precision = precision_score(y_true, y_pred, average=average)
    recall = recall_score(y_true, y_pred, average=average)
    return f1, precision, recall

# Training function
def train_model(model, criterion, optimizer, train_loader, val_loader, device, epochs=10):
    for epoch in range(epochs):
        print(f"Epoch {epoch+1}/{epochs}")
        model.train()
        running_loss = 0.0
        correct = 0
        total = 0

        for inputs, labels in train_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            optimizer.zero_grad()

            # Mixed precision
            with autocast():
                outputs = model(inputs)
                loss = criterion(outputs, labels)

            scaler.scale(loss).backward()
            scaler.step(optimizer)
            scaler.update()

            running_loss += loss.item()
            _, predicted = outputs.max(1)
            total += labels.size(0)
            correct += predicted.eq(labels).sum().item()

        train_loss = running_loss / len(train_loader)
        train_acc = correct / total
        print(f"Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.4f}")

# Train the model
train_model(model, criterion, optimizer, train_loader, val_loader, device, epochs=20)

# Save the trained model
torch.save(model.state_dict(), "spectrogram_classifier_20.pth")


print("Model saved as spectrogram_classifier_20.pth")

In [None]:
# Import necessary libraries
import os
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import datasets, transforms, models
from torch.utils.data import DataLoader
from sklearn.metrics import f1_score, precision_score, recall_score, classification_report
from torch.cuda.amp import GradScaler, autocast
import numpy as np
from sklearn.utils.class_weight import compute_class_weight
from imblearn.over_sampling import SMOTE
from lime import lime_image
from skimage.segmentation import mark_boundaries


# Check if GPU is available
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

# Define data directories
data_dir = r"F:\Peter\ML_exp\PROCESS-V1\Spectrograms"  # Replace with the path to the spectrograms
healthy_dir = os.path.join(data_dir, "Healthy")
not_healthy_dir = os.path.join(data_dir, "Not Healthy")

# Define transformations for the training and validation datasets
transform = transforms.Compose([
    transforms.Resize((128, 128)),  # Resize to optimize memory usage
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5])
])

# Prepare datasets and split into train/validation sets
dataset = datasets.ImageFolder(data_dir, transform=transform)
class_names = dataset.classes  # ['Healthy', 'Not Healthy']

train_size = int(0.8 * len(dataset))
val_size = len(dataset) - train_size
train_dataset, val_dataset = torch.utils.data.random_split(dataset, [train_size, val_size])

train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True, num_workers=2)
val_loader = DataLoader(val_dataset, batch_size=16, shuffle=False, num_workers=2)

# Define the model (using a pre-trained ResNet)
from torchvision.models import mobilenet_v2
model = mobilenet_v2(weights="IMAGENET1K_V1")
num_ftrs = model.last_channel
model.classifier[1] = nn.Linear(num_ftrs, len(class_names))
model = model.to(device)


# Define the loss function and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.0001)

# Mixed precision training setup
scaler = GradScaler()

# Calculate metrics function
def calculate_metrics(y_true, y_pred, average='binary'):
    f1 = f1_score(y_true, y_pred, average=average)
    precision = precision_score(y_true, y_pred, average=average)
    recall = recall_score(y_true, y_pred, average=average)
    return f1, precision, recall

# Training and Validation function
# Define the updated train_model function with advanced overfitting detection
def train_model_with_smote_and_explainability(model, criterion, optimizer, train_loader, val_loader, device, epochs=10):
    best_val_f1 = 0.0
    lr = 0.0001
    weight_decay = 1e-4
    overfitting_threshold = 3  # Number of consecutive epochs of overfitting to trigger adjustments
    overfitting_epochs = 0

    # Track historical loss differences for smarter overfitting detection
    loss_differences = []

    for epoch in range(epochs):
        print(f"\nEpoch {epoch+1}/{epochs}, Learning Rate: {lr:.6f}, Weight Decay: {weight_decay:.6f}")

        # --- Training Phase ---
        model.train()
        train_loss, train_correct, train_total = 0.0, 0, 0

        for inputs, labels in train_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            optimizer.zero_grad()

            with autocast():
                outputs = model(inputs)
                loss = criterion(outputs, labels)

            scaler.scale(loss).backward()
            scaler.step(optimizer)
            scaler.update()

            train_loss += loss.item()
            _, predicted = outputs.max(1)
            train_total += labels.size(0)
            train_correct += predicted.eq(labels).sum().item()

        train_loss /= len(train_loader)
        train_acc = train_correct / train_total

        # --- Validation Phase ---
        model.eval()
        val_loss, val_correct, val_total = 0.0, 0, 0
        all_labels, all_preds = [], []

        with torch.no_grad():
            for inputs, labels in val_loader:
                inputs, labels = inputs.to(device), labels.to(device)
                outputs = model(inputs)
                loss = criterion(outputs, labels)

                val_loss += loss.item()
                _, predicted = outputs.max(1)
                val_total += labels.size(0)
                val_correct += predicted.eq(labels).sum().item()

                all_labels.extend(labels.cpu().numpy())
                all_preds.extend(predicted.cpu().numpy())

        val_loss /= len(val_loader)
        val_acc = val_correct / val_total

        # Calculate metrics
        val_f1, val_precision, val_recall = calculate_metrics(all_labels, all_preds)

        print(f"Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.4f}")
        print(f"Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.4f}")
        print(f"Val F1 Score: {val_f1:.4f}, Precision: {val_precision:.4f}, Recall: {val_recall:.4f}")

        # Save the best model based on validation F1 Score
        if val_f1 > best_val_f1:
            best_val_f1 = val_f1
            torch.save(model.state_dict(), "best_model.pth")
            print("Best model saved!")

        # --- Smarter Overfitting Detection ---
        loss_diff = val_loss - train_loss
        acc_diff = train_acc - val_acc
        loss_differences.append(loss_diff)

        # Check rolling average of loss differences
        if len(loss_differences) > 3:
            rolling_avg_loss_diff = np.mean(loss_differences[-3:])
            if rolling_avg_loss_diff > 0.1 and acc_diff > 0.1:
                overfitting_epochs += 1
                print(f"Overfitting detected (Rolling Loss Diff: {rolling_avg_loss_diff:.4f}).")
            else:
                overfitting_epochs = 0

        if overfitting_epochs >= overfitting_threshold:
            print("Overfitting persists. Adjusting learning rate and weight decay.")
            lr *= 0.5
            weight_decay *= 0.9
            for param_group in optimizer.param_groups:
                param_group['lr'] = lr
                param_group['weight_decay'] = weight_decay
            overfitting_epochs = 0  # Reset counter

    # Explain model predictions with LIME after training
    explain_with_lime(model, val_loader, device)

# SMOTE: Handle imbalanced datasets
def apply_smote(train_loader):
    smote = SMOTE()
    train_data = []
    train_labels = []

    for inputs, labels in train_loader:
        train_data.extend(inputs.view(inputs.size(0), -1).numpy())
        train_labels.extend(labels.numpy())

    train_data_smote, train_labels_smote = smote.fit_resample(np.array(train_data), np.array(train_labels))
    print("SMOTE applied. Resampled dataset size:", len(train_labels_smote))

    return train_data_smote, train_labels_smote

# LIME: Explain predictions
def explain_with_lime(model, val_loader, device):
    explainer = lime_image.LimeImageExplainer()

    model.eval()
    with torch.no_grad():
        for inputs, _ in val_loader:
            inputs = inputs.to(device)
            sample_image = inputs[0].cpu().numpy().transpose(1, 2, 0)

            explanation = explainer.explain_instance(
                image=sample_image,
                classifier_fn=lambda x: model(torch.tensor(x.transpose(0, 3, 1, 2)).float().to(device)).detach().cpu().numpy(),
                top_labels=2,
                hide_color=0,
                num_samples=1000
            )

            temp, mask = explanation.get_image_and_mask(
                label=1,  # Assuming class 1 is "Not Healthy"
                positive_only=True,
                num_features=10,
                hide_rest=False
            )

            lime_image = mark_boundaries(temp, mask)
            plt.imshow(lime_image)
            plt.title("LIME Explanation")
            plt.show()
            break  # Explain one sample for demonstration

# Preprocess dataset with SMOTE
train_data_smote, train_labels_smote = apply_smote(train_loader)

# Train model with the new improvements
train_model_with_smote_and_explainability(model, criterion, optimizer, train_loader, val_loader, device, epochs=15)



# Train the model
#train_model(model, criterion, optimizer, train_loader, val_loader, device, epochs=15)

# Save the final trained model
torch.save(model.state_dict(), "final_spectrogram_classifier_15.pth")
print("Model saved as final_spectrogram_classifier_15.pth")


Using device: cuda
SMOTE applied. Resampled dataset size: 4080

Epoch 1/15, Learning Rate: 0.000100, Weight Decay: 0.000100
Train Loss: 0.6428, Train Acc: 0.6458
Val Loss: 0.5591, Val Acc: 0.7081
Val F1 Score: 0.7166, Precision: 0.7621, Recall: 0.6762
Best model saved!

Epoch 2/15, Learning Rate: 0.000100, Weight Decay: 0.000100
Train Loss: 0.4800, Train Acc: 0.7717
Val Loss: 0.4671, Val Acc: 0.7785
Val F1 Score: 0.7950, Precision: 0.8033, Recall: 0.7869
Best model saved!

Epoch 3/15, Learning Rate: 0.000100, Weight Decay: 0.000100
Train Loss: 0.3505, Train Acc: 0.8478
Val Loss: 0.4394, Val Acc: 0.8020
Val F1 Score: 0.8210, Precision: 0.8104, Recall: 0.8320
Best model saved!

Epoch 4/15, Learning Rate: 0.000100, Weight Decay: 0.000100
Train Loss: 0.2479, Train Acc: 0.8993
Val Loss: 0.5090, Val Acc: 0.7942
Val F1 Score: 0.8182, Precision: 0.7901, Recall: 0.8484
Overfitting detected (Rolling Loss Diff: 0.1124).

Epoch 5/15, Learning Rate: 0.000100, Weight Decay: 0.000100
Train Loss: 0.16