In [1]:
import os
import time
import copy
# import standard PyTorch modules
from collections import OrderedDict
import torch
from torch import nn
import numpy as np
import pandas as pd
from torch import optim
import torch.nn.functional as F
from torchvision import datasets, transforms, models

In [2]:
#Getting your dataset
data_dir = 'C:/Users/Test/Desktop/covid/'
train_dir = data_dir + '/train'
valid_dir = data_dir + '/test'
#train_data = datasets.ImageFolder(data_dir +'train', transform=train_transforms)  
#test_data = datasets.ImageFolder(data_dir +'test', transform=test_transforms)


In [3]:
# Define your transforms for the training and testing sets
data_transforms = {
    'train': transforms.Compose([
        transforms.RandomRotation(30),
        transforms.RandomResizedCrop(224),
        transforms.RandomHorizontalFlip(),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], 
                             [0.229, 0.224, 0.225])
    ]),
    'test': transforms.Compose([
        transforms.Resize(256),
        transforms.CenterCrop(224),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], 
                             [0.229, 0.224, 0.225])
    ])
}

# Load the datasets with ImageFolder
image_datasets = {x: datasets.ImageFolder(os.path.join(data_dir, x),
                                          data_transforms[x])
                  for x in ['train', 'test']}

# Using the image datasets and the trainforms, define the dataloaders
batch_size = 32
dataloaders = {x: torch.utils.data.DataLoader(image_datasets[x], batch_size=batch_size,
                                             shuffle=True)
              for x in ['train', 'test']}

In [4]:
dataset_sizes = {x: len(image_datasets[x]) for x in ['train', 'test']}
class_names = image_datasets['train'].classes
print(class_names)

['Athsma', 'COVID-19', 'NORMAL', 'Viral Pneumonia']


In [5]:
print(dataset_sizes)

{'train': 2172, 'test': 903}


In [6]:
# Run this to test the data loader
images, labels = next(iter(dataloaders['train']))
images.size()

torch.Size([32, 3, 224, 224])

In [7]:
rand_idx = np.random.randint(len(images))
print(rand_idx)
print("label: {}, class: {}".format(labels[rand_idx].item(),
                                               class_names[labels[rand_idx].item()]))


26
label: 3, class: Viral Pneumonia


In [8]:
#view dataset

In [9]:
model_name = 'densenet' #resnet
if model_name == 'densenet':
    model = models.densenet121(pretrained=True)
    num_in_features = 1024
    print(model)
elif model_name == 'resnet':
    model = models.resnet50(pretrained=True)
    num_in_features = 2408
    print(model.classifier)
else:
    print("Unknown model, please choose 'densenet' or 'resnet'")

DenseNet(
  (features): Sequential(
    (conv0): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
    (norm0): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (relu0): ReLU(inplace=True)
    (pool0): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
    (denseblock1): _DenseBlock(
      (denselayer1): _DenseLayer(
        (norm1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (relu1): ReLU(inplace=True)
        (conv1): Conv2d(64, 128, kernel_size=(1, 1), stride=(1, 1), bias=False)
        (norm2): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (relu2): ReLU(inplace=True)
        (conv2): Conv2d(128, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      )
      (denselayer2): _DenseLayer(
        (norm1): BatchNorm2d(96, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (relu

In [10]:
#Solving class imbalance with weight loss
weight = np.array([2.5,0.8,0.3,0.18])
weight=torch.FloatTensor(weight)

In [11]:
# Create classifier
for param in model.parameters():
    param.requires_grad = False

def build_classifier(num_in_features, hidden_layers, num_out_features):
    """Deciding classifier and number of layers"""
    classifier = nn.Sequential()
    if hidden_layers == None:
        classifier.add_module('fc0', nn.Linear(num_in_features, 4))
    else:
        layer_sizes = zip(hidden_layers[:-1], hidden_layers[1:])
        classifier.add_module('fc0', nn.Linear(num_in_features, hidden_layers[0]))
        classifier.add_module('relu0', nn.ReLU())
        classifier.add_module('drop0', nn.Dropout(.6))
        classifier.add_module('relu1', nn.ReLU())
        classifier.add_module('drop1', nn.Dropout(.5))
        for i, (h1, h2) in enumerate(layer_sizes):
            classifier.add_module('fc'+str(i+1), nn.Linear(h1, h2))
            classifier.add_module('relu'+str(i+1), nn.ReLU())
            classifier.add_module('drop'+str(i+1), nn.Dropout(.5))
        classifier.add_module('output', nn.Linear(hidden_layers[-1], num_out_features))
        
    return classifier

In [12]:
hidden_layers = None#[4096, 1024, 256][512, 256, 128]

classifier = build_classifier(num_in_features, hidden_layers, 4)
print(classifier)

 # Only train the classifier parameters, feature parameters are frozen
if model_name == 'densenet':
    """Setting the fully connected layer, criterion, optimizer and scheduler"""
    model.classifier = classifier
    criterion = nn.CrossEntropyLoss(weight=weight)
    optimizer = optim.Adadelta(model.parameters()) # Adadelta #weight optim.Adam(model.parameters(), lr=0.001, momentum=0.9)
    #optimizer_conv = optim.SGD(model.parameters(), lr=0.0001, weight_decay=0.001, momentum=0.9)
    sched = optim.lr_scheduler.StepLR(optimizer, step_size=4)
elif model_name == 'resnet':
    model.fc = classifier
    criterion = nn.NLLLoss()
    optimizer = optim.Adam(model.classifier.parameters(), lr=0.0001)
    sched = lr_scheduler.StepLR(optimizer, step_size=4, gamma=0.1)
else:
    pass

Sequential(
  (fc0): Linear(in_features=1024, out_features=4, bias=True)
)


In [13]:
# Adapted from https://pytorch.org/tutorials/beginner/transfer_learning_tutorial.html
def train_model(model, criterion, optimizer, sched, num_epochs=5):
    since = time.time()

    best_model_wts = copy.deepcopy(model.state_dict())
    best_acc = 0.0

    for epoch in range(num_epochs):
        print('Epoch {}/{}'.format(epoch+1, num_epochs))
        print('-' * 10)

        # Each epoch has a training and validation phase
        for phase in ['train', 'test']:
            if phase == 'train':
                model.train()  # Set model to training mode
            else:
                model.eval()   # Set model to evaluate mode

            running_loss = 0.0
            running_corrects = 0

            # Iterate over data.
            for inputs, labels in dataloaders[phase]:
                inputs = inputs
                labels = labels

                # zero the parameter gradients
                optimizer.zero_grad()

                # forward
                # track history if only in train
                with torch.set_grad_enabled(phase == 'train'):
                    outputs = model(inputs)
                    _, preds = torch.max(outputs, 1)
                    loss = criterion(outputs, labels)

                    # backward + optimize only if in training phase
                    if phase == 'train':
                        #sched.step()
                        loss.backward()
                        
                        optimizer.step()

                # statistics
                running_loss += loss.item() * inputs.size(0)
                running_corrects += torch.sum(preds == labels.data)

            epoch_loss = running_loss / dataset_sizes[phase]
            epoch_acc = running_corrects.double() / dataset_sizes[phase]

            print('{} Loss: {:.4f} Acc: {:.4f}'.format(
                phase, epoch_loss, epoch_acc))

            # deep copy the model
            if phase == 'test' and epoch_acc > best_acc:
                best_acc = epoch_acc
                best_model_wts = copy.deepcopy(model.state_dict())

        print()

    time_elapsed = time.time() - since
    print('Training complete in {:.0f}m {:.0f}s'.format(
        time_elapsed // 60, time_elapsed % 60))
    print('Best val Acc: {:4f}'.format(best_acc))

    #load best model weights
    model.load_state_dict(best_model_wts)
    
    return model

In [14]:
epochs = 7
model = train_model(model, criterion, optimizer, sched, epochs)

Epoch 1/7
----------
train Loss: 0.8837 Acc: 0.6151
test Loss: 1.0100 Acc: 0.6146

Epoch 2/7
----------
train Loss: 0.5195 Acc: 0.7703
test Loss: 0.8692 Acc: 0.8317

Epoch 3/7
----------
train Loss: 0.4152 Acc: 0.8338
test Loss: 0.8920 Acc: 0.8383

Epoch 4/7
----------
train Loss: 0.3922 Acc: 0.8214
test Loss: 0.9010 Acc: 0.8383

Epoch 5/7
----------
train Loss: 0.3601 Acc: 0.8573
test Loss: 0.5962 Acc: 0.8594

Epoch 6/7
----------
train Loss: 0.3259 Acc: 0.8554
test Loss: 0.7780 Acc: 0.8372

Epoch 7/7
----------
train Loss: 0.3012 Acc: 0.8725
test Loss: 0.9340 Acc: 0.8494

Training complete in 147m 53s
Best val Acc: 0.859358


In [15]:
# Evaluation

model.eval()

accuracy = 0

for inputs, labels in dataloaders['test']:
    inputs, labels = inputs, labels
    outputs = model(inputs)
    
    # Class with the highest probability is our predicted class
    equality = (labels.data == outputs.max(1)[1])

    # Accuracy is number of correct predictions divided by all predictions
    accuracy += equality.type_as(torch.FloatTensor()).mean()
    
print("Test accuracy: {:.3f}".format(accuracy/len(dataloaders['test'])))

Test accuracy: 0.859


In [23]:
model.class_to_idx = image_datasets['train'].class_to_idx

#Model checkpoints
checkpoint = {'input_size': 1024,
              'output_size':4,
              'epochs': epochs,
              'batch_size': 16,
              'model': models.densenet121(pretrained = True),
              'scheduler': sched,
              'classifier': model.classifier,
              'optimizer': optimizer.state_dict(),
              'state_dict': model.state_dict(),
              'class_to_idx': model.class_to_idx
             }

In [24]:
#Saving the model
torch.save(checkpoint, 'checkpoint.pth')

### Making prediction to test the model performance

In [42]:
#Loading the Image
from PIL import Image
image =Image.open('C:/Users/Test/Desktop/COVID/test/COVID-19/COVID-19(198).png').convert('RGB')

#Applying transforms on the Image
preprocess = transforms.Compose([
        transforms.Resize(256),
        transforms.CenterCrop(224),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], 
                             std=[0.229, 0.224, 0.225])
    ])

image= preprocess(image).unsqueeze(0)

In [43]:
model.eval()
classes = ('Athsma','covid-19', 'normal', 'viral pneumonial')
outputs = model(image)
_, prediction = outputs.max(1)
category = prediction.item()
name = classes[category]
print(name)


covid-19
