In [None]:
import os
import torch
import torchvision
from torchvision import transforms, models
from torchvision.datasets import ImageFolder
import matplotlib.pyplot as plt
from sklearn.metrics import accuracy_score, confusion_matrix, precision_score, recall_score, f1_score, roc_auc_score
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, random_split
import torchvision.models as models
from torchvision.utils import make_grid
import numpy as np
import itertools
import copy
import torch.nn.functional as F


In [None]:



# Let the process keep running when more than one copy of the OpenMP runtime
# ends up loaded (Intel OpenMP otherwise aborts with a duplicate-library error).
os.environ.update(KMP_DUPLICATE_LIB_OK="TRUE")

# Class names used for labelling results.
# NOTE(review): presumably matches the ImageFolder sub-directory order — confirm.
classes = ('covid', 'normal')

# Root folders of the training images and of the held-out test images.
data_dir = r"...\Train"
test_data_dir = r'...\Test'

In [None]:

# Deterministic geometry shared by every split: resize to 160, then a 150x150 centre crop.
_resize_and_crop = [
    transforms.Resize(160),
    transforms.CenterCrop(150),
]

# Random augmentation that only the training pipeline applies.
_train_augmentation = [
    transforms.RandomHorizontalFlip(p=0.5),
    transforms.RandomVerticalFlip(p=0.5),
    transforms.RandomRotation(degrees=180),
]

# Tensor conversion followed by per-channel normalisation (mean 0.5, std 0.5 on all three channels).
_to_normalized_tensor = [
    transforms.ToTensor(),
    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)),
]

# Training pipeline = shared geometry + augmentation + tensor/normalise.
data_transforms_train = transforms.Compose(
    _resize_and_crop + _train_augmentation + _to_normalized_tensor
)

# Validation/test pipeline = the same steps without any randomness.
data_transforms_val_test = transforms.Compose(
    _resize_and_crop + _to_normalized_tensor
)

# One ImageFolder per root directory, each bound to its own pipeline.
train_dataset = ImageFolder(root=data_dir, transform=data_transforms_train)
val_test_dataset = ImageFolder(root=test_data_dir, transform=data_transforms_val_test)


In [None]:

batch_size = 32
val_size = 50
split_seed = 42  # fixed seed so the train/validation split is identical on every run

train_size = len(train_dataset) - val_size
if train_size <= 0:
    raise ValueError(
        f"Training set has {len(train_dataset)} images; more than val_size={val_size} are required"
    )

# Seeded split of the training folder into a training part and a validation part.
train_data, _val_split = random_split(
    train_dataset,
    [train_size, val_size],
    generator=torch.Generator().manual_seed(split_seed),
)

# `train_dataset` applies random flips/rotations. Validation must be deterministic,
# so the held-out indices are re-read through a second view of the same folder
# that uses the evaluation transforms instead.
val_data = torch.utils.data.Subset(
    ImageFolder(data_dir, transform=data_transforms_val_test),
    _val_split.indices,
)

train_loader = DataLoader(train_data, batch_size=batch_size, num_workers=2, shuffle=True)
val_loader = DataLoader(val_data, batch_size=batch_size, num_workers=2, shuffle=False)
test_loader = DataLoader(val_test_dataset, batch_size=batch_size, num_workers=2, shuffle=False)

In [None]:
def show_batch(dl, mean=0.5, std=0.5):
    """Display the first batch yielded by ``dl`` as a single image grid.

    Args:
        dl: Iterable of ``(images, labels)`` batches, e.g. a ``DataLoader``.
        mean: Mean that was used when normalising the images (default matches
            the ``Normalize`` call used by this notebook's transforms).
        std: Standard deviation that was used when normalising the images.

    Nothing is drawn when ``dl`` yields no batch.
    """
    first_batch = next(iter(dl), None)
    if first_batch is None:
        return

    images, _ = first_batch
    # Undo the normalisation before plotting: imshow expects floats in [0, 1]
    # and clips anything outside, which distorts normalised images.
    images = (images * std + mean).clamp(0, 1)

    fig, ax = plt.subplots(figsize=(16, 12))
    ax.set_xticks([])
    ax.set_yticks([])
    # make_grid returns (C, H, W); imshow wants (H, W, C).
    ax.imshow(make_grid(images, nrow=16).permute(1, 2, 0))

In [None]:
def save_model(model, path="./cust_cnn.pth"):
    """Write the model's ``state_dict`` to ``path`` using ``torch.save``."""
    weights = model.state_dict()
    torch.save(weights, path)


In [None]:

def evaluate(loader, model, device):
    """Score ``model`` on every batch of ``loader`` and print the metrics.

    The model is switched to eval mode and run without gradient tracking.
    The predicted class of each sample is the index of its largest output.

    Args:
        loader: Iterable of ``(images, labels)`` tensor batches.
        model: Module mapping an image batch to per-class scores.
        device: Device the image batches are moved to before the forward pass.

    Returns:
        Tuple ``(accuracy, precision, recall, f1)``. Accuracy is a percentage;
        the other three are the "weighted" averages computed by scikit-learn.
        When ``loader`` yields no samples, all four values are ``0.0``.
    """
    model.eval()
    y_true, y_pred = [], []

    with torch.no_grad():
        for images, labels in loader:
            predicted = model(images.to(device)).argmax(dim=1)
            # Labels are only compared on the host, so they never need to
            # be copied to the device.
            y_true.extend(labels.tolist())
            y_pred.extend(predicted.cpu().tolist())

    if not y_true:
        # Without this guard the accuracy computation divides by zero.
        print('evaluate: loader produced no samples; returning 0.0 for every metric')
        return 0.0, 0.0, 0.0, 0.0

    correct = sum(1 for truth, guess in zip(y_true, y_pred) if truth == guess)
    accuracy = 100 * correct / len(y_true)
    precision = precision_score(y_true, y_pred, average="weighted")
    recall = recall_score(y_true, y_pred, average="weighted")
    f1 = f1_score(y_true, y_pred, average="weighted")

    print(f'Accuracy: {accuracy:.2f}%')
    print(f'Precision: {precision:.2f}')
    print(f'Recall: {recall:.2f}')
    print(f'F1 Score: {f1:.2f}')
    return accuracy, precision, recall, f1

In [None]:

class EarlyStopping:
    """Stop training once the monitored loss has stopped improving.

    Call the instance once per epoch with the current loss and the model.
    A loss that is at least as good as the best one seen so far saves a
    checkpoint through ``save_model`` and resets the patience counter.
    Any other loss, including NaN, increments the counter; once it reaches
    ``patience`` the ``early_stop`` flag is set.

    Attributes:
        patience: Number of consecutive non-improving calls tolerated.
        verbose: Whether progress messages are printed.
        counter: Consecutive non-improving calls seen so far.
        best_score: Negated best loss seen so far (``None`` before the first
            valid loss).
        early_stop: ``True`` once patience is exhausted.
        val_loss_min: Loss stored with the most recent checkpoint.
    """

    def __init__(self, patience=5, verbose=False):
        self.patience = patience
        self.verbose = verbose
        self.counter = 0
        self.best_score = None
        self.early_stop = False
        self.val_loss_min = np.inf

    def __call__(self, val_loss, model):
        """Record one epoch's loss, checkpointing or counting towards a stop."""
        if np.isnan(float(val_loss)):
            # NaN compares False against everything, so it used to fall into
            # the "improved" branch: the checkpoint was overwritten with a
            # diverged model and best_score became NaN, disabling stopping.
            self._register_no_improvement()
            return

        score = -val_loss
        if self.best_score is None or not score < self.best_score:
            self.best_score = score
            self.save_checkpoint(val_loss, model)
            self.counter = 0
        else:
            self._register_no_improvement()

    def _register_no_improvement(self):
        """Advance the patience counter and raise the stop flag when exhausted."""
        self.counter += 1
        if self.verbose:
            print(f'EarlyStopping counter: {self.counter} out of {self.patience}')
        if self.counter >= self.patience:
            self.early_stop = True

    def save_checkpoint(self, val_loss, model):
        '''Save the model when the monitored loss has decreased.'''
        if self.verbose:
            print(f'Validation loss decreased ({self.val_loss_min:.6f} --> {val_loss:.6f}).  Saving model ...')
        save_model(model)
        self.val_loss_min = val_loss


In [None]:
def _validation_loss(model, device, loader, loss_fn):
    """Return the mean per-sample loss of ``model`` over ``loader``.

    Assumes ``loss_fn`` returns the mean over the batch (the default reduction
    of ``nn.CrossEntropyLoss``), so each batch is re-weighted by its size.

    Raises:
        ValueError: If ``loader`` yields no samples.
    """
    model.eval()
    total_loss, sample_count = 0.0, 0
    with torch.no_grad():
        for images, labels in loader:
            images, labels = images.to(device), labels.to(device)
            batch_len = labels.size(0)
            total_loss += loss_fn(model(images), labels).item() * batch_len
            sample_count += batch_len
    if sample_count == 0:
        raise ValueError("val_loader produced no samples; cannot compute a validation loss")
    return total_loss / sample_count


def train(model, device, train_loader, val_loader, optimizer, loss_fn, num_epochs=20):
    """Train ``model`` with early stopping driven by the validation loss.

    Each epoch runs one optimisation pass over ``train_loader``, prints the
    mean training loss, reports validation metrics via ``evaluate`` and hands
    the validation loss to an ``EarlyStopping`` instance (patience 5). That
    instance saves a checkpoint whenever the validation loss improves.

    Args:
        model: Module to optimise; moved to ``device`` in place.
        device: Device used for the model and every batch.
        train_loader: Iterable of ``(images, labels)`` training batches.
        val_loader: Iterable of ``(images, labels)`` validation batches.
        optimizer: Optimizer already bound to the model's parameters.
        loss_fn: Callable ``loss_fn(outputs, labels)`` returning a scalar tensor.
        num_epochs: Maximum number of epochs to run.

    Raises:
        ValueError: If ``train_loader`` or ``val_loader`` yields no batches.
    """
    model.to(device)
    early_stopping = EarlyStopping(patience=5, verbose=True)

    for epoch in range(num_epochs):
        model.train()
        running_loss = 0.0
        num_batches = 0

        for images, labels in train_loader:
            images, labels = images.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = model(images)
            loss = loss_fn(outputs, labels)
            loss.backward()
            optimizer.step()
            running_loss += loss.item()
            num_batches += 1

        if num_batches == 0:
            raise ValueError("train_loader produced no batches; nothing to train on")

        avg_train_loss = running_loss / num_batches
        print(f'Epoch [{epoch + 1}/{num_epochs}], Loss: {avg_train_loss:.4f}')
        evaluate(val_loader, model, device)

        # Early stopping and checkpoint selection must follow held-out data.
        # The training loss keeps falling while the model overfits, so using
        # it here would rarely stop and would checkpoint the latest epoch
        # rather than the best one.
        val_loss = _validation_loss(model, device, val_loader, loss_fn)
        print(f'Validation loss: {val_loss:.4f}')

        early_stopping(val_loss, model)
        if early_stopping.early_stop:
            print("Early stopping")
            break


In [None]:
def plot_confusion_matrix(cm, target_names, title='Confusion matrix', cmap=None, normalize=True):
    """Draw a confusion matrix as an annotated heat map.

    Args:
        cm: Square confusion matrix (rows = true labels, columns = predicted).
        target_names: Class names used for the tick labels on both axes.
        title: Figure title.
        cmap: Matplotlib colormap; defaults to ``'Blues'``.
        normalize: If true, each row is divided by its total so cells show
            per-class fractions; otherwise raw counts are shown.
    """
    cm = np.asarray(cm)
    if cmap is None:
        cmap = plt.get_cmap('Blues')

    if normalize:
        # Normalise before drawing so cell colours, annotations and the
        # text-contrast threshold all refer to the same values. Rows with
        # no samples stay at zero instead of turning into NaN.
        row_totals = cm.sum(axis=1, keepdims=True)
        cm = np.divide(
            cm,
            row_totals,
            out=np.zeros(cm.shape, dtype=float),
            where=row_totals != 0,
        )

    plt.figure(figsize=(4, 2))
    plt.imshow(cm, interpolation='nearest', cmap=cmap)
    plt.title(title, fontsize=9)
    plt.colorbar()
    tick_marks = np.arange(len(target_names))
    plt.xticks(tick_marks, target_names, rotation=45)
    plt.yticks(tick_marks, target_names)

    # Cells above the threshold get white text, the rest black text.
    thresh = cm.max() / 1.5 if normalize else cm.max() / 2
    for i, j in itertools.product(range(cm.shape[0]), range(cm.shape[1])):
        plt.text(j, i, f"{cm[i, j]:.4f}" if normalize else f"{cm[i, j]:,}",
                 horizontalalignment="center",
                 color="white" if cm[i, j] > thresh else "black")

    plt.ylabel('True label', fontsize=18)
    plt.xlabel('Predicted label', fontsize=18)
    # Lay out after the axis labels exist so they are included in the fit.
    plt.tight_layout()
    plt.show()

In [None]:
class Net(nn.Module):
    """Frozen ResNet-18 stem (through ``layer2``) with a small trainable head.

    The pretrained ``conv1``/``bn1``/``maxpool``/``layer1``/``layer2`` modules
    are reused with ``requires_grad`` disabled. They are followed by a new
    3x3 convolution, batch norm, 2x2 max pooling and two fully connected
    layers that produce ``num_classes`` scores.

    Args:
        num_classes: Number of output scores. The default of 10 is kept for
            backward compatibility; pass the real class count explicitly.
        input_size: Height/width of the square input images. It is used only
            to size ``fc1``; it must match the images fed to ``forward``.
    """

    def __init__(self, num_classes=10, input_size=150):
        super().__init__()

        pretrained_model = models.resnet18(weights=models.ResNet18_Weights.DEFAULT)

        # Freeze every pretrained weight; only the custom head learns.
        for param in pretrained_model.parameters():
            param.requires_grad = False

        self.conv1 = pretrained_model.conv1
        self.bn1 = pretrained_model.bn1
        self.relu = nn.ReLU(inplace=True)
        self.maxpool = pretrained_model.maxpool
        self.layer1 = pretrained_model.layer1
        self.layer2 = pretrained_model.layer2
        self.custom_conv1 = nn.Conv2d(128, 256, kernel_size=3, stride=1, padding=1)
        self.custom_bn1 = nn.BatchNorm2d(256)
        self.pool = nn.MaxPool2d(2, 2)
        self.dropout = nn.Dropout(0.5)
        # The flattened size depends on the input resolution. 150x150 inputs
        # reach this point as 256 x 9 x 9, so the former hard-coded
        # 256 * 37 * 37 made the first forward pass fail with a shape
        # mismatch. Measuring it keeps fc1 consistent with the layers above.
        self.fc1 = nn.Linear(self._flattened_feature_size(input_size), 512)
        self.fc2 = nn.Linear(512, num_classes)

    def _extract_features(self, x):
        """Run the convolutional part of the network (everything before flatten)."""
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)
        x = self.maxpool(x)
        x = self.layer1(x)
        x = self.layer2(x)

        x = self.custom_conv1(x)
        x = self.custom_bn1(x)
        x = self.relu(x)
        x = self.pool(x)
        return x

    def _flattened_feature_size(self, input_size):
        """Return the per-sample feature count produced for square inputs."""
        was_training = self.training
        # Eval mode keeps the probe from touching batch-norm running statistics.
        self.eval()
        with torch.no_grad():
            probe = torch.zeros(1, 3, input_size, input_size)
            feature_count = torch.flatten(self._extract_features(probe), 1).size(1)
        self.train(was_training)
        return feature_count

    def forward(self, x):
        """Map an image batch to ``num_classes`` unnormalised scores."""
        x = self._extract_features(x)

        x = torch.flatten(x, 1)
        x = self.fc1(x)
        x = self.relu(x)
        x = self.dropout(x)
        x = self.fc2(x)
        return x

In [None]:

if __name__ == "__main__":
    # Prefer the first CUDA device when one is available.
    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    print("The model will be running on", device, "device")

    # True  -> custom head on top of a frozen ResNet-18 stem (Net)
    # False -> stock ResNet-18 with only its final layer replaced and trained
    use_custom_net = True

    if use_custom_net:
        # Size the output layer to the dataset; Net's default is 10 classes.
        model = Net(num_classes=len(classes))

        backbone_lr, head_lr = 0.0001, 0.001
        lr_by_module = [
            (model.conv1, backbone_lr),
            (model.bn1, backbone_lr),
            (model.layer1, backbone_lr),
            (model.layer2, backbone_lr),
            (model.custom_conv1, head_lr),
            (model.custom_bn1, head_lr),
            (model.fc1, head_lr),
            (model.fc2, head_lr),
        ]
        # Only parameters that still require gradients go to the optimizer.
        # Frozen backbone weights are skipped, and the custom batch norm,
        # which was previously left out, is now trained. The lower backbone
        # rate applies only if those layers are unfrozen.
        param_groups = []
        for module, lr in lr_by_module:
            trainable = [p for p in module.parameters() if p.requires_grad]
            if trainable:
                param_groups.append({'params': trainable, 'lr': lr})
        optimizer = optim.Adam(param_groups)

    else:
        model = models.resnet18(weights=models.ResNet18_Weights.DEFAULT)
        for param in model.parameters():
            param.requires_grad = False
        model.fc = nn.Linear(model.fc.in_features, len(classes))
        optimizer = optim.Adam(model.fc.parameters(), lr=0.001)

    loss_fn = nn.CrossEntropyLoss()

    train(model, device, train_loader, val_loader, optimizer, loss_fn)
    print('Finished Training')
    train_status = True
    if train_status:
        # Restore the checkpoint written during training before testing.
        # map_location lets a checkpoint saved on GPU load on a CPU-only
        # machine and vice versa.
        model.load_state_dict(torch.load("cust_cnn.pth", map_location=device))
        model.to(device)
        evaluate(test_loader, model, device)
