In [1]:
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision.models as models
from PIL import Image, ImageOps
from sklearn.metrics import f1_score, recall_score
from torch.utils.data import DataLoader, random_split
from torchvision import datasets, transforms
from tqdm import tqdm

In [2]:
# Directory handed to ImageFolder as the dataset root.
root_dir = '/media/abdullah/Abdullah/Learning/medImageAnalysis/folder_that_is_Augmented_version_small_testing_dataset'
# Number of output classes; used as the default width of the classifier head.
num_diseases = 485

In [3]:
# Prefer the GPU whenever CUDA is usable; otherwise run on the CPU.
if torch.cuda.is_available():
    device = 'cuda'
else:
    device = 'cpu'
print(f'Using: {device}')

Using: cuda


In [4]:
class ResizeWithPanAndCenter(object):
    """Centre undersized images on a padded canvas, then resize to ``size``.

    An image that is narrower or shorter than the target is first surrounded
    by a border of ``padding_value`` so that it sits in the middle of a canvas
    at least as large as the target.  The result is then resized to exactly
    ``size`` with bilinear resampling.  Images that are already at least as
    large as the target in both dimensions are only resized.

    Note that the final resize does not preserve aspect ratio when a
    dimension exceeds the target.

    Args:
        size: Target ``(width, height)`` in pixels.
        padding_value: Fill used for the border.  An int is applied to every
            band of a multi-band image; any other value is passed to Pillow
            unchanged.
    """

    def __init__(self, size, padding_value=0):
        self.size = size
        self.padding_value = padding_value

    def __call__(self, img):
        """Return ``img`` padded (if undersized) and resized to ``self.size``."""
        delta_w = max(0, self.size[0] - img.size[0])
        delta_h = max(0, self.size[1] - img.size[1])

        if delta_w or delta_h:
            left, top = delta_w // 2, delta_h // 2
            # The right/bottom side receives the odd pixel so the padded
            # canvas reaches the full target extent.
            border = (left, top, delta_w - left, delta_h - top)

            fill = self.padding_value
            band_count = len(img.getbands())
            if isinstance(fill, int) and band_count > 1:
                # Spell the colour out per band so a bare int is not taken as
                # a packed colour value by Pillow.
                fill = (fill,) * band_count
            img = ImageOps.expand(img, border=border, fill=fill)

        img = img.resize(self.size, Image.Resampling.BILINEAR)

        return img
    

In [5]:
# Per-channel statistics handed to transforms.Normalize.
# NOTE(review): presumably measured on this dataset — confirm where they came from.
mean = [
    0.57802826,
    0.29917458,
    0.26115456,
]
std = [
    0.18442076,
    0.28176323,
    0.25507942,
]

In [6]:
# Preprocessing applied to every loaded image, in order:
#   1. bring the PIL image to 224x224,
#   2. convert it to a tensor,
#   3. normalise each channel with the `mean` / `std` lists.
transform = transforms.Compose(
    transforms=[
        ResizeWithPanAndCenter(size=(224, 224), padding_value=128),
        transforms.ToTensor(),
        transforms.Normalize(mean, std),
    ]
)


In [7]:
# Index the images under `root_dir`; `transform` is applied to each sample on access.
dataset = datasets.ImageFolder(root_dir, transform=transform)

In [8]:
# 80 % of the samples (rounded down) go to training; the remainder is held out.
train_size = int(len(dataset) * 0.8)
test_size = len(dataset) - train_size

In [9]:
# Split with a seeded generator so the same samples land in train/test on
# every run; an unseeded split makes results impossible to reproduce.
# NOTE(review): the dataset directory name suggests augmented images; a
# per-sample random split could put augmentations of one source image on both
# sides of the split — confirm this is acceptable.
train_dataset, test_dataset = random_split(
    dataset,
    [train_size, test_size],
    generator=torch.Generator().manual_seed(42),
)


In [10]:
# Both loaders use batches of 128 and 4 worker processes; only the training
# loader shuffles its samples.
train_dataloader = DataLoader(
    train_dataset,
    num_workers=4,
    shuffle=True,
    batch_size=128,
)
test_dataloader = DataLoader(
    test_dataset,
    num_workers=4,
    shuffle=False,
    batch_size=128,
)

In [11]:
class DenseNet201(nn.Module):
    """ImageNet-pretrained DenseNet-201 with a new, trainable linear classifier.

    Every pretrained parameter is frozen; only the replacement classifier
    (``in_features -> num_classes``) has ``requires_grad=True``.

    NOTE(review): freezing parameters does not put the backbone's BatchNorm
    layers in eval mode, so they presumably still update their running
    statistics while the model is in ``train()`` mode — confirm this is intended.

    Args:
        num_classes: Number of outputs of the classifier layer.
    """

    def __init__(self, num_classes=num_diseases):
        super().__init__()

        backbone = models.densenet201(weights=models.DenseNet201_Weights.IMAGENET1K_V1)

        # Freeze the pretrained network first.
        backbone.requires_grad_(False)

        # The head is created after the freeze, so its parameters keep the
        # default requires_grad=True and are the only ones that train.
        head_inputs = backbone.classifier.in_features
        backbone.classifier = nn.Linear(head_inputs, num_classes)

        self.densenet201 = backbone

    def forward(self, x):
        """Return the wrapped network's output for the batch ``x``."""
        return self.densenet201(x)
    
    


In [12]:
# Build the network with a classifier sized for `num_diseases` classes.
model = DenseNet201(num_classes=num_diseases)

In [13]:
# Move the model's parameters and buffers to `device`. As the cell's last
# expression, the returned module is also displayed as the cell output.
model.to(device)

DenseNet201(
  (densenet201): DenseNet(
    (features): Sequential(
      (conv0): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
      (norm0): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (relu0): ReLU(inplace=True)
      (pool0): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
      (denseblock1): _DenseBlock(
        (denselayer1): _DenseLayer(
          (norm1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
          (relu1): ReLU(inplace=True)
          (conv1): Conv2d(64, 128, kernel_size=(1, 1), stride=(1, 1), bias=False)
          (norm2): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
          (relu2): ReLU(inplace=True)
          (conv2): Conv2d(128, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
        )
        (denselayer2): _DenseLayer(
          (norm1): BatchNorm2d(96, eps=1e-05, mome

In [15]:
# SGD with momentum over all model parameters, including those with
# requires_grad=False.
optimizer = torch.optim.SGD(
    params=model.parameters(),
    lr=0.001,
    momentum=0.9,
)

In [16]:
# Cross-entropy loss, averaged over the batch (the default reduction, spelled out).
loss_fn = nn.CrossEntropyLoss(reduction="mean")

In [17]:
def check_dataloader(dataloader):
    """Print the size and dtype of every image the loader yields.

    An extra line is printed for each image whose size is not (3, 224, 224)
    and for each image that is not a floating-point tensor.

    Args:
        dataloader: Iterable yielding ``(X, y)`` batches; ``X`` is indexed
            along its first dimension, and ``y`` is not inspected.
    """
    expected_size = (3, 224, 224)
    for batch, (X, y) in enumerate(dataloader):
        for i, img in enumerate(X):
            print(f"Image {i} in batch {batch}: size {img.size()}, dtype {img.dtype}")
            size_matches = img.size() == expected_size
            if not size_matches:
                print(f"Unexpected image size: {img.size()} in batch {batch} index {i}")
            is_float = torch.is_floating_point(img)
            if not is_float:
                print(f"Unexpected image type: {img.dtype} in batch {batch} index {i}")

In [18]:
def train(dataloader, model, loss_fn, optimizer):
    """Train ``model`` for one pass over ``dataloader``.

    Batches are moved to the device that holds the model's parameters, so the
    function does not depend on a module-level device variable.  The loss is
    reported every 100 batches.

    Args:
        dataloader: Loader yielding ``(X, y)`` batches; must expose ``.dataset``.
        model: Module to optimise; put into training mode here.
        loss_fn: Callable ``loss_fn(pred, y)`` returning a scalar tensor.
        optimizer: Optimizer stepping the model's parameters.

    Returns:
        float: Mean of the per-batch losses, or ``nan`` if the loader yielded
        no batches.
    """
    model.train()

    first_param = next(model.parameters(), None)
    target_device = first_param.device if first_param is not None else torch.device("cpu")

    num_samples = len(dataloader.dataset)
    loss_sum = torch.zeros((), device=target_device)
    num_batches = 0

    # Wrap the loader itself (not enumerate) so tqdm can read its length and
    # show a total / ETA.
    for batch, (X, y) in enumerate(tqdm(dataloader)):
        X, y = X.to(target_device), y.to(target_device)

        # Forward pass
        pred = model(X)
        loss = loss_fn(pred, y)

        # Backpropagation
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # Accumulate on-device; converting every batch with .item() would
        # force a host sync each step.
        loss_sum += loss.detach()
        num_batches += 1

        if batch % 100 == 0:
            current = batch * len(X)
            # tqdm.write prints without breaking the progress bar.
            tqdm.write(f"loss: {loss.item():>7f}  [{current:>5d}/{num_samples:>5d}]")

    return loss_sum.item() / num_batches if num_batches else float("nan")


In [19]:
def test(dataloader, model, loss_fn):
    model.eval()
    test_loss, correct = 0, 0
    all_preds, all_labels = [], []
    
    with torch.no_grad():
        for X, y in dataloader:
            X, y = X.to(device), y.to(device)
            pred = model(X)
            test_loss += loss_fn(pred, y).item()
            correct += (pred.argmax(1) == y).type(torch.float).sum().item()
            all_preds.extend(pred.cpu().numpy())
            all_labels.extend(y.cpu().numpy())

    test_loss /= len(dataloader)
    accuracy = correct / len(dataloader.dataset)
    
    all_preds = np.array(all_preds)
    all_labels = np.array(all_labels)
    top1_accuracy = np.mean(np.any(all_preds.argsort(axis=1)[:, -1:] == all_labels[:, None], axis=1))
    top2_accuracy = np.mean(np.any(all_preds.argsort(axis=1)[:, -2:] == all_labels[:, None], axis=1))
    top5_accuracy = np.mean(np.any(all_preds.argsort(axis=1)[:, -5:] == all_labels[:, None], axis=1))
    
    print(f"Test Error: \n Accuracy: {(100*accuracy):>0.1f}%, Avg loss: {test_loss:>8f} \n")
    print(f"Top-1 Accuracy: {top1_accuracy:>0.1f}")
    print(f"Top-2 Accuracy: {top2_accuracy:>0.1f}")
    print(f"Top-5 Accuracy: {top5_accuracy:>0.1f}")

In [20]:
# Value returned by test() after each epoch.
# NOTE(review): the 'Average Loss' column below is only meaningful when test()
# returns the epoch's average loss.
avg_losses = []

# Training loop
epochs = 1
for epoch in range(epochs):
    print(f"Epoch {epoch+1}\n-------------------------------")
    train(train_dataloader, model, loss_fn, optimizer)
    avg_loss = test(test_dataloader, model, loss_fn)
    avg_losses.append(avg_loss)

# One row per epoch with the value recorded above.
table = pd.DataFrame({'Epoch': range(1, epochs+1), 'Average Loss': avg_losses})
print(table)


Epoch 1
-------------------------------


2it [00:02,  1.09s/it]

loss: 6.190208  [    0/52569]


102it [01:10,  1.57it/s]

loss: 5.648426  [12800/52569]


201it [02:18,  1.16it/s]

loss: 5.411500  [25600/52569]


301it [03:25,  1.16it/s]

loss: 5.197964  [38400/52569]


401it [04:32,  1.16it/s]

loss: 5.378099  [51200/52569]


411it [05:17,  1.29it/s]


Test Error: 
 Accuracy: 7.1%, Avg loss: 5.202076 

Top-1 Accuracy: 0.1
Top-2 Accuracy: 0.1
Top-5 Accuracy: 0.2
   Epoch Average Loss
0      1         None
