<a href="https://colab.research.google.com/github/Barde-S/DLI/blob/main/DLI.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install torchinfo --quiet

In [None]:
import os

from collections import Counter

import matplotlib.pyplot as plt
import seaborn as sns
import torch
import torch.nn as nn
import torch.optim as optim
from PIL import Image, UnidentifiedImageError
from sklearn.metrics import ConfusionMatrixDisplay, confusion_matrix
from torch.utils.data import DataLoader, random_split
from torchinfo import summary
from torchvision import datasets, transforms
from tqdm.notebook import tqdm
import gdown
import zipfile
from google.colab import drive
from google.colab import files
from torchvision import models
from sklearn.metrics import (
    accuracy_score,
    precision_score,
    recall_score,
    f1_score,
    classification_report,
    confusion_matrix,
    ConfusionMatrixDisplay
)

torch.backends.cudnn.deterministic = True


In [None]:
# Check if CUDA-enabled GPU is available
if torch.cuda.is_available():
    device = "GPU"  # Use GPU if available
# Check if MPS (Metal Performance Shaders) is available for Apple Silicon
elif torch.backends.mps.is_available():
    device = "MPS"  # Use MPS if available
else:
    device = "CPU"  # Fall back to CPU if no GPU or MPS available

# Print the selected device type
print(f"Using {device} device.")

Using GPU device.


In [None]:
if torch.backends.mps.is_available():
    device = "MPS"

print(f"Using {device} device.")

Using GPU device.


In [None]:
uploaded = files.upload()

In [None]:
with zipfile.ZipFile('dataset_split.zip', 'r') as zip_ref:
    zip_ref.extractall('/content/DLI')

FileNotFoundError: [Errno 2] No such file or directory: 'dataset_split.zip'

In [None]:
train_dir = '/content/DLI/dataset_split 2'

for file in os.listdir(train_dir):
    print(file)

In [None]:
ds_store_path = os.path.join(train_dir, '.DS_Store')

if os.path.exists(ds_store_path):
    os.remove(ds_store_path)
    print("Removed .DS_Store")
else:
    print(".DS_Store not found")

In [None]:
classes = os.listdir(train_dir)
print(classes)

In [None]:
height = 224
width = 224


class ConvertToRGB:
    def __call__(self, img):
        if img.mode != "RGB":
            img = img.convert("RGB")
        return img


transform = transforms.Compose([
    ConvertToRGB(),
    transforms.Resize((224,224)),
    transforms.ToTensor()
])

print(transform)

In [None]:
sample_file = "/content/DLI/dataset_split 2/train/Acute Otitis Media/aom (1).jpg"

image = Image.open(sample_file)

transformed_image = transform(image)
print(transformed_image.shape)

In [None]:
dataset = datasets.ImageFolder(root=train_dir, transform=transform)
print("Image size", dataset[0][0].shape)
print("Label", dataset[0][1])

In [None]:
def remove_corrupted_images(folder):
    removed = 0
    for root, _, files in os.walk(folder):
        for file in files:
            file_path = os.path.join(root, file)
            try:
                with Image.open(file_path) as img:
                    img.verify()  # Quick image check
            except (UnidentifiedImageError, OSError):
                print(f"Removing corrupted image: {file_path}")
                os.remove(file_path)
                removed += 1
    print(f"Total corrupted images removed: {removed}")

# Run it on your dataset path
remove_corrupted_images('/content/DLI/dataset_split 2')

In [None]:
# counts = Counter()
# for i in tqdm(range(len(dataset))):
#     try:
#         # Access the item to trigger the image loading and potentially the error
#         x = dataset[i]
#         counts[x[1]] += 1
#     except UnidentifiedImageError:
#         print(f"Skipping file {dataset.samples[i][0]} due to UnidentifiedImageError")
#         continue

# print("The counts dictionary:", counts)

# # This dictionary maps class names to their index.
# print("The class_to_idx dictionary:", dataset.class_to_idx)


# class_distribution = ({cat: counts[idx] for cat, idx in dataset.class_to_idx.items()} )
# print(class_distribution)


counts = Counter(x[1] for x in tqdm(dataset))
print("The counts dictionary:", counts)

# This dictionary maps class names to their index.
print("The class_to_idx dictionary:", dataset.class_to_idx)

# Use both of these to construct the desired dictionary

class_distribution = ({cat: counts[idx] for cat, idx in dataset.class_to_idx.items()} )
print(class_distribution)

In [None]:
batch_size = 32
dataset_loader = DataLoader(dataset, batch_size=32)

# Get one batch
first_batch = next(iter(dataset_loader))

print(f"Shape of one batch: {first_batch[0].shape}")
print(f"Shape of labels: {first_batch[1].shape}")

In [None]:
def get_mean_std(loader):
    """Computes the mean and standard deviation of image data.

     (ideally we caan have this in a separate .py file to just import the function)
            """

    channels_sum, channels_squared_sum, num_batches = 0, 0, 0
    for data, _ in tqdm(loader):
        channels_sum += torch.mean(data, dim=[0, 2, 3])
        channels_squared_sum += torch.mean(data**2, dim=[0, 2, 3])
        num_batches += 1
    # Compute the mean from the channels_sum and num_batches
    mean = channels_sum/num_batches
    # Compute the standard deviation form channels_squared_sum, num_batches,
    # and the mean.
    std = torch.sqrt((channels_squared_sum / num_batches) - (mean**2))

    return mean, std


mean, std = get_mean_std(dataset_loader)

print(f"Mean: {mean}")
print(f"Standard deviation: {std}")

In [None]:
mean = [0.4834, 0.3793, 0.3572]
std = [0.2586, 0.2419, 0.2436]

In [None]:
transform_norm = transforms.Compose([
    ConvertToRGB(),
    transforms.Resize((224,224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=mean, std=std),
])
print(transform_norm)

In [None]:
norm_dataset = datasets.ImageFolder(root=train_dir, transform=transform_norm)

print("Image size", norm_dataset[0][0].shape)
print("Label", norm_dataset[0][1])

In [None]:
g = torch.Generator()
train_dataset, val_dataset = random_split(norm_dataset, [.8,.2], generator=g)

print("Training data set size:", len(train_dataset))
print("Validation data set size:", len(val_dataset))

In [None]:
train_loader = DataLoader(train_dataset, shuffle=True)
val_loader = DataLoader(val_dataset)

In [None]:
def train_epoch(model, optimizer, loss_fn, data_loader, device="cpu"):
    model.train()
    total_loss = 0.0
    total_correct = 0

    for inputs, targets in tqdm(data_loader, desc="Training", leave=False):
        inputs, targets = inputs.to(device), targets.to(device)

        optimizer.zero_grad()
        output = model(inputs)
        loss = loss_fn(output, targets)
        loss.backward()
        optimizer.step()

        total_loss += loss.item() * inputs.size(0)
        preds = torch.argmax(output, dim=1)
        total_correct += torch.sum(preds == targets).item()

    avg_loss = total_loss / len(data_loader.dataset)
    accuracy = total_correct / len(data_loader.dataset)

    return avg_loss, accuracy


def predict(model, data_loader, device="cpu"):
    model.eval()
    all_probs = []

    with torch.no_grad():
        for inputs, _ in tqdm(data_loader, desc="Predicting", leave=False):
            inputs = inputs.to(device)
            output = model(inputs)
            probs = torch.nn.functional.softmax(output, dim=1)
            all_probs.append(probs.cpu())

    return torch.cat(all_probs, dim=0)


# 3. Score a model on a dataset (validation or test)
def score(model, data_loader, loss_fn, device="cpu"):
    model.eval()
    total_loss = 0.0
    total_correct = 0

    with torch.no_grad():
        for inputs, targets in tqdm(data_loader, desc="Scoring", leave=False):
            inputs, targets = inputs.to(device), targets.to(device)
            output = model(inputs)
            loss = loss_fn(output, targets)

            total_loss += loss.item() * inputs.size(0)
            preds = torch.argmax(output, dim=1)
            total_correct += torch.sum(preds == targets).item()

    avg_loss = total_loss / len(data_loader.dataset)
    accuracy = total_correct / len(data_loader.dataset)

    return avg_loss, accuracy


# 4. Full training loop
def train(model, optimizer, loss_fn, train_loader, val_loader, epochs=20, device="cpu"):
    train_losses = []
    train_accuracies = []
    val_losses = []
    val_accuracies = []

    for epoch in range(1, epochs + 1):
        # Train
        train_loss, train_accuracy = train_epoch(model, optimizer, loss_fn, train_loader, device)
        train_losses.append(train_loss)
        train_accuracies.append(train_accuracy)

        # Validate
        val_loss, val_accuracy = score(model, val_loader, loss_fn, device)
        val_losses.append(val_loss)
        val_accuracies.append(val_accuracy)

        # Print status
        print(
            f"Epoch {epoch:02d} | "
            f"Train Loss: {train_loss:.4f} | Train Acc: {train_accuracy:.4f} | "
            f"Val Loss: {val_loss:.4f} | Val Acc: {val_accuracy:.4f}"
        )

    return train_losses, val_losses, train_accuracies, val_accuracies




def early_stopping(validation_loss, best_val_loss, counter):
    """Function that implements Early Stopping"""

    stop = False

    if validation_loss < best_val_loss:
        counter = 0
    else:
        counter += 1

    if counter >= 5:  # our patience is 5 epochs
        stop = True

    return counter, stop


def checkpointing(validation_loss, best_val_loss, model, optimizer, save_path):
    """Function that implements Checkpointing"""

    if validation_loss < best_val_loss:
        torch.save(
            {
                "model_state_dict": model.state_dict(),
                "optimizer_state_dict": optimizer.state_dict(),
                "loss": best_val_loss,
            },
            save_path,
        )
        print(f"Checkpoint saved with validation loss {validation_loss:.4f}")


def train_callbacks(
    model,
    optimizer,
    loss_fn,
    train_loader,
    val_loader,
    epochs=20,
    device="cpu",
    scheduler=None,
    checkpoint_path=None,
    early_stopping=None,
):
    # Track the model progress over epochs
    train_losses = []
    train_accuracies = []
    val_losses = []
    val_accuracies = []
    learning_rates = []

    # Create the trackers if needed for checkpointing and early stopping
    best_val_loss = float("inf")
    early_stopping_counter = 0

    print("Model evaluation before start of training...")
    # Test on training set
    train_loss, train_accuracy = score(model, train_loader, loss_fn, device)
    train_losses.append(train_loss)
    train_accuracies.append(train_accuracy)
    # Test on validation set
    validation_loss, validation_accuracy = score(model, val_loader, loss_fn, device)
    val_losses.append(validation_loss)
    val_accuracies.append(validation_accuracy)

    for epoch in range(1, epochs + 1):
        print("\n")
        print(f"Starting epoch {epoch}/{epochs}")

        # Train one epoch
        train_epoch(model, optimizer, loss_fn, train_loader, device)

        # Evaluate training results
        train_loss, train_accuracy = score(model, train_loader, loss_fn, device)
        train_losses.append(train_loss)
        train_accuracies.append(train_accuracy)

        # Test on validation set
        validation_loss, validation_accuracy = score(model, val_loader, loss_fn, device)
        val_losses.append(validation_loss)
        val_accuracies.append(validation_accuracy)

        print(f"Epoch: {epoch}")
        print(f"Training loss: {train_loss:.4f}")
        print(f"Training accuracy: {train_accuracy*100:.2f}%")
        print(f"Validation loss: {validation_loss:.4f}")
        print(f"Validation accuracy: {validation_accuracy*100:.2f}%")

        # # Log the learning rate and have the scheduler adjust it
        lr = optimizer.param_groups[0]["lr"]
        learning_rates.append(lr)
        if scheduler:
            scheduler.step()

        # Checkpointing saves the model if current model is better than best so far
        if checkpoint_path:
            checkpointing(
                validation_loss, best_val_loss, model, optimizer, checkpoint_path
            )

        # Early Stopping
        if early_stopping:
            early_stopping_counter, stop = early_stopping(
                validation_loss, best_val_loss, early_stopping_counter
            )

            if stop:
                print(f"Early stopping triggered after {epoch} epochs")
                break

        if validation_loss < best_val_loss:
            best_val_loss = validation_loss

    return (
        learning_rates,
        train_losses,
        val_losses,
        train_accuracies,
        val_accuracies,
        epoch,
    )



In [None]:
torch.manual_seed(42)
torch.cuda.manual_seed(42)

model = torch.nn.Sequential()

conv1_n_kernels = 16
conv1 = torch.nn.Conv2d(in_channels=3, out_channels=16, kernel_size=(3, 3), padding=1)
max_pool_size = 4
max_pool1 = torch.nn.MaxPool2d(max_pool_size)
model.append(conv1)
model.append(torch.nn.ReLU())
model.append(max_pool1)

conv2_n_kernels = 32
conv2 = torch.nn.Conv2d(
    in_channels=16, out_channels=conv2_n_kernels, kernel_size=(3, 3), padding=1
)
max_pool2 = torch.nn.MaxPool2d(max_pool_size)
model.append(conv2)
model.append(torch.nn.ReLU())
model.append(max_pool2)

conv3_n_kernels = 64
conv3 = torch.nn.Conv2d(32, conv3_n_kernels, 3, padding=1)
max_pool3 = torch.nn.MaxPool2d(max_pool_size)
model.append(conv3)
model.append(torch.nn.ReLU())
model.append(max_pool3)

model.append(torch.nn.AdaptiveAvgPool2d((1, 1)))
model.append(torch.nn.Flatten())
model.append(torch.nn.Linear(64, 500))  # 64 is from conv3_n_kernels
model.append(torch.nn.ReLU())
model.append(torch.nn.Dropout())

n_classes = 5
output_layer = torch.nn.Linear(500, n_classes)
model.append(output_layer)

summary(model, input_size=(batch_size, 3, height, width))

In [None]:
loss_fn = torch.nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters())
scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=5, gamma=0.5)

if torch.cuda.is_available():
    device = 'cuda'
else:
    device = 'cpu'

model.to(device)
print(loss_fn)
print("----------------------")
print(optimizer)
print("----------------------")
print(next(model.parameters()).device)

In [None]:
epochs = 15

train_losses, val_losses, train_accuracies, val_accuracies = train(
    model, optimizer, loss_fn, train_loader, val_loader, epochs, device=device)

In [None]:
plt.figure(figsize=(12, 5))

plt.subplot(1, 2, 1)
plt.plot(train_losses, label="Training Loss")
plt.plot(val_losses, label="Validation Loss")
plt.title("Loss over epochs")
plt.xlabel("Epochs")
plt.ylabel("Loss")
plt.legend()

plt.subplot(1, 2, 2)
plt.plot(train_accuracies, label="Training Accuracy")
plt.plot(val_accuracies, label="Validation Accuracy")
plt.title("Accuracy over epochs")
plt.xlabel("Epochs")
plt.ylabel("Accuracy")
plt.legend()

plt.show()

In [None]:
model = models.resnet50(weights=models.ResNet50_Weights.DEFAULT)

print(model)

In [None]:
for name, param in model.named_parameters():
    if "layer4" in name or "fc" in name:
        param.requires_grad = True
    else:
        param.requires_grad = False

In [None]:
torch.manual_seed(42)
torch.cuda.manual_seed(42)

in_features = model.fc.in_features

modified_last_layer = nn.Sequential()

dense_layer = nn.Linear(in_features, 256)
modified_last_layer.append(dense_layer)

relu = nn.ReLU()
modified_last_layer.append(relu)

modified_last_layer.append(nn.Dropout(p=0.5))

output_layer = nn.Linear(256, 5)
modified_last_layer.append(output_layer)

# Assign `modified_last_layer` to `model.fc`
model.fc = modified_last_layer

print(model)

In [None]:
g = torch.Generator()
g.manual_seed(42)

batch_size = 32

train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, generator=g)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)

In [None]:
loss_fn = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters())
scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=5, gamma=0.5)

# Place the model on device
model.to(device)

# Train the model for 10 epochs
epochs = 10
train_losses, val_losses, train_accuracies, val_accuracies = train(
    model, optimizer, loss_fn, train_loader, val_loader, epochs, device=device
)

In [None]:
plt.figure(figsize=(12, 5))

plt.subplot(1, 2, 1)
plt.plot(train_losses, label="Training Loss")
plt.plot(val_losses, label="Validation Loss")
plt.title("Loss over epochs")
plt.xlabel("Epochs")
plt.ylabel("Loss")
plt.legend()

plt.subplot(1, 2, 2)
plt.plot(train_accuracies, label="Training Accuracy")
plt.plot(val_accuracies, label="Validation Accuracy")
plt.title("Accuracy over epochs")
plt.xlabel("Epochs")
plt.ylabel("Accuracy")
plt.legend()

plt.show()

In [None]:
test_dir = os.path.join("/content/DLI/dataset_split 2", "test")

test_dataset = datasets.ImageFolder(root=test_dir, transform=transform_norm)


batch_size = 10

test_loader = DataLoader(
    test_dataset, batch_size=batch_size, shuffle=False
)

print("Number of test images:", len(test_dataset))

In [None]:
# Predict the probabilities for each test image
test_probabilities = predict(model, test_loader, device)

# Get the index with the largest probability
test_predictions = torch.argmax(test_probabilities, dim=1)

# way to get class names
test_classes = [test_dataset.classes[i] for i in test_predictions]

# Output
print("Number of predictions:", test_predictions.shape)
print("Predictions (class index):", test_predictions.tolist())
print("Predictions (class name):", test_classes)


In [None]:
epochs_to_train = 50
checkpoint_path = "LR_model.pth"
early_stopping_function = early_stopping

train_results = train_callbacks(
   model,
   optimizer,
   loss_fn,
   train_loader,
   val_loader,
   epochs=epochs_to_train,
   device=device,
   checkpoint_path=checkpoint_path,
   early_stopping=early_stopping_function,
   scheduler=scheduler
)


(
    learning_rates,
    train_losses,
    valid_losses,
    train_accuracies,
    valid_accuracies,
    epochs,
) = train_results

In [None]:
model.eval()

# Collect all predictions and true labels
all_preds = []
all_targets = []

with torch.no_grad():
    for inputs, targets in test_loader:
        inputs = inputs.to(device)
        targets = targets.to(device)

        outputs = model(inputs)
        preds = torch.argmax(outputs, dim=1)

        all_preds.extend(preds.cpu().numpy())
        all_targets.extend(targets.cpu().numpy())



In [None]:
# evaluation metrics
accuracy = accuracy_score(all_targets, all_preds)
precision = precision_score(all_targets, all_preds, average='weighted', zero_division=0)
recall = recall_score(all_targets, all_preds, average='weighted', zero_division=0)
f1 = f1_score(all_targets, all_preds, average='weighted', zero_division=0)


print(f"Accuracy:  {accuracy:.4f}")
print(f"Precision: {precision:.4f}")
print(f"Recall:    {recall:.4f}")
print(f"F1 Score:  {f1:.4f}")



In [None]:
# classification report
print("\nClassification Report:\n")
print(classification_report(all_targets, all_preds, target_names=test_dataset.classes))



In [None]:
# Confusion matrix
cm = confusion_matrix(all_targets, all_preds)
plt.figure(figsize=(8, 6))
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
            xticklabels=test_dataset.classes,
            yticklabels=test_dataset.classes)
plt.xlabel('Predicted')
plt.ylabel('Actual')
plt.title('Confusion Matrix')
plt.show()
