In [None]:
import time
import torch
from torch import nn, optim
import torchvision
import sys
import numpy as np
from torchvision import models
import torch.nn.init as init
from typing import Any
from collections import OrderedDict

# AlexNet

In [None]:
# Seed the global RNG first so every later weight initialisation is repeatable.
torch.manual_seed(25)

# Use the first CUDA device when one is visible, otherwise stay on the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda:0")
else:
    device = torch.device("cpu")

class AlexNet(nn.Module):
    """AlexNet-style convolutional image classifier.

    The network has three stages, exposed as attributes so individual layers
    can be inspected or swapped by index:

    * ``features``   - five convolutions, each followed by ReLU, with three
      max-pooling layers in between.
    * ``avgpool``    - adaptive average pooling to a fixed 6x6 map, so the
      classifier input size does not depend on the image size.
    * ``classifier`` - two dropout-regularised hidden layers of width 4096 and
      a final linear layer producing the logits.

    Args:
        num_classes: Number of logits produced per image. Defaults to 1000,
            the width that used to be hard-coded, so ``AlexNet()`` builds the
            same network as before.
    """

    def __init__(self, num_classes: int = 1000) -> None:
        super(AlexNet, self).__init__()
        self.num_classes = num_classes
        self.features = nn.Sequential(
            nn.Conv2d(3, 64, kernel_size=11, stride=4, padding=2),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=3, stride=2),
            nn.Conv2d(64, 192, kernel_size=5, padding=2),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=3, stride=2),
            nn.Conv2d(192, 384, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.Conv2d(384, 256, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.Conv2d(256, 256, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=3, stride=2),
        )
        self.avgpool = nn.AdaptiveAvgPool2d((6, 6))
        self.classifier = nn.Sequential(
            nn.Dropout(p=0.5),
            nn.Linear(256 * 6 * 6, 4096),
            nn.ReLU(inplace=True),
            nn.Dropout(p=0.5),
            nn.Linear(4096, 4096),
            nn.ReLU(inplace=True),
            nn.Linear(4096, num_classes),
        )

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Map a batch of 3-channel images to class logits.

        Args:
            x: Tensor with 3 channels in dimension 1 (the first convolution
                takes 3 input channels).

        Returns:
            Tensor with one row per image and ``num_classes`` columns.
        """
        x = self.features(x)
        x = self.avgpool(x)
        # Keep the batch dimension and flatten the 256x6x6 map into 9216 features.
        x = torch.flatten(x, 1)
        x = self.classifier(x)
        return x

In [None]:
# Build the network with its default head, then place its weights on the
# selected device. `Module.to` returns the module itself.
alexnet = AlexNet()
alexnet = alexnet.to(device)
alexnet

In [None]:
# Replace the fully connected head with a narrower one ending in 2 logits.
alexnet.classifier[1] = nn.Linear(9216,4096)
alexnet.classifier[4] = nn.Linear(4096,1024)
alexnet.classifier[6] = nn.Linear(1024,2)
# New layers are created on the CPU, whereas the rest of the model was already
# moved to `device`. Move the whole model again so every parameter lives on
# the same device as the batches fed to it during training.
alexnet.to(device)
alexnet

In [None]:
import torch
from torchvision.datasets import ImageFolder
from torchvision import transforms
import cv2
import os

def getStat(train_data):
    """Estimate per-channel mean and std of a 3-channel image dataset.

    The dataset is read one sample at a time. For each of the three channels
    the per-image mean and per-image standard deviation are summed and then
    divided by the number of samples, so the returned std is the *average of
    per-image stds*, not the std over all pixels of the dataset.

    Args:
        train_data: Dataset yielding ``(image, label)`` pairs; only channels
            0-2 of the image are read. Must support ``len()``.

    Returns:
        Tuple ``(mean, std)`` of two 3-element lists of numpy scalars.
    """
    print('Compute mean and std for training data.')
    sample_count = len(train_data)
    print(sample_count)

    # batch_size=1 lets images of different sizes pass through the loader.
    loader = torch.utils.data.DataLoader(
        train_data,
        batch_size=1,
        shuffle=False,
        num_workers=0,
        pin_memory=True,
    )

    channel_mean = torch.zeros(3)
    channel_std = torch.zeros(3)
    for images, _ in loader:
        for channel in range(3):
            plane = images[:, channel, :, :]
            channel_mean[channel] += plane.mean()
            channel_std[channel] += plane.std()

    # Turn the running sums into averages over the samples.
    channel_mean.div_(sample_count)
    channel_std.div_(sample_count)
    return list(channel_mean.numpy()), list(channel_std.numpy())

if __name__ == '__main__':
    # Load the raw training images as plain tensors (no resize or
    # normalisation) and print the per-channel statistics.
    train_dataset = ImageFolder(
        root=os.getcwd()+ "/downloads/COVID-19-1/Train/",
        transform=transforms.ToTensor(),
    )
    print(getStat(train_dataset))

In [None]:
from PIL import Image
from torchvision import transforms

# Input pipeline shared by the training and test datasets: resize to 256,
# take the central 224x224 crop, convert to a tensor and normalise every
# channel with the same mean / std.
preprocess = transforms.Compose(
    [
        transforms.Resize(256),
        transforms.CenterCrop(224),
        transforms.ToTensor(),
        transforms.Normalize(
            mean=[0.3313017] * 3,
            std=[0.340617] * 3,
        ),
    ]
)

In [None]:
from torchvision import datasets
import os, os.path

# Mini-batch size shared by the training and test loaders of the AlexNet run.
batch_size = 4

# Both splits live below the current working directory.
data_root = os.getcwd()
image_path = data_root + "/downloads/COVID-19-1/Train/"

# Training split: one sub-folder per class, labels assigned by ImageFolder.
train_dataset = datasets.ImageFolder(root=image_path, transform=preprocess)
lung_list = train_dataset.class_to_idx
train_num = len(train_dataset)
train_iter = torch.utils.data.DataLoader(
    train_dataset,
    batch_size=batch_size,
    shuffle=True,
    num_workers=2,
    drop_last=True,
)

# Test split, read with the same preprocessing.
# NOTE(review): drop_last=True discards the final partial batch, so a few test
# images may never be scored - confirm this is intended.
val_dataset = datasets.ImageFolder(
    root=data_root + "/downloads/COVID-19-1/Test/",
    transform=preprocess,
)
val_iter = torch.utils.data.DataLoader(
    val_dataset,
    batch_size=batch_size,
    shuffle=True,
    num_workers=2,
    drop_last=True,
)

# Display names for label indices 0 and 1.
# NOTE(review): the real index order is in `lung_list`; verify it matches.
classes = ("Normal", "Abnormal")

In [None]:
import time
from matplotlib import pyplot as plt

lr = 0.001
LOG_EVERY = 625  # number of mini-batches between two loss reports

loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(alexnet.parameters(), lr=lr, momentum=0.9)

# Per-epoch history used for the accuracy plot (values are fractions in [0, 1]).
acc_t = []
acc_v = []
epoch_counts = []

for epoch in range(50):  # loop over the dataset multiple times

    # Dropout has to be active while fitting; the evaluation below switches it
    # off, so training mode must be restored at the start of every epoch.
    alexnet.train()

    running_loss = 0.0
    start_time = time.time()
    for i, (inputs, labels) in enumerate(train_iter):
        inputs, labels = inputs.to(device), labels.to(device)

        # zero the parameter gradients
        optimizer.zero_grad()

        # forward + backward + optimize
        loss = loss_fn(alexnet(inputs), labels)
        loss.backward()
        optimizer.step()

        # print the mean loss of the last LOG_EVERY mini-batches
        running_loss += loss.item()
        if i % LOG_EVERY == LOG_EVERY - 1:
            time_taken = time.time() - start_time
            print('[%d, %5d] loss: %.3f' % (epoch + 1, i + 1, running_loss / LOG_EVERY))
            print('Time:',time_taken)
            running_loss = 0.0

    # Evaluate in eval mode: with Dropout still active the accuracies would be
    # computed on randomly thinned activations and fluctuate between runs.
    alexnet.eval()

    # Accuracy on the training split
    correct_t = 0
    total_t = 0
    with torch.no_grad():
        for images, labels in train_iter:
            images, labels = images.to(device), labels.to(device)
            predicted = alexnet(images).argmax(dim=1)
            total_t += labels.size(0)
            correct_t += (predicted == labels).sum().item()
    acc_t.append(correct_t / total_t)

    # Accuracy on the test split
    correct_v = 0
    total_v = 0
    with torch.no_grad():
        for images, labels in val_iter:
            images, labels = images.to(device), labels.to(device)
            predicted = alexnet(images).argmax(dim=1)
            total_v += labels.size(0)
            correct_v += (predicted == labels).sum().item()
    acc_v.append(correct_v / total_v)

    epoch_counts.append(epoch + 1)

    print('Epoch', epoch + 1,': Train accuracy: %.2f %%' % (100 * correct_t / total_t), ', Test accuracy: %.2f %%' % (100 * correct_v / total_v))

print('Finished Training of AlexNet')

# Accuracy curves: training split first, test split second, so the legend
# entries below line up with the plotted series.
for accuracy_history in (acc_t, acc_v):
    plt.plot(epoch_counts, accuracy_history)
plt.title('AlexNet accuracy')
plt.xlabel('epoch')
plt.ylabel('accuracy')
plt.legend(['train', 'val'], loc='upper left')
plt.show();

In [None]:
def _safe_div(numerator, denominator):
    """Return numerator / denominator, or 0.0 when the denominator is zero."""
    return numerator / denominator if denominator else 0.0


# Dropout must be disabled while scoring, otherwise the metrics are computed
# on randomly thinned activations.
alexnet.eval()

# Counters indexed by the TRUE label of each sample.
class_correct = [0.0 for _ in classes]  # samples predicted correctly
class_total = [0.0 for _ in classes]    # samples seen
class_FP_TP = [0.0 for _ in classes]    # samples predicted as class 1
class_TN_FN = [0.0 for _ in classes]    # samples predicted as class 0
with torch.no_grad():
    for images, labels in val_iter:
        images, labels = images.to(device), labels.to(device)
        predicted = alexnet(images).argmax(dim=1)
        # Walk over every sample the batch actually holds rather than a
        # hard-coded batch width.
        for label, guess in zip(labels.tolist(), predicted.tolist()):
            class_FP_TP[label] += guess == 1
            class_TN_FN[label] += guess == 0
            class_correct[label] += guess == label
            class_total[label] += 1

# Class 0: a "positive" is a prediction of 0.
Precision0 = _safe_div(100 * class_TN_FN[0], class_TN_FN[0] + class_TN_FN[1])
Recall0 = _safe_div(100 * class_TN_FN[0], class_TN_FN[0] + class_FP_TP[0])
F10 = _safe_div(2 * (Precision0 * Recall0), Precision0 + Recall0)
print('Numbers of FP for', classes[0], ':', int(class_TN_FN[1]))
print('Numbers of FN for', classes[0], ':', int(class_FP_TP[0]))
print('Precision of %5s : %2d %%' % (classes[0], Precision0))
print('Recall of %5s : %2d %%' % (classes[0], Recall0))
print('Accuracy of %5s : %2d %%' % (classes[0], _safe_div(100 * class_correct[0], class_total[0])))
print('F1-score of %5s : %2d %%' % (classes[0], F10))
print()

# Class 1: a "positive" is a prediction of 1.
Precision1 = _safe_div(100 * class_FP_TP[1], class_FP_TP[1] + class_FP_TP[0])
Recall1 = _safe_div(100 * class_FP_TP[1], class_FP_TP[1] + class_TN_FN[1])
F11 = _safe_div(2 * (Precision1 * Recall1), Precision1 + Recall1)
print('Numbers of FP for', classes[1], ':', int(class_FP_TP[0]))
print('Numbers of FN for', classes[1], ':', int(class_TN_FN[1]))
print('Precision of %5s : %2d %%' % (classes[1], Precision1))
print('Recall of %5s : %2d %%' % (classes[1], Recall1))
print('Accuracy of %5s : %2d %%' % (classes[1], _safe_div(100 * class_correct[1], class_total[1])))
print('F1-score of %5s : %2d %%' % (classes[1], F11))

# SqueezeNet

In [None]:
# Re-seed before the SqueezeNet section so its weight initialisation starts
# from the same RNG state on every run.
torch.manual_seed(25)

# First CUDA device if present, CPU otherwise.
if torch.cuda.is_available():
    device = torch.device("cuda:0")
else:
    device = torch.device("cpu")

class Fire(nn.Module):
    """Squeeze-and-expand building block of SqueezeNet.

    A 1x1 "squeeze" convolution reduces the channel count; its output feeds
    two parallel "expand" convolutions (1x1 and 3x3 with padding 1) whose
    ReLU-activated results are concatenated along the channel dimension.

    Args:
        inplanes: Channels of the incoming feature map.
        squeeze_planes: Channels produced by the squeeze convolution.
        expand1x1_planes: Channels produced by the 1x1 expand branch.
        expand3x3_planes: Channels produced by the 3x3 expand branch.
    """

    def __init__(self, inplanes: int, squeeze_planes: int,
                 expand1x1_planes: int, expand3x3_planes: int) -> None:
        super().__init__()
        self.inplanes = inplanes

        # Bottleneck.
        self.squeeze = nn.Conv2d(inplanes, squeeze_planes, kernel_size=1)
        self.squeeze_activation = nn.ReLU(inplace=True)

        # Point-wise expand branch.
        self.expand1x1 = nn.Conv2d(
            squeeze_planes, expand1x1_planes, kernel_size=1
        )
        self.expand1x1_activation = nn.ReLU(inplace=True)

        # Spatial expand branch; padding=1 keeps height and width unchanged.
        self.expand3x3 = nn.Conv2d(
            squeeze_planes, expand3x3_planes, kernel_size=3, padding=1
        )
        self.expand3x3_activation = nn.ReLU(inplace=True)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Return the channel-wise concatenation of both expand branches."""
        squeezed = self.squeeze_activation(self.squeeze(x))
        branch_1x1 = self.expand1x1_activation(self.expand1x1(squeezed))
        branch_3x3 = self.expand3x3_activation(self.expand3x3(squeezed))
        return torch.cat((branch_1x1, branch_3x3), 1)

class SqueezeNet(nn.Module):
    """SqueezeNet image classifier assembled from ``Fire`` blocks.

    ``features`` is a convolutional stem followed by eight ``Fire`` blocks and
    max-pooling layers; the two supported versions differ in the stem and in
    where the pooling layers sit. ``classifier`` applies dropout, a 1x1
    convolution with ``num_classes`` output channels, ReLU and global average
    pooling, so the logits are per-class averages over all spatial positions.

    Args:
        version: Architecture variant, ``'1_0'`` or ``'1_1'``.
        num_classes: Number of logits produced per image.

    Raises:
        ValueError: If ``version`` is neither ``'1_0'`` nor ``'1_1'``.
    """

    def __init__(
        self,
        version: str = '1_0',
        num_classes: int = 2
    ) -> None:
        super(SqueezeNet, self).__init__()
        self.num_classes = num_classes
        if version == '1_0':
            self.features = nn.Sequential(
                nn.Conv2d(3, 96, kernel_size=7, stride=2),
                nn.ReLU(inplace=True),
                nn.MaxPool2d(kernel_size=3, stride=2, ceil_mode=True),
                Fire(96, 16, 64, 64),
                Fire(128, 16, 64, 64),
                Fire(128, 32, 128, 128),
                nn.MaxPool2d(kernel_size=3, stride=2, ceil_mode=True),
                Fire(256, 32, 128, 128),
                Fire(256, 48, 192, 192),
                Fire(384, 48, 192, 192),
                Fire(384, 64, 256, 256),
                nn.MaxPool2d(kernel_size=3, stride=2, ceil_mode=True),
                Fire(512, 64, 256, 256),
            )
        elif version == '1_1':
            self.features = nn.Sequential(
                nn.Conv2d(3, 64, kernel_size=3, stride=2),
                nn.ReLU(inplace=True),
                nn.MaxPool2d(kernel_size=3, stride=2, ceil_mode=True),
                Fire(64, 16, 64, 64),
                Fire(128, 16, 64, 64),
                nn.MaxPool2d(kernel_size=3, stride=2, ceil_mode=True),
                Fire(128, 32, 128, 128),
                Fire(256, 32, 128, 128),
                nn.MaxPool2d(kernel_size=3, stride=2, ceil_mode=True),
                Fire(256, 48, 192, 192),
                Fire(384, 48, 192, 192),
                Fire(384, 64, 256, 256),
                Fire(512, 64, 256, 256),
            )
        else:
            # The class is instantiated directly in this notebook, so reject
            # unknown variants here instead of failing later on a missing
            # `features` attribute.
            raise ValueError("Unsupported SqueezeNet version {version}: "
                             "1_0 or 1_1 expected".format(version=version))

        # Final convolution is initialized differently from the rest
        final_conv = nn.Conv2d(512, self.num_classes, kernel_size=1)
        self.classifier = nn.Sequential(
            nn.Dropout(p=0.5),
            final_conv,
            nn.ReLU(inplace=True),
            nn.AdaptiveAvgPool2d((1, 1))
        )

        # Small-variance normal init for the classification conv, Kaiming
        # uniform for every other convolution, and zero biases throughout.
        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                if m is final_conv:
                    init.normal_(m.weight, mean=0.0, std=0.01)
                else:
                    init.kaiming_uniform_(m.weight)
                if m.bias is not None:
                    init.constant_(m.bias, 0)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Return one row of ``num_classes`` logits per input image."""
        x = self.features(x)
        x = self.classifier(x)
        # The classifier ends in a 1x1 spatial map; drop the spatial dimensions.
        return torch.flatten(x, 1)
    
def _squeezenet(version: str, pretrained: bool, progress: bool, **kwargs: Any) -> SqueezeNet:
    """Build a ``SqueezeNet`` and optionally load published weights into it.

    Args:
        version: Architecture variant passed to ``SqueezeNet`` (``'1_0'`` or
            ``'1_1'``).
        pretrained: When true, download a checkpoint and load it into the model.
        progress: Forwarded to the downloader to show a progress bar.
        **kwargs: Extra keyword arguments forwarded to ``SqueezeNet``.

    Returns:
        The constructed (and possibly weight-initialised) model.
    """
    model = SqueezeNet(version, **kwargs)
    if pretrained:
        # Neither the downloader nor the URL table is defined in the visible
        # notebook cells, so the pretrained path used to end in a NameError.
        # Both are provided locally to keep the function self-contained.
        from torch.hub import load_state_dict_from_url
        model_urls = {
            'squeezenet1_0': 'https://download.pytorch.org/models/squeezenet1_0-b66bff10.pth',
            'squeezenet1_1': 'https://download.pytorch.org/models/squeezenet1_1-b8a52dc0.pth',
        }
        arch = 'squeezenet' + version
        state_dict = load_state_dict_from_url(model_urls[arch],
                                              progress=progress)
        # NOTE(review): these checkpoints presumably carry a 1000-class head;
        # loading them into the default num_classes=2 model would then fail on
        # a shape mismatch. Confirm before relying on pretrained=True.
        model.load_state_dict(state_dict)
    return model

In [None]:
# The training and evaluation cells send every batch to `device`, so the
# model's weights must be there as well; otherwise the forward pass fails
# with a device mismatch on CUDA machines.
squeezenet = SqueezeNet().to(device)
squeezenet

In [None]:
# Swap in a fresh 2-channel classification convolution.
# NOTE(review): SqueezeNet() already builds a Conv2d(512, 2) here with a
# dedicated normal(std=0.01) init; this replacement falls back to PyTorch's
# default init - confirm that is intended.
squeezenet.classifier[1] = nn.Conv2d(512, 2, kernel_size=(1,1), stride=(1,1))
# The new layer is created on the CPU; move the whole model so all parameters
# share the device the batches are sent to.
squeezenet.to(device)
squeezenet

In [None]:
from torchvision import datasets
import os, os.path

# Mini-batch size shared by the training and test loaders of the SqueezeNet run.
batch_size = 16

# Both splits live below the current working directory.
data_root = os.getcwd()
image_path = data_root + "/downloads/COVID-19-1/Train/"

# Training split: one sub-folder per class, labels assigned by ImageFolder.
train_dataset = datasets.ImageFolder(root=image_path, transform=preprocess)
lung_list = train_dataset.class_to_idx
train_num = len(train_dataset)
train_iter = torch.utils.data.DataLoader(
    train_dataset,
    batch_size=batch_size,
    shuffle=True,
    num_workers=2,
    drop_last=True,
)

# Test split, read with the same preprocessing.
# NOTE(review): drop_last=True discards the final partial batch, so up to
# batch_size - 1 test images may never be scored - confirm this is intended.
val_dataset = datasets.ImageFolder(
    root=data_root + "/downloads/COVID-19-1/Test/",
    transform=preprocess,
)
val_iter = torch.utils.data.DataLoader(
    val_dataset,
    batch_size=batch_size,
    shuffle=True,
    num_workers=2,
    drop_last=True,
)

# Display names for label indices 0 and 1.
# NOTE(review): the real index order is in `lung_list`; verify it matches.
classes = ("Normal", "Abnormal")

In [None]:
import time
from matplotlib import pyplot as plt

lr = 0.001
LOG_EVERY = 300  # number of mini-batches between two loss reports

loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(squeezenet.parameters(), lr=lr, momentum=0.9)

# Per-epoch history used for the accuracy plot (values are fractions in [0, 1]).
acc_t = []
acc_v = []
epoch_counts = []

for epoch in range(50):  # loop over the dataset multiple times

    # Dropout has to be active while fitting; the evaluation below switches it
    # off, so training mode must be restored at the start of every epoch.
    squeezenet.train()

    running_loss = 0.0
    start_time = time.time()
    for i, (inputs, labels) in enumerate(train_iter):
        inputs, labels = inputs.to(device), labels.to(device)

        # zero the parameter gradients
        optimizer.zero_grad()

        # forward + backward + optimize
        loss = loss_fn(squeezenet(inputs), labels)
        loss.backward()
        optimizer.step()

        # Print the mean loss of the last LOG_EVERY mini-batches. The divisor
        # must equal the reporting interval, otherwise the printed value is
        # not an average.
        running_loss += loss.item()
        if i % LOG_EVERY == LOG_EVERY - 1:
            time_taken = time.time() - start_time
            print('[%d, %5d] loss: %.3f' % (epoch + 1, i + 1, running_loss / LOG_EVERY))
            print('Time:',time_taken)
            running_loss = 0.0

    # Evaluate in eval mode: with Dropout still active the accuracies would be
    # computed on randomly thinned activations and fluctuate between runs.
    squeezenet.eval()

    # Accuracy on the training split
    correct_t = 0
    total_t = 0
    with torch.no_grad():
        for images, labels in train_iter:
            images, labels = images.to(device), labels.to(device)
            predicted = squeezenet(images).argmax(dim=1)
            total_t += labels.size(0)
            correct_t += (predicted == labels).sum().item()
    acc_t.append(correct_t / total_t)

    # Accuracy on the test split
    correct_v = 0
    total_v = 0
    with torch.no_grad():
        for images, labels in val_iter:
            images, labels = images.to(device), labels.to(device)
            predicted = squeezenet(images).argmax(dim=1)
            total_v += labels.size(0)
            correct_v += (predicted == labels).sum().item()
    acc_v.append(correct_v / total_v)

    epoch_counts.append(epoch + 1)

    print('Epoch', epoch + 1,': Train accuracy: %.2f %%' % (100 * correct_t / total_t), ', Test accuracy: %.2f %%' % (100 * correct_v / total_v))

print('Finished Training of SqueezeNet')

# Accuracy curves: training split first, test split second, so the legend
# entries below line up with the plotted series.
for accuracy_history in (acc_t, acc_v):
    plt.plot(epoch_counts, accuracy_history)
plt.title('SqueezeNet accuracy')
plt.xlabel('epoch')
plt.ylabel('accuracy')
plt.legend(['train', 'val'], loc='upper left')
plt.show();

In [None]:
def _safe_div(numerator, denominator):
    """Return numerator / denominator, or 0.0 when the denominator is zero."""
    return numerator / denominator if denominator else 0.0


# Dropout must be disabled while scoring, otherwise the metrics are computed
# on randomly thinned activations.
squeezenet.eval()

# Counters indexed by the TRUE label of each sample.
class_correct = [0.0 for _ in classes]  # samples predicted correctly
class_total = [0.0 for _ in classes]    # samples seen
class_FP_TP = [0.0 for _ in classes]    # samples predicted as class 1
class_TN_FN = [0.0 for _ in classes]    # samples predicted as class 0
with torch.no_grad():
    for images, labels in val_iter:
        images, labels = images.to(device), labels.to(device)
        predicted = squeezenet(images).argmax(dim=1)
        # Count every sample in the batch. The previous fixed range(4) only
        # looked at the first four predictions of each 16-image batch.
        for label, guess in zip(labels.tolist(), predicted.tolist()):
            class_FP_TP[label] += guess == 1
            class_TN_FN[label] += guess == 0
            class_correct[label] += guess == label
            class_total[label] += 1

# Class 0: a "positive" is a prediction of 0.
Precision0 = _safe_div(100 * class_TN_FN[0], class_TN_FN[0] + class_TN_FN[1])
Recall0 = _safe_div(100 * class_TN_FN[0], class_TN_FN[0] + class_FP_TP[0])
F10 = _safe_div(2 * (Precision0 * Recall0), Precision0 + Recall0)
print('Numbers of FP for', classes[0], ':', int(class_TN_FN[1]))
print('Numbers of FN for', classes[0], ':', int(class_FP_TP[0]))
print('Precision of %5s : %2d %%' % (classes[0], Precision0))
print('Recall of %5s : %2d %%' % (classes[0], Recall0))
print('Accuracy of %5s : %2d %%' % (classes[0], _safe_div(100 * class_correct[0], class_total[0])))
print('F1-score of %5s : %2d %%' % (classes[0], F10))
print()

# Class 1: a "positive" is a prediction of 1.
Precision1 = _safe_div(100 * class_FP_TP[1], class_FP_TP[1] + class_FP_TP[0])
Recall1 = _safe_div(100 * class_FP_TP[1], class_FP_TP[1] + class_TN_FN[1])
F11 = _safe_div(2 * (Precision1 * Recall1), Precision1 + Recall1)
print('Numbers of FP for', classes[1], ':', int(class_FP_TP[0]))
print('Numbers of FN for', classes[1], ':', int(class_TN_FN[1]))
print('Precision of %5s : %2d %%' % (classes[1], Precision1))
print('Recall of %5s : %2d %%' % (classes[1], Recall1))
print('Accuracy of %5s : %2d %%' % (classes[1], _safe_div(100 * class_correct[1], class_total[1])))
print('F1-score of %5s : %2d %%' % (classes[1], F11))