In [None]:
import numpy as np
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, Dataset
from torch.optim.lr_scheduler import StepLR
from torchvision import transforms, models
from tqdm.auto import tqdm
from PIL import Image
from sklearn.metrics import classification_report, balanced_accuracy_score

In [6]:
# Load the PathMNIST dataset (128x128 variant) from a local .npz archive.
data_path = "datasets"
# np.load on an .npz archive returns a dict-like NpzFile keyed by array name.
data = np.load(f"{data_path}/pathmnist_128.npz")

In [7]:
# List every array stored in the archive. `data.files` is used instead of
# `data.keys()` because the KeysView repr abbreviates the key list after the
# first few entries, hiding the remaining array names.
print(data.files)

KeysView(NpzFile 'datasets/pathmnist_128.npz' with keys: train_images, train_labels, val_images, val_labels, test_images...)


In [8]:
# Pull each split out of the archive as an (images, labels) pair.
train_images, train_labels = data['train_images'], data['train_labels']
validation_images, validation_labels = data['val_images'], data['val_labels']
test_images, test_labels = data['test_images'], data['test_labels']

In [9]:
# Sanity check: report the array shapes of the images and labels in each split.
print(f"The shape of train images: {train_images.shape}, train labels: {train_labels.shape}")
print(f"The shape of validation images: {validation_images.shape}, validation labels: {validation_labels.shape}")
print(f"The shape of test images: {test_images.shape}, test labels: {test_labels.shape}")

The shape of train images: (89996, 128, 128, 3), train labels: (89996, 1)
The shape of validation images: (10004, 128, 128, 3), validation labels: (10004, 1)
The shape of test images: (7180, 128, 128, 3), test labels: (7180, 1)


In [10]:
def count_labels(labels):
    """Return a ``{class_id: count}`` mapping for an array of labels.

    Args:
        labels: Array of class labels; any shape accepted by ``np.unique``.

    Returns:
        dict: Each distinct label value mapped to its number of occurrences,
        in ascending label order.
    """
    classes, occurrences = np.unique(labels, return_counts=True)
    return dict(zip(classes, occurrences))


# All three splits stacked, for the combined class distribution.
all_labels = np.concatenate([train_labels, validation_labels, test_labels])

# (heading, word inserted into the "Total ... samples" line, labels) per report.
label_reports = [
    ("Train", "train ", train_labels),
    ("Val", "val ", validation_labels),
    ("Test", "test ", test_labels),
    ("All", "", all_labels),
]

for position, (heading, split_word, split_labels) in enumerate(label_reports):
    # Blank line between reports, none before the first one.
    separator = "\n" if position else ""
    print(f"{separator}{heading} - Labels per class: {count_labels(split_labels)}")
    print(f"Total {split_word}samples: {len(split_labels)}")

Train - Labels per class: {np.uint8(0): np.int64(9366), np.uint8(1): np.int64(9509), np.uint8(2): np.int64(10360), np.uint8(3): np.int64(10401), np.uint8(4): np.int64(8006), np.uint8(5): np.int64(12182), np.uint8(6): np.int64(7886), np.uint8(7): np.int64(9401), np.uint8(8): np.int64(12885)}
Total train samples: 89996

Val - Labels per class: {np.uint8(0): np.int64(1041), np.uint8(1): np.int64(1057), np.uint8(2): np.int64(1152), np.uint8(3): np.int64(1156), np.uint8(4): np.int64(890), np.uint8(5): np.int64(1354), np.uint8(6): np.int64(877), np.uint8(7): np.int64(1045), np.uint8(8): np.int64(1432)}
Total val samples: 10004

Test - Labels per class: {np.uint8(0): np.int64(1338), np.uint8(1): np.int64(847), np.uint8(2): np.int64(339), np.uint8(3): np.int64(634), np.uint8(4): np.int64(1035), np.uint8(5): np.int64(592), np.uint8(6): np.int64(741), np.uint8(7): np.int64(421), np.uint8(8): np.int64(1233)}
Total test samples: 7180

All - Labels per class: {np.uint8(0): np.int64(11745), np.uint8

In [11]:
import torch
from torch.utils.data import WeightedRandomSampler

# Inverse-frequency class weights: the rarer a class, the larger its weight.
unique, counts = np.unique(train_labels, return_counts=True)
inverse_frequency = 1.0 / counts
# Rescale so the weights sum to the number of classes (average weight of 1).
normalized_weights = inverse_frequency / inverse_frequency.sum() * len(inverse_frequency)
# torch.tensor with an explicit dtype replaces the legacy torch.FloatTensor
# constructor. Entry i is the weight of the i-th label in sorted order.
class_weights = torch.tensor(normalized_weights, dtype=torch.float32)

print(f"Class weights: {class_weights}")

Class weights: tensor([1.0418, 1.0261, 0.9418, 0.9381, 1.2187, 0.8010, 1.2373, 1.0379, 0.7573])


In [13]:
# Per-sample weight = weight of that sample's class. Indexing class_weights by
# label value relies on the labels being the integers 0..K-1, matching the
# sorted order in which the class weights were computed.
sample_weights = class_weights[train_labels.flatten().astype(int)]

# Draws len(sample_weights) indices per pass, with replacement, each sample
# picked with probability proportional to its weight.
sampler = WeightedRandomSampler(
    weights=sample_weights,
    num_samples=len(sample_weights),
    replacement=True
)

# NOTE(review): `sampler` is not passed to any DataLoader in the cells visible
# here (train_loader is built with shuffle=True instead). To use it:
# train_loader = DataLoader(train_dataset, batch_size=32, sampler=sampler)

In [15]:
class ColonDataset(Dataset):
    """Map-style dataset pairing in-memory images with their labels.

    Args:
        images: Indexable collection of images. NumPy arrays are cast to uint8
            and wrapped in a PIL image before the transform runs; any other
            type is handed to the transform unchanged.
        labels: Indexable collection of labels aligned with ``images``.
        transform: Optional callable applied to every image on access.
    """

    def __init__(self, images, labels, transform=None):
        self.images, self.labels, self.transform = images, labels, transform

    def __len__(self):
        """Return the number of samples (one per image)."""
        return len(self.images)

    def __getitem__(self, idx):
        """Return the ``(image, label)`` pair stored at position ``idx``.

        The label is always returned as a float tensor.
        NOTE(review): a float label is unusual for class-index targets;
        confirm how the training loop converts it before the loss.
        """
        sample = self.images[idx]

        # Raw arrays go through PIL so PIL-based transforms can consume them.
        if isinstance(sample, np.ndarray):
            sample = Image.fromarray(sample.astype('uint8'))

        if self.transform:
            sample = self.transform(sample)

        target = torch.tensor(self.labels[idx], dtype=torch.float)
        return sample, target

# Calculating the Mean and STD for the Dataset

In [16]:
def calculate_mean_std(dataset):
    """Compute the per-channel mean and standard deviation of an image dataset.

    Makes a single pass over ``dataset`` in batches of 64, accumulating the
    per-channel sum and sum of squares, then derives the statistics from
    ``std = sqrt(E[X^2] - E[X]^2)``.

    Args:
        dataset: Map-style dataset yielding ``(image, label)`` pairs whose
            images collate into ``(N, C, H, W)`` batches. The number of
            channels is read from the first batch.

    Returns:
        tuple[torch.Tensor, torch.Tensor]: ``(mean, std)`` as float32 tensors
        of shape ``(C,)``.

    Raises:
        ValueError: If the dataset yields no pixels, since the statistics
            would otherwise be NaN from a 0/0 division.
    """
    loader = DataLoader(dataset, batch_size=64, shuffle=False, num_workers=0)

    channel_sum = None
    channel_sum_sq = None
    num_pixels = 0

    # Wrap the loader with tqdm to show a progress bar during the pass.
    for images, _ in tqdm(loader, desc="Calculating Dataset Stats"):
        # Accumulate in float64: float32 running sums lose precision once the
        # totals grow large, and E[X^2] - E[X]^2 subtracts two close numbers.
        batch = images.to(torch.float64)

        if channel_sum is None:
            # Size the accumulators from the data rather than assuming RGB.
            channel_sum = torch.zeros(batch.size(1), dtype=torch.float64)
            channel_sum_sq = torch.zeros_like(channel_sum)

        # Pixels per channel contributed by this batch: N * H * W.
        num_pixels += batch.size(0) * batch.size(2) * batch.size(3)

        # Reduce over batch, height and width, keeping the channel dimension.
        channel_sum += batch.sum(dim=(0, 2, 3))
        channel_sum_sq += batch.pow(2).sum(dim=(0, 2, 3))

    if num_pixels == 0:
        raise ValueError("Cannot compute mean/std of an empty dataset.")

    mean = channel_sum / num_pixels
    # Rounding can push the variance of (near-)constant channels slightly
    # below zero; clamp so the square root never produces NaN.
    variance = (channel_sum_sq / num_pixels - mean ** 2).clamp_min(0.0)
    std = variance.sqrt()

    # Hand back float32, the dtype callers received before.
    return mean.to(torch.float32), std.to(torch.float32)

In [None]:
# Bare transform for the statistics pass: no augmentation, no normalization.
temp_simple_transform = transforms.Compose([
    # NOTE(review): statistics are measured on images downscaled to 28x28,
    # while the training/evaluation transforms in this notebook apply no Resize
    # (the arrays are 128x128). Downscaling may change the measured std;
    # confirm this is intended.
    transforms.Resize((28, 28)),
    transforms.ToTensor()
])

temp_dataset = ColonDataset(train_images, train_labels, transform=temp_simple_transform)

# One full pass over the training split only.
dataset_mean, dataset_std = calculate_mean_std(temp_dataset)


print(f"Dataset Mean: {dataset_mean}")
print(f"Dataset Std:  {dataset_std}")

Calculating Dataset Stats:   0%|          | 0/1407 [00:00<?, ?it/s]

Dataset Mean: tensor([0.7405, 0.5330, 0.7058])
Dataset Std:  tensor([0.1237, 0.1768, 0.1244])


In [17]:
from torchvision import transforms # to apply transformations to the images

# Per-channel statistics of the training split, as printed by the
# calculate_mean_std run above. Defined once so the training and evaluation
# pipelines cannot drift apart.
NORM_MEAN = (0.7405, 0.5330, 0.7058)
NORM_STD = (0.1237, 0.1768, 0.1244)

# Training pipeline: tensor conversion, light rotation augmentation, normalization.
transforms_with_augmentation = transforms.Compose([
    transforms.ToTensor(), # convert to tensor
    transforms.RandomRotation(degrees=15), # random rotation between -15 and +15 degrees
    transforms.Normalize(NORM_MEAN, NORM_STD)
])

# Evaluation pipeline (validation/test): deterministic, no augmentation.
# Bound to `eval_transforms` rather than `transforms`: reusing that name rebinds
# the torchvision module to a Compose object, so any later `transforms.<X>`
# lookup (e.g. re-running the statistics cell) raises AttributeError.
# If other cells still pass the old `transforms` pipeline, switch them to
# `eval_transforms`.
eval_transforms = transforms.Compose([
    transforms.ToTensor(), # convert to tensor
    transforms.Normalize(NORM_MEAN, NORM_STD)
])

# Only the training split is augmented; validation and test share the
# deterministic pipeline.
train_dataset = ColonDataset(train_images, train_labels, transform=transforms_with_augmentation)
validation_dataset = ColonDataset(validation_images, validation_labels, transform=eval_transforms)
test_dataset = ColonDataset(test_images, test_labels, transform=eval_transforms)

In [19]:
# Training batches are reshuffled on every pass; the evaluation loaders keep
# dataset order and use larger batches.
train_loader = DataLoader(dataset=train_dataset, batch_size=64, shuffle=True)
validation_loader = DataLoader(dataset=validation_dataset, shuffle=False, batch_size=1000)
test_loader = DataLoader(dataset=test_dataset, shuffle=False, batch_size=1000)

In [20]:
# Run on the GPU when CUDA is available, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
print(f"Using device: {device}")

Using device: cuda


In [21]:
# The labels in this notebook take the nine values 0-8 (see the class counts
# printed above).
NUM_CLASSES = 9

# ImageNet-pretrained ResNet-18 backbone.
model = models.resnet18(weights=models.ResNet18_Weights.IMAGENET1K_V1)
# Swap the ImageNet classification head for a dropout + linear head sized for
# this task. The input width is read from the layer being replaced instead of
# hard-coding 512, so the head stays valid if the backbone is changed.
model.fc = nn.Sequential(
    nn.Dropout(0.1),
    nn.Linear(model.fc.in_features, NUM_CLASSES)
)
model = model.to(device)

In [23]:
# Multi-class classification loss computed from raw logits.
criterion = nn.CrossEntropyLoss()
# Adam over every model parameter (pretrained backbone and the new head).
optimizer = torch.optim.Adam(params=model.parameters(), lr=1e-4)
# Multiply the learning rate by 0.8 once every 5 scheduler steps.
scheduler = StepLR(optimizer=optimizer, gamma=0.8, step_size=5)

In [24]:
import time
from sklearn.metrics import roc_auc_score
from sklearn.metrics import roc_auc_score, f1_score

train_f1s = []
val_f1s = []

train_losses = []
train_aucs = []
train_accs = []
val_losses = []
val_aucs = []
val_accs = []
train_epoch_times = []
val_f1_micros = []

import time
from sklearn.metrics import roc_auc_score

epoch = 1