In [1]:
# Import the importatnt libraries
import os
import numpy as np
from torchvision import models
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision.datasets import ImageFolder
from torch.utils.data import DataLoader, random_split, Dataset
import matplotlib.pyplot as plt
from torchvision import transforms
import pandas as pd
from PIL import Image
from tqdm import tqdm
from sklearn.metrics import f1_score, precision_score, recall_score
import copy

In [2]:
# ignoring the warning messages
import warnings
from IPython.display import display
warnings.filterwarnings('ignore')

In [3]:
torch.manual_seed(42)
# batch size for all the future processes
batch_size = 32

In [4]:
data = np.load("/kaggle/input/pneumoniamnist/pneumoniamnist.npz")

In [5]:
train_images, train_labels = data["train_images"], data["train_labels"]

In [6]:
val_images, val_labels = data["val_images"], data["val_labels"]

In [7]:
test_images, test_labels = data["test_images"], data["test_labels"]

In [8]:
train_images.shape

(3882, 28, 28)

In [9]:
train_labels.shape

(3882, 1)

In [10]:
val_images.shape

(524, 28, 28)

In [11]:
val_labels.shape

(524, 1)

In [12]:
test_images.shape

(624, 28, 28)

In [13]:
test_labels.shape

(624, 1)

In [14]:
class ImageDataset(Dataset):
    def __init__(self, images, labels, transform=None):
        self.images = images
        self.labels = labels
        self.transform = transform

    def __len__(self):
        return len(self.images)

    def __getitem__(self, idx):
        image = self.images[idx]
        label = self.labels[idx]

        # Convert image to torch tensor
        image = torch.tensor(image, dtype=torch.float32)
        label = torch.tensor(label, dtype=torch.float32)  # if using BCEWithLogitsLoss

        # Ensure image has shape [1, H, W]
        if image.ndim == 2:
            image = image.unsqueeze(0)
        elif image.ndim == 3 and image.shape[-1] in [1, 3]:
            image = image.permute(2, 0, 1)

        # Convert 1-channel grayscale to 3-channel RGB by repeating
        if image.shape[0] == 1:
            image = image.repeat(3, 1, 1)

        if self.transform:
            image = self.transform(image)

        return image, label


In [15]:
# transform = transforms.Compose([
#     transforms.Resize((299, 299)),         # Resize to match InceptionV3 input size
#     transforms.Normalize(mean=[0.5]*3, std=[0.5]*3)  # Normalize to [-1, 1] range (optional)
# ])

# transformations for the training datasets
transform = transforms.Compose([
    transforms.Resize((384,384)), # resize the image according to the input size of the model
    # transforms.Resize((224,224)),
    transforms.RandomHorizontalFlip(p=0.3), # flip the image horizontally with probality of 0.3
    transforms.RandomVerticalFlip(p=0.3), # flip the image vertically with probality of 0.3
    transforms.CenterCrop(384), # crop the image from the center
    transforms.RandomRotation(degrees=(-12, 12)),
    transforms.RandomApply([transforms.ColorJitter(brightness=0.35, contrast=0.35, saturation=0.35, hue=0.2)], p=0.3), # apply random color jitter with probality of 0.3
    transforms.RandomApply([transforms.RandomAffine(degrees=15, translate=(0.15, 0.15), scale=(0.85, 1.15))], p=0.4), # apply random affine transformation with probality of 0.4
    transforms.RandomApply(([transforms.GaussianBlur(kernel_size=3)]), p=0.3), # apply random gaussian blur with probality of 0.3 #Change the kernel size according to the model not max 5
    transforms.RandomErasing(p=0.3, scale=(0.02, 0.1), ratio=(0.3, 3.3)), # apply random erasing with probality of 0.3
    # transforms.ToTensor(), # # Converts image to tensor and rescales to [0.0, 1.0]
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]) # normalize the image

])

In [16]:
test_val_transforms = transforms.Compose([
    transforms.Resize((480,480)), # resize the image according to the input size of the model
    # transforms.Resize((224,224)),
    transforms.CenterCrop(480),
    # transforms.ToTensor(), # convert the image to tensor
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]) # normalize the image
])

In [17]:
train_dataset = ImageDataset(train_images, train_labels, transform=transform)
val_dataset = ImageDataset(val_images, val_labels, transform=test_val_transforms)
test_dataset = ImageDataset(test_images, test_labels, transform=test_val_transforms)

In [18]:
# Data loader
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=4)
# Validation loader
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False, num_workers=4)

In [19]:
test_loader = DataLoader(test_dataset, batch_size = batch_size, shuffle = False, num_workers=4)

In [20]:
def train(model, train_loader, val_loader, criterion, optimizer, device, epoch, num_epochs, scheduler, threshold=0.5):
    model.train()
    running_loss = 0.0
    num_correct_preds = 0.0
    total_preds = 0.0
    train_preds = []
    train_labels = []

    with tqdm(total=len(train_loader), desc=f'Epoch {epoch+1}/{num_epochs}', unit='batch') as tepoch:
        for images, labels in train_loader:
            images = images.to(device)
            labels = labels.to(device).float().squeeze(1)  # ✅ shape: [B]

            optimizer.zero_grad()

            # InceptionV3 returns (main_output, aux_output)
            main_output, aux_output = model(images)
            main_output = main_output.squeeze(1)
            aux_output = aux_output.squeeze(1)

            loss_main = criterion(main_output, labels)
            loss_aux = criterion(aux_output, labels)
            loss = loss_main + 0.4 * loss_aux

            loss.backward()
            optimizer.step()

            running_loss += loss.item()

            # Predictions
            probs = torch.sigmoid(main_output)
            preds = (probs > threshold).long()

            train_preds.extend(preds.detach().cpu().numpy())
            train_labels.extend(labels.detach().cpu().numpy())

            num_correct_preds += (preds == labels.long()).sum().item()
            total_preds += labels.size(0)

            tepoch.set_postfix(
                loss=running_loss / (tepoch.n + 1),
                accuracy=f"{(num_correct_preds / len(train_loader.dataset)) * 100:.2f}%"
            )
            tepoch.update(1)

    scheduler.step()

    # 🧮 Compute evaluation metrics
    precision = precision_score(train_labels, train_preds, average='weighted', zero_division=0)
    recall = recall_score(train_labels, train_preds, average='weighted', zero_division=0)
    f1 = f1_score(train_labels, train_preds, average='weighted', zero_division=0)
    train_accuracy = (num_correct_preds / total_preds) * 100

    print(f"\nTrain Metrics — Accuracy: {train_accuracy:.2f}% | Precision: {precision:.4f} | Recall: {recall:.4f} | F1: {f1:.4f}")

    return train_accuracy, running_loss / len(train_loader), f1, precision, recall


In [21]:
def validate(model, val_loader, criterion, device, threshold=0.5):
    model.eval()
    running_loss = 0.0
    num_correct_preds = 0.0
    total_preds = 0.0
    val_preds = []
    val_labels = []

    with torch.no_grad():
        for images, labels in val_loader:
            images = images.to(device)
            labels = labels.to(device).float()  # ✅ Ensure shape [B]

            outputs = model(images)  # shape: [batch_size]
            loss = criterion(outputs, labels)

            running_loss += loss.item()

            probs = torch.sigmoid(outputs)
            preds = (probs > threshold).long()

            val_preds.extend(preds.cpu().numpy())
            val_labels.extend(labels.cpu().numpy())

            num_correct_preds += (preds == labels.long()).sum().item()
            total_preds += labels.size(0)

    # 🧮 Compute metrics
    precision = precision_score(val_labels, val_preds, average='weighted', zero_division=0)
    recall = recall_score(val_labels, val_preds, average='weighted', zero_division=0)
    f1 = f1_score(val_labels, val_preds, average='weighted', zero_division=0)
    val_accuracy = (num_correct_preds / total_preds) * 100

    print(f"\nValidation Metrics — Accuracy: {val_accuracy:.2f}% | Precision: {precision:.4f} | Recall: {recall:.4f} | F1: {f1:.4f}")

    return val_accuracy, running_loss / len(val_loader), f1, precision, recall


In [22]:
def training_model(model, train_loader, val_loader, criterion, optimizer, num_epochs, device, scheduler, model_name, threshold=0.5):
    best_f1 = 0.0
    best_model = None

    for epoch in range(num_epochs):
        # 🔁 Train the model
        train_accuracy, train_loss, train_f1, train_precision, train_recall = train(
            model, train_loader, val_loader, criterion, optimizer, device, epoch, num_epochs, scheduler, threshold=threshold
        )

        # 🔍 Validate the model
        val_accuracy, val_loss, val_f1, val_precision, val_recall = validate(
            model, val_loader, criterion, device, threshold=threshold
        )

        # 💾 Save the best model based on validation F1
        if val_f1 > best_f1:
            best_f1 = val_f1
            best_model = copy.deepcopy(model.state_dict())
            torch.save(best_model, f'best_model_{model_name}_epoch{epoch+1}.pth')

            print(f'\n✅ Best model saved at epoch {epoch+1}/{num_epochs}')
            print(f'   🟢 Train — Acc: {train_accuracy:.2f}%, Loss: {train_loss:.4f}, F1: {train_f1:.4f}, P: {train_precision:.4f}, R: {train_recall:.4f}')
            print(f'   🔵 Val   — Acc: {val_accuracy:.2f}%, Loss: {val_loss:.4f}, F1: {val_f1:.4f}, P: {val_precision:.4f}, R: {val_recall:.4f}\n')

        torch.cuda.empty_cache()

    return model, best_model


In [23]:
# using the pre-trained model with importing the pretrained weight efficente_v2_m
# from torchvision.models import Inception_V3_Weights
model = models.inception_v3(weights=models.Inception_V3_Weights.DEFAULT)

Downloading: "https://download.pytorch.org/models/inception_v3_google-0cc3c7bd.pth" to /root/.cache/torch/hub/checkpoints/inception_v3_google-0cc3c7bd.pth
100%|██████████| 104M/104M [00:00<00:00, 199MB/s]  


In [24]:
# InceptionV3
model.fc = nn.Linear(model.fc.in_features, 1)
model.AuxLogits.fc = nn.Linear(model.AuxLogits.fc.in_features, 1)

In [25]:
# for param in model.parameters():
#     param.requires_grad = False  # Freeze all layers

# # Unfreeze classifier layers
# for param in model.fc.parameters():
#     param.requires_grad = True

# for param in model.AuxLogits.fc.parameters():
#     param.requires_grad = True


In [26]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# model = nn.DataParallel(model, device_ids=[0, 1])  # Use GPUs 0 and 1
model = model.to(device)

In [None]:
# num_epochs = 15
num_epochs = 3
criterion = nn.BCEWithLogitsLoss()
# optimizer = optim.NAdam(model.parameters(), lr=1e-4)
# use the optimizer wiht SGD, momentum  and nestrov momentum
# optimizer = optim.SGD(model.parameters(), lr=1e-4, momentum=0.9, nesterov=True)
optimizer = optim.AdamW(model.parameters(), lr=1e-4)

# scheduler for the learning rate
scheduler = optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=num_epochs, eta_min=0.1e-6)
# scheduler = optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=num_epochs)

last_epoch_model, best_model = training_model(model, train_loader, val_loader, criterion, optimizer, num_epochs, device, scheduler, 'inception_v3', threshold=0.5)


Epoch 1/3:  48%|████▊     | 58/122 [00:44<00:45,  1.40batch/s, accuracy=41.73%, loss=0.466]

In [None]:
model.load_state_dict(torch.load('/kaggle/working/best_model_inception_v3_epoch1.pth'))

In [None]:
val_accuracy, val_loss, val_f1 = validate(
            model, test_loader, criterion, device, threshold=0.5
        )

In [None]:
val_accuracy

In [None]:
val_loss