In [2]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)
import cv2 #image processing library
import glob #finding files

#Pytorch Machine Learning Library
import torch
import torch.nn as nn
from torch.utils.data import TensorDataset, DataLoader, random_split

In [14]:
TRAIN_PATH = "../input/siim-covid19-dataset-256px-jpg/256px/train/train/"
TEST_PATH = "../input/siim-covid19-dataset-256px-jpg/256px/test/test/"
one_hot = ['Atypical Appearance', 'Indeterminate Appearance', 'Negative for Pneumonia', 'Typical Appearance']

train = pd.read_csv("../input/siim-covid19-dataset-256px-jpg/train.csv")

# Saving and Preprocessing COVID-19 Data

In [86]:
#Creating Labels from Data Frame
train_labels = pd.get_dummies(train.label_id).values

#Creating array of images
train_imgs = None
for img_id, study_id in train[['ImageInstanceUID', 'StudyInstanceUID']].values:
    filepath = glob.glob(TRAIN_PATH + study_id + "*" + img_id + ".jpg")[0]
    if train_imgs is not None:
        train_imgs = np.concatenate((train_imgs, np.array([cv2.imread(filepath)])))
    else:
        train_imgs = np.array([cv2.imread(filepath)])

np.save('training_labels', train_labels)
np.save('training_images', train_imgs)

# Loading Data Since Preprocessing Images is Time Consuming

In [15]:
train_labels = np.load('training_labels.npy')
train_imgs = np.load('training_images.npy')
print(train_labels.shape)
print(train_imgs.shape)

train_dataset = TensorDataset(Tensor(train_images), Tensor(train_labels))
train_set, val_set = random_split(train_dataset, [5334, 1000])
train_loader = DataLoader(train_set, batch_size=128, shuffle=True, drop_last=False, pin_memory=True, num_workers=4)
val_loader = DataLoader(val_set, batch_size=128, shuffle=False, drop_last=False, num_workers=4)

FileNotFoundError: [Errno 2] No such file or directory: 'training_labels.npy'

# ResNet Implementation

In [3]:
#Activation Functions
act_fn_by_name = {
    "tanh": nn.Tanh,
    "relu": nn.ReLU,
    "leakyrelu": nn.LeakyReLU,
    "gelu": nn.GELU
}

In [5]:
#Individual Blocks within the ResNet Architecture
class ResNetBlock(nn.Module):
    def __init__(self, c_in, act_fn, c_out=-1, downsample=False):
        super().__init__()
        if not downsample:
            c_out = c_in
        
        #Neural Network representing F
        self.net = nn.Sequential(
            nn.Conv2d(c_in, c_out, kernel_size=3, padding=1, stride=1 if not downsample else 2, bias=False),
            nn.BatchNorm2d(c_out),
            act_fun(),
            nn.Conv2d(c_out, c_out, kernel_size=3, padding=1, bias=False),
            nn.BatchNorm2d(c_out)
        )
        
        #Downsampling network for the input in order to facilitate F(x) + x
        self.downsample = nn.Conv2d(c_in, c_out, kernel_size=1, stride=2) if subsample else None
        self.act_fn = act_fn()
        
    def forward(self, x):
        z = self.net(x)
        if self.downsample is not None:
            x = self.downsample(x)
        out = z + x
        out = self.act_fn(out)
        return out
        

In [6]:
#ResNet Model Implementation
class ResNet(nn.Module):
    def __init__(self, num_blocks=[3, 3, 3, 3], c_hidden=[64, 128, 256, 1024], act_fn_name='relu'):
        super().__init__()
        self.num_blocks = num_blocks
        self.c_hidden = c_hidden
        self.act_fn = act_fn_by_name[act_fn_name]
        self.act_fn_name = act_fn_name
        self._create_netwwork()
        self._init_params()
    
    def _create_network(self):
        
        self.input_net = nn.Sequential(
            nn.Conv2d(3, self.c_hidden[0], kernel_size=7, padding=3, stride=2, bias=False),
            nn.BatchNorm2d(self.c_hidden[0]),
            nn.MaxPool2d(kernel_size=3, padding=1, stride=2),
            self.act_fn()
        )
        
        blocks = []
        for block_idx, block_count in enumerate(self.num_blocks):
            for bc in range(block_count):
                downsample = (bc == 0 and block_idx > 0)
                blocks.append(
                    ResNetBlock(c_in=self.c_hidden[block_idx if not downsample else (block_idx - 1)],
                                act_fn=self.act_fn,
                                c_out=c_hidden[block_idx],
                                downsample=downsample))
        self.blocks = nn.Sequential(*blocks)
        
        self.output_net = nn.Sequential(
            nn.AdaptiveAvgPool2d((1, 1)),
            nn.Flatten(),
            nn.Linear(c_hidden[-1], self.num_classes)
        )
    
    def _init_params(self):
        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity=self.act_fn_name)
            elif isinstance(m, nn.BatchNorm2d):
                nn.init.constant_(m.weight, 1)
                nn.init.constant_(m.bias, 0)
    
    def forward(self, x):
        x = self.input_net(x)
        x = self.blocks(x)
        x = self.output_net(x)
        return x
            
                


In [None]:
# Finding a Device
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
print("Device", device)

In [None]:
def train_model(model, nb_epochs, lr=0.01):
    model.to(device)
    loss_module = nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=lr)
    for epoch in range(nb_epochs):
        #Training the model on the given batch
        train_losses = []
        for batch in train_loader:
            x, y = batch
            x, y = x.to(device), y.to(device)
            #Calculating forward pass of the model
            output = model(x)
            
            #Calculating loss
            loss = loss_module(output, y)
            train_losses.append(loss)
            #Cleaning gradients
            optimizer.zero_grad()
            
            #Backwward propagation of partial gradients
            loss_module.backwward()
            optimizer.step()
        #Same steps, but calculating the validation loss
        val_losses = []
        for batch in val_loader:
            x, y = batch
            x, y = x.to(device), y.to(device)
            loss = loss_module(model(x), y)
            val_losses.append(loss)
        
        print("Epoch: " + str(epoch + 1) + " Train Loss: " + str(torch.tensor(train_losses).mean()) + " Validation Loss: " + str(torch.tensor(val_losses).mean()))                