In [1]:
# What version of Python do you have?
import PIL

PIL.PILLOW_VERSION = PIL.__version__
import sys
import platform
import torch
import pandas as pd
import sklearn as sk
import torchmetrics
from Model import ImageDataset
from PIL import ImageFile

ImageFile.LOAD_TRUNCATED_IMAGES = True

has_gpu = torch.cuda.is_available()
has_mps = getattr(torch, 'has_mps', False)
device = "mps" if getattr(torch, 'has_mps', False) else "cuda" if torch.cuda.is_available() else "cpu"

print(f"Python Platform: {platform.platform()}")
print(f"PyTorch Version: {torch.__version__}")
print()
print(f"Python {sys.version}")
print(f"Pandas {pd.__version__}")
print(f"Scikit-Learn {sk.__version__}")
print("GPU is", "available" if has_gpu else "NOT AVAILABLE")
print("MPS (Apple Metal) is", "AVAILABLE" if has_mps else "NOT AVAILABLE")
print(f"Target device is {device}")

Python Platform: macOS-13.0.1-arm64-arm-64bit
PyTorch Version: 1.13.0

Python 3.8.15 (default, Nov 24 2022, 08:57:44) 
[Clang 14.0.6 ]
Pandas 1.5.1
Scikit-Learn 1.1.3
GPU is NOT AVAILABLE
MPS (Apple Metal) is AVAILABLE
Target device is mps


In [2]:
from torchvision.models import resnet50, resnet18
from torch import nn
import matplotlib.pyplot as plt
from torch.utils.data import DataLoader
from torchinfo import summary
from tqdm.auto import tqdm
from torchvision.datasets import MNIST
import pytorch_lightning as pl
import random
import torch.backends.cudnn as cudnn
import numpy as np
from torch.nn import functional as F
from torchvision.models import ResNet50_Weights

torch.manual_seed(25)
np.random.seed(25)
random.seed(25)

cudnn.benchmark = False
cudnn.deterministic = True

In [3]:
from collections import Counter
# train_ds = torch.stack(train_ds, dim=0)
# test_ds = torch.stack(test_ds, dim=0)
from torchvision import datasets, transforms
from pathlib import Path
from sklearn.model_selection import train_test_split
from torch.utils.data.dataloader import default_collate

resize = 224
batch_size = 32
mean = [0.485, 0.456, 0.406]
std = [0.229, 0.224, 0.225]

transform_train = transforms.Compose([
    transforms.RandomResizedCrop(resize, scale=(0.5, 1.0)),
    transforms.RandomHorizontalFlip(),
    transforms.ToTensor(),
    transforms.Normalize(mean, std)
])

transform_test = transforms.Compose([
    transforms.Resize(resize),
    transforms.CenterCrop(resize),
    transforms.ToTensor(),
    transforms.Normalize(mean, std)
])

transform = transforms.Compose([
    transforms.Resize(resize),
    transforms.CenterCrop(resize),
    transforms.ToTensor(),
    transforms.Normalize(mean, std)
])

DATA_PATH = Path('../../ML/DataSet/train').resolve()
# DATA_PATH = Path('DataSet/COVID-19/CT').resolve()
DATA_PATH = list(DATA_PATH.glob('*/*'))

classes = [path.parent.stem for path in DATA_PATH]

dataset = pd.DataFrame({'path': DATA_PATH, 'class': classes})

dataset['class'] = dataset['class'].astype('category')
dataset['label'] = dataset['class'].cat.codes

class_labels = dict(zip(range(2), dataset['class'].cat.categories))

train_ds, test_ds = train_test_split(dataset, test_size=0.2, shuffle=True, random_state=42,
                                     stratify=dataset['label'])
train_dl = ImageDataset(train_ds.reset_index(drop=True), augmentations=transform_train)
test_dl = ImageDataset(test_ds.reset_index(drop=True),augmentations=transform_test)

In [4]:
dataset['class']

0        positive
1        positive
2        positive
3        positive
4        positive
           ...   
21577    negative
21578    negative
21579    negative
21580    negative
21581    negative
Name: class, Length: 21582, dtype: category
Categories (2, object): ['negative', 'positive']

In [5]:
train_accu = []
val_accu = []

train_loss = []
val_loss = []

In [6]:
class ResNet53(pl.LightningModule):
    def __init__(self):
        super(ResNet53, self).__init__()
        self.model = resnet50()
        self.model.avgpool = nn.Sequential(
            nn.Conv2d(2048, 256, kernel_size=(1, 1), bias=False),
            nn.BatchNorm2d(256),
            nn.Conv2d(256, 256, kernel_size=(3, 3), padding=(1, 1), bias=True, stride=(1, 1)),
            nn.BatchNorm2d(256),
            nn.ReLU(inplace=True),
            nn.AdaptiveAvgPool2d(output_size=(1, 1)),
        )
        self.model.fc = nn.Sequential(
            nn.Linear(in_features=256, out_features=2),
            nn.Sigmoid(),
            # nn.Linear(in_features=128, out_features=2),
        )
        self.loss = nn.CrossEntropyLoss()
        self.train_acc = torchmetrics.Accuracy()
        self.val_acc = torchmetrics.Accuracy()
        self.test_acc = torchmetrics.Accuracy()

    def forward(self, x):
        return self.model(x)

    def predict_step(self, batch, batch_idx, dataloader_idx=0):
        return self(batch)

    def configure_optimizers(self):
        optimizer = torch.optim.Adam(self.parameters(), lr=1e-4)
        scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=2)
        return [optimizer], [scheduler]

    def training_step(self, batch, batch_no):
        x, y = batch
        preds = self(x)
        correct = preds.argmax(dim=1).eq(y).sum().item()
        total = len(y)
        loss = self.loss(preds, y)
        self.train_acc(torch.argmax(preds, dim=1), y)

        logs = {'train_loss': loss.item(), 'train_acc': self.train_acc}

        self.log("train_loss", loss.item(), on_step=True)
        self.log("train_acc", self.train_acc, on_step=True)

        train_loss.append(loss.item())
        train_accu.append(correct / total)

        batch_dictionary = {
            #REQUIRED: It ie required for us to return "loss"
            "loss": train_loss,

            #optional for batch logging purposes
            "log": logs,

            # info to be used at epoch end
            "correct": correct,
            "total": total
        }

        return batch_dictionary

    def validation_step(self, batch, batch_idx):
        x, y = batch

        preds = self.model(x)
        correct = preds.argmax(dim=1).eq(y).sum().item()
        total = len(y)

        loss = self.loss(preds, y)
        self.val_acc(torch.argmax(preds, dim=1), y)

        self.log('val_loss', loss.item(), on_step=True)
        self.log('val_acc', self.val_acc, on_step=True)

        val_loss.append(loss.item())
        val_accu.append(correct / total)

    def test_step(self, batch, batch_idx):
        x, y = batch
        preds = self.model(x)
        self.test_acc(torch.argmax(preds, dim=1), y)

        self.log('test_acc', self.test_acc, on_step=True)

In [7]:
model = ResNet53()
model.to(device)
summary(model, (batch_size, 3, resize, resize))

Layer (type:depth-idx)                        Output Shape              Param #
ResNet53                                      [32, 2]                   --
├─ResNet: 1-1                                 [32, 2]                   --
│    └─Conv2d: 2-1                            [32, 64, 112, 112]        9,408
│    └─BatchNorm2d: 2-2                       [32, 64, 112, 112]        128
│    └─ReLU: 2-3                              [32, 64, 112, 112]        --
│    └─MaxPool2d: 2-4                         [32, 64, 56, 56]          --
│    └─Sequential: 2-5                        [32, 256, 56, 56]         --
│    │    └─Bottleneck: 3-1                   [32, 256, 56, 56]         75,008
│    │    └─Bottleneck: 3-2                   [32, 256, 56, 56]         70,400
│    │    └─Bottleneck: 3-3                   [32, 256, 56, 56]         70,400
│    └─Sequential: 2-6                        [32, 512, 28, 28]         --
│    │    └─Bottleneck: 3-4                   [32, 512, 28, 28]         379,392

In [26]:
lr = 1e-4
loss_func = nn.CrossEntropyLoss()
epochs = 5
optimizer = torch.optim.Adam(model.parameters(), lr=lr)
# optimizer = torch.optim.SGD(model.parameters(), lr=lr)

In [27]:
model.to(device)
for epoch in range(epochs):

    # training phase
    model.train()
    print(f"epoch {epoch} started")
    loop = tqdm(train_dl, leave=True)
    for batch in loop:
        x, y = batch
        optimizer.zero_grad()

        with torch.set_grad_enabled(True):
            # compute output of the model on the mini-batch
            x, y = x.to(device, non_blocking=True), y.to(device, non_blocking=True)

            # 1. Forward pass
            out = model(x)

            # 2. Calculate and accumulate loss
            loss = loss_func(out, y)

            # 3. Optimizer zero grad
            optimizer.zero_grad()

            # 4. Loss backward
            loss.backward()

            # 5. Optimizer step
            optimizer.step()

    # testing phase
    model.eval()
    total = len(test_ds)
    total_correct = 0  # count number of correct predictions on test set

    for x, y in test_dl:
        with torch.no_grad():  # prevent gradient computation
            x, y = x.to(device, non_blocking=True), y.to(device, non_blocking=True)
            out = model(x)
            # take the maximum output
            pred = torch.argmax(out, dim=1)
            # count the correct predictions
            corrects = (pred == y).sum().item()
            total_correct += corrects
        accuracy = total_correct / total
    print(f"epoch {epoch} accuracy {accuracy * 100:.2f}%")

In [28]:
# model = ResNet50()
# model.model

In [None]:
import matplotlib.ticker as ticker


def plot(train_accu, train_loss, val_accu, val_loss):
    plt.figure(figsize=(12, 12))
    # summarize history for accuracy

    plt.subplot(211)
    plt.plot(train_accu, linewidth=0.5)
    # plt.plot(val_accu, linewidth=0.5)
    plt.title('train_accuracy vs val_accuracy')
    plt.ylabel('accuracy')
    plt.xlabel('iteration')
    # plt.legend(['Train', 'Validation'], loc='lower right')

    # summarize history for loss

    plt.subplot(212)
    plt.plot(train_loss, linewidth=0.5, color='orange')
    # plt.plot(val_loss, linewidth=0.5)
    plt.title('loss vs val_loss')
    plt.ylabel('loss')
    plt.xlabel('iteration')
    # plt.legend(['Train', 'Validation'], loc='upper right')
    plt.tight_layout()
    plt.show()

In [None]:
plot(train_accu, train_loss, val_accu, val_loss)
# m = ResNet53().load_from_checkpoint('lightning_logs/version_2/checkpoints/epoch=4-step=810.ckpt')
m = model
m.eval()
# test_dataset = CovidDataset(test_ds_ct, augmentations=transform_test)
# test_dataloader = DataLoader(test_dataset, batch_size=40, num_workers=2).dataset
# results = trainer.predict(model=m, dataloaders=test_dataloader)

In [None]:
# train(dataset_xray, train_dl_xray, test_dl_xray)
# result()

In [None]:
from sklearn.metrics import classification_report

In [None]:
def get_prediction(x, model: pl.LightningModule):
    model.freeze()  # prepares model for predicting
    probabilities = torch.softmax(model(x), dim=1)
    predicted_class = torch.argmax(probabilities, dim=1)
    return predicted_class, probabilities

In [None]:
true_y, pred_y = [], []
test_dataset = CovidDataset(test_ds, augmentations=transform_test)
test_dataloader = DataLoader(test_dataset, batch_size=40, num_workers=8)
for batch in tqdm(iter(test_dataloader), total=len(test_dataloader)):
    x, y = batch
    true_y.extend(y)
    preds, probs = get_prediction(x, m)
    pred_y.extend(preds.cpu())

In [None]:
print(classification_report(true_y, pred_y, digits=3))

In [None]:
# plot first n incorrect predictions
def plot_grid_label(ds, n, ypred):
    # n = len(ds)
    ncols = 6
    nrows = n // ncols + (1 if n % ncols > 0 else 0)
    plt.subplots(nrows, ncols, figsize=(20, 20))
    k = 0
    for i, (x, y) in enumerate(ds):
        if y != ypred[i]:
            label = class_labels[ds[i - 1][1].item()]
            plt.subplot(nrows, ncols, k + 1)
            plt.imshow(x[0], cmap="gray")
            plt.title(f'y={label} yp={"COVID" if ypred[i] == 0 else "Non-COVID"}')
            plt.tight_layout()

            k += 1
            if k >= n:
                break


five_from_each_ct = test_ds.groupby('class').apply(lambda s: s.sample(20)).reset_index(drop=True)
five_from_each_ds = CovidDataset(five_from_each_ct)

n_incorrect = sum([x != y for x, y in zip(true_y, pred_y)])
n_incorrect = n_incorrect.item()
plot_grid_label(five_from_each_ds, n_incorrect, pred_y)
print("n_incorrect", n_incorrect)