# Covid-19 Chest X-Ray Prediction

In [1]:
import os
import cv2
import torch
import glob
import shutil
import itertools
import matplotlib.pyplot as plt
import torch.nn as nn
import pickle
import torch.optim as optim
import numpy as np 
import pandas as pd 
from sklearn.preprocessing import LabelEncoder
from multiprocessing.pool import ThreadPool

from tqdm import tqdm, tqdm_notebook
from PIL import Image
from pathlib import Path
from torch.nn import functional as F
from torch.utils.data import Dataset, DataLoader
from torchvision import datasets, models, transforms
import time
import copy
from random import shuffle
import pickle
import numpy as np
from sklearn.metrics import classification_report
from skimage import io

import matplotlib.patches as patches
from matplotlib.font_manager import FontProperties

## Data Visualisation

Dataset link - https://www.kaggle.com/datasets/tawsifurrahman/covid19-radiography-database

In [2]:
# Paths to the source image folders of the Kaggle radiography dataset.
# NOTE: `test_path` points at the "Viral Pneumonia" class folder, i.e. images
# belonging to neither of the two classes placed in the train/valid folders.
covid_path = '../input/covid19-radiography-database/COVID-19_Radiography_Dataset/COVID/images'
normal_path = '../input/covid19-radiography-database/COVID-19_Radiography_Dataset/Normal/images'
test_path = '../input/covid19-radiography-database/COVID-19_Radiography_Dataset/Viral Pneumonia/images'

## Sorting out the files
Разделим данные на train и valid выборки в соотношении 80:20 и распределим соответствующие изображения по папкам.

In [3]:
# Create the train/valid folder tree with one sub-folder per class.
# makedirs(exist_ok=True) is used instead of mkdir so that re-running the cell
# does not fail with FileExistsError once the folders are already there.
for directory in (
    '/kaggle/working/train',
    '/kaggle/working/valid',
    '/kaggle/working/train/covid',
    '/kaggle/working/valid/covid',
    '/kaggle/working/train/normal',
    '/kaggle/working/valid/normal',
):
    os.makedirs(directory, exist_ok=True)


In [4]:
# Folder for the unlabeled images; exist_ok keeps the cell safe to re-run.
os.makedirs('/kaggle/working/test', exist_ok=True)

In [5]:
# Split sizes: the first 80% of each class goes to train, the rest to valid.
# Each directory is listed once and the train size is derived from that count.
covid_len = len(os.listdir(covid_path))
covid_train_len = int(np.floor(covid_len * 0.8))

normal_len = len(os.listdir(normal_path))
normal_train_len = int(np.floor(normal_len * 0.8))

test_len = len(os.listdir(test_path))

In [6]:
# Copy the images into their split folders.
#
# Each class folder is listed exactly once and sorted, and the train/valid
# parts are taken as slices of that single list. The previous version ran an
# independent glob for every slice; glob order is not specified, so the two
# traversals were not guaranteed to agree, which could leak images between
# train and valid and made the split non-reproducible.
covid_images = sorted(glob.glob(os.path.join(covid_path, '*.png')))
normal_images = sorted(glob.glob(os.path.join(normal_path, '*.png')))
test_images = sorted(glob.glob(os.path.join(test_path, '*.png')))

for trainimg in covid_images[:covid_train_len]:
    shutil.copy(trainimg, '/kaggle/working/train/covid')

for trainimg in normal_images[:normal_train_len]:
    shutil.copy(trainimg, '/kaggle/working/train/normal')

for testimg in covid_images[covid_train_len:covid_len]:
    shutil.copy(testimg, '/kaggle/working/valid/covid')

for testimg in normal_images[normal_train_len:normal_len]:
    shutil.copy(testimg, '/kaggle/working/valid/normal')

for trainimg in test_images[:test_len]:
    shutil.copy(trainimg, '/kaggle/working/test')

In [7]:
class NoneTransform:
    """Identity transform: a placeholder for slots where no transform is wanted."""

    def __call__(self, image):
        """Return ``image`` untouched."""
        return image

In [8]:
class XRAY_chest(Dataset):
    """Image dataset backed by a list of file paths.

    In every mode except ``'test'`` the label of a sample is the name of the
    file's parent directory, encoded with a ``LabelEncoder``; the fitted
    encoder is also pickled to ``label_encoder.pkl`` in the current working
    directory. In ``'test'`` mode only the image tensor is returned.
    """

    def __init__(self, files, mode):
        """Store the file list, fit the label encoder and build the transform.

        Args:
            files: iterable of path objects; ``.parent.name`` is read from each
                one in non-test modes, so they must be ``pathlib.Path``-like.
            mode: one of the values listed in ``DATA_MODES``.

        Raises:
            NameError: if ``mode`` is not in ``DATA_MODES``.
        """
        super().__init__()
        # Files to load, kept in sorted order.
        self.files = sorted(files)
        # Operating mode.
        self.mode = mode

        if self.mode not in DATA_MODES:
            print(f"{self.mode} is not correct; correct modes: {DATA_MODES}")
            raise NameError

        self.len_ = len(self.files)

        self.label_encoder = LabelEncoder()

        if self.mode != 'test':
            self.labels = [path.parent.name for path in self.files]
            self.label_encoder.fit(self.labels)

            with open('label_encoder.pkl', 'wb') as le_dump_file:
                pickle.dump(self.label_encoder, le_dump_file)

        # Built once here; it used to be re-created on every __getitem__ call.
        self.transform = self._build_transform(self.mode)

    @staticmethod
    def _build_transform(mode):
        """Return the preprocessing pipeline for ``mode``.

        ``'train'`` gets the augmenting pipeline, every other mode the
        deterministic one. ``load_sample`` always hands over an RGB image, so
        ``ToTensor`` already yields three channels; the former conditional
        channel-repeat step could never trigger and has been dropped.
        """
        normalize = transforms.Normalize([0.485, 0.456, 0.406],
                                         [0.229, 0.224, 0.225])
        if mode == 'train':
            return transforms.Compose([
                transforms.Resize(size=(224, 224)),
                transforms.RandomHorizontalFlip(),
                transforms.RandomRotation(degrees=30),
                transforms.ColorJitter(hue=.1, saturation=.1),
                transforms.ToTensor(),
                normalize,
            ])
        return transforms.Compose([
            transforms.Resize(size=(224, 224)),
            transforms.CenterCrop(200),
            transforms.ToTensor(),
            normalize,
        ])

    def __len__(self):
        """Return the number of files in the dataset."""
        return self.len_

    def load_sample(self, file):
        """Open ``file`` with PIL, force-load it and return it converted to RGB."""
        image = Image.open(file)
        image.load()
        return image.convert('RGB')

    def __getitem__(self, index):
        """Return the transformed image, plus its integer label outside test mode."""
        x = self.transform(self.load_sample(self.files[index]))

        if self.mode == 'test':
            return x

        label_id = self.label_encoder.transform([self.labels[index]])
        return x, label_id.item()

In [9]:
def imshow(inp, title=None, plt_ax=plt, default=False):
    """Show a normalized image tensor on ``plt_ax``.

    The tensor is moved to channel-last layout, the mean/std normalization
    (three per-channel values each) is undone and the result is clipped to
    [0, 1] before drawing.

    Args:
        inp: CPU tensor whose first axis is the channel axis.
        title: optional title for the plot.
        plt_ax: drawing target, either an ``Axes`` object or the ``pyplot``
            module itself (the default).
        default: unused; kept so existing calls keep working.
    """
    inp = inp.numpy().transpose((1, 2, 0))
    mean = np.array([0.485, 0.456, 0.406])
    std = np.array([0.229, 0.224, 0.225])
    inp = np.clip(std * inp + mean, 0, 1)
    plt_ax.imshow(inp)
    if title is not None:
        # Axes objects expose set_title(); the pyplot module (the default
        # target) only has title(), so the old unconditional set_title() call
        # raised AttributeError whenever the default was used with a title.
        set_title = getattr(plt_ax, 'set_title', None)
        if set_title is None:
            set_title = plt_ax.title
        set_title(title)
    plt_ax.grid(False)

In [10]:
# Modes accepted by XRAY_chest; any other value makes its constructor raise.
DATA_MODES = ['train', 'val', 'test']

In [11]:
# Root folder of every split and the sorted list of PNG files found under it.
TRAIN_DIR = Path('/kaggle/working/train')
VAL_DIR = Path('/kaggle/working/valid')
TEST_DIR = Path('/kaggle/working/test')

# sorted() accepts the rglob generator directly and already returns a list.
train_files = sorted(TRAIN_DIR.rglob('*.png'))
val_files = sorted(VAL_DIR.rglob('*.png'))
test_files = sorted(TEST_DIR.rglob('*.png'))

In [12]:
# Show a 3x3 grid of random validation samples as a sanity check.
val_dataset = XRAY_chest(val_files, mode='val')

fig, ax = plt.subplots(nrows=3, ncols=3, figsize=(8, 8),
                       sharey=True, sharex=True)
for fig_x in ax.flatten():
    # Draw the index from the real dataset size. The former hard-coded range
    # 0..999 raised IndexError on smaller datasets and never showed samples
    # beyond the first thousand files.
    random_characters = np.random.randint(len(val_dataset))
    im_val, label = val_dataset[random_characters]
    # Turn the decoded class name into a title, e.g. "some_label" -> "Some Label".
    class_name = val_dataset.label_encoder.inverse_transform([label])[0]
    img_label = " ".join(part.capitalize() for part in class_name.split('_'))
    imshow(im_val.data.cpu(), title=img_label, plt_ax=fig_x)

# Преобразуем данные

In [13]:
# Build the validation dataset only when the name is bound to None, then build
# the training dataset.
# NOTE(review): if `val_dataset` was never assigned, the check below raises
# NameError instead of creating it — confirm this guard is what was intended.
if val_dataset is None:
    val_dataset = XRAY_chest(val_files, mode='val')
    
train_dataset = XRAY_chest(train_files, mode='train')

# Creating model

In [14]:
# Train on the first GPU when CUDA is available, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device('cuda:0')
else:
    device = torch.device('cpu')

In [15]:
def fit_epoch(model, train_loader, criterion, optimizer):
    """Run one training pass over ``train_loader`` and update ``model``.

    Args:
        model: network to optimize; batches are moved to the global ``device``.
        train_loader: iterable yielding ``(inputs, labels)`` batches.
        criterion: loss called as ``criterion(outputs, labels)``.
        optimizer: optimizer stepped once per batch.

    Returns:
        ``(train_loss, train_acc)``: the sample-weighted mean loss and the
        fraction of correctly classified samples, both plain floats.

    Raises:
        ZeroDivisionError: if the loader yields no samples.
    """
    # Explicitly enter training mode: a preceding evaluation pass may have left
    # the model in eval mode, in which case mode-dependent layers would
    # otherwise keep their inference behaviour while the weights are updated.
    model.train()

    running_loss = 0.0
    running_corrects = 0
    processed_data = 0

    for inputs, labels in train_loader:
        inputs = inputs.to(device)
        labels = labels.to(device)

        optimizer.zero_grad()

        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()

        optimizer.step()

        preds = torch.argmax(outputs, 1)
        batch_size = inputs.size(0)

        # The criterion returns a batch mean, so weight it by the batch size.
        running_loss += loss.item() * batch_size
        running_corrects += torch.sum(preds == labels).item()
        processed_data += batch_size

    train_loss = running_loss / processed_data
    train_acc = running_corrects / processed_data
    return train_loss, train_acc

In [16]:
def eval_epoch(model, val_loader, criterion):
    """Evaluate ``model`` on ``val_loader`` without updating any weights.

    The model is switched to eval mode and is left in that mode on return.

    Args:
        model: network to evaluate; batches are moved to the global ``device``.
        val_loader: iterable yielding ``(inputs, labels)`` batches.
        criterion: loss called as ``criterion(outputs, labels)``.

    Returns:
        ``(val_loss, val_acc)``: the sample-weighted mean loss and the fraction
        of correctly classified samples. Both are plain floats (the accuracy
        used to be a 0-d tensor living on ``device``), so they can be stored,
        formatted and plotted like the values returned for training.

    Raises:
        ZeroDivisionError: if the loader yields no samples.
    """
    model.eval()

    running_loss = 0.0
    running_corrects = 0
    processed_size = 0

    with torch.no_grad():
        for inputs, labels in val_loader:
            inputs = inputs.to(device)
            labels = labels.to(device)

            outputs = model(inputs)
            loss = criterion(outputs, labels)
            preds = torch.argmax(outputs, 1)

            batch_size = inputs.size(0)
            running_loss += loss.item() * batch_size
            running_corrects += torch.sum(preds == labels).item()
            processed_size += batch_size

    val_loss = running_loss / processed_size
    val_acc = running_corrects / processed_size
    return val_loss, val_acc

In [22]:
def train(train_files, val_files, model, optimizer, 
          criterion, epochs, batch_size,scheduler):
    """Train ``model`` for ``epochs`` epochs and return the metric history.

    Args:
        train_files: dataset to train on. Despite the name it is handed
            straight to ``DataLoader``, so it must be a dataset, not a list of
            paths.
        val_files: dataset to validate on (same remark as ``train_files``).
        model: network to train.
        optimizer: optimizer used by ``fit_epoch``.
        criterion: loss function.
        epochs: number of epochs to run.
        batch_size: batch size for both loaders.
        scheduler: learning-rate scheduler, stepped once per epoch.

    Returns:
        List with one ``(train_loss, train_acc, val_loss, val_acc)`` tuple per
        epoch.
    """
    # Use the datasets passed in by the caller. The arguments used to be
    # ignored in favour of the globals `train_dataset` / `val_dataset`.
    train_loader = DataLoader(train_files, batch_size=batch_size, 
                              shuffle=True, num_workers=4)
    val_loader = DataLoader(val_files, batch_size=batch_size, 
                            shuffle=False, num_workers=4)

    history = []
    log_template = "\nEpoch {ep:03d} train_loss: {t_loss:0.4f} \
    val_loss {v_loss:0.4f} train_acc {t_acc:0.4f} val_acc {v_acc:0.4f}"

    with tqdm(desc="epoch", total=epochs) as pbar_outer:
        
        for epoch in range(epochs):
            train_loss, train_acc = fit_epoch(model, train_loader, 
                                              criterion, optimizer)
            print("loss", train_loss)
            
            val_loss, val_acc = eval_epoch(model, val_loader, criterion)
            print("val loss:", val_loss)

            history.append((train_loss, train_acc, val_loss, val_acc))

            # One scheduler step per epoch, after training and validation.
            scheduler.step()
            
            pbar_outer.update(1)
            tqdm.write(log_template.format(ep=epoch+1, t_loss=train_loss,\
                                           v_loss=val_loss, t_acc=train_acc, 
                                           v_acc=val_acc))
            
    return history

In [18]:
def predict(model, test_loader):
    """Return softmax class probabilities for everything in ``test_loader``.

    Args:
        model: network to run; it is switched to eval mode.
        test_loader: iterable yielding input batches only (no labels). Each
            batch is moved to the global ``device``.

    Returns:
        NumPy array with one row of class probabilities per input sample, in
        the order the loader yielded them.
    """
    # Eval mode is loop-invariant, so set it once instead of once per batch.
    model.eval()

    logits = []
    with torch.no_grad():
        for inputs in test_loader:
            inputs = inputs.to(device)
            logits.append(model(inputs).cpu())

    probs = nn.functional.softmax(torch.cat(logits), dim=-1).numpy()
    return probs

# Prepare model

In [19]:
# ResNet-18 from torchvision, requested with pretrained=True.
# NOTE(review): newer torchvision releases are reported to prefer a `weights=`
# argument over `pretrained=` — confirm against the installed version.
resnet = models.resnet18(pretrained=True)

In [20]:
# Nothing is frozen here: every parameter is marked trainable, and the
# optimizer below receives all of them, so the whole network is fine-tuned.
for param in resnet.parameters():
    param.requires_grad = True

# Swap the final fully-connected layer for a linear classifier over our
# classes; num_features is the width of the feature vector entering it.
n_classes = 2
num_features = resnet.fc.in_features
resnet.fc = nn.Linear(in_features=num_features, out_features=n_classes)
resnet = resnet.to(device)

# Loss, optimizer and a scheduler that halves the learning rate every 3 steps.
criterion = nn.CrossEntropyLoss()
optimizer = optim.AdamW(resnet.parameters(), lr=1e-4)
scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=3, gamma=0.5)

# Learn model

In [23]:
# Start training. Note that `resnet_fine` receives what train() returns, i.e.
# the per-epoch metric history, not a model object.
resnet_fine = train(train_dataset, val_dataset, model=resnet, criterion=criterion,
                          epochs=10, batch_size=16, optimizer=optimizer, scheduler=scheduler)

In [24]:
# Transpose the list of per-epoch tuples into one sequence per metric.
loss, acc, val_loss, val_acc = zip(*resnet_fine)

In [25]:
# Plot the training and validation loss curves on one figure.
plt.figure(figsize=(15, 9))
for curve, curve_label in ((loss, "train_loss"), (val_loss, "val_loss")):
    plt.plot(curve, label=curve_label)
plt.legend(loc='best')
plt.xlabel("epochs")
plt.ylabel("loss")
plt.show()

In [26]:
# Directory for saved models; exist_ok keeps the cell safe to re-run.
os.makedirs('/kaggle/working/models', exist_ok=True)

In [27]:
# Save only the model's weights (its state_dict), not the whole model object.
# NOTE(review): the target path is relative, so this presumably relies on the
# working directory being the one that contains `models/` — confirm.
torch.save(resnet.state_dict(), 'models/weights.h5') # save the model's weights
# To load the weights back into a model:
# model.load_state_dict(torch.load('models/weights.h5')) # load the model's weights

# Visualization results

In [28]:
def predict_one_sample(model, inputs, device=device):
    """Return softmax probabilities for one input.

    ``inputs`` is moved to ``device``, the model is put in eval mode and run
    without gradient tracking; the probabilities come back as a NumPy array.
    """
    with torch.no_grad():
        batch = inputs.to(device)
        model.eval()
        raw_scores = model(batch).cpu()
        return torch.nn.functional.softmax(raw_scores, dim=-1).numpy()

In [33]:
# Load the label encoder from disk. The context manager closes the file handle
# deterministically; the former pickle.load(open(...)) left it open.
# NOTE: unpickling is only safe for files this notebook produced itself.
with open("label_encoder.pkl", 'rb') as le_file:
    label_encoder = pickle.load(le_file)

In [34]:
# Visualise random validation samples: the true label is the subplot title and
# the predicted label with its confidence is drawn onto the image.

fig, ax = plt.subplots(nrows=3, ncols=3, figsize=(12, 12),
                       sharey=True, sharex=True)

# The font is the same for every subplot, so it is built once up front.
font0 = FontProperties()
font = font0.copy()
font.set_family("fantasy")

for fig_x in ax.flatten():
    # Draw the index from the real dataset size. The former hard-coded range
    # 0..999 raised IndexError on smaller datasets and never showed samples
    # beyond the first thousand files.
    random_characters = np.random.randint(len(val_dataset))
    im_val, label = val_dataset[random_characters]
    class_name = val_dataset.label_encoder.inverse_transform([label])[0]
    img_label = " ".join(part.capitalize() for part in class_name.split('_'))

    imshow(im_val.data.cpu(), title=img_label, plt_ax=fig_x)

    actual_text = "Actual : {}".format(img_label)

    # White box drawn behind the prediction caption.
    fig_x.add_patch(patches.Rectangle((0, 200), 120, 30, color='white'))

    # The model expects a batch axis, hence unsqueeze(0).
    prob_pred = predict_one_sample(resnet, im_val.unsqueeze(0))
    predicted_proba = np.max(prob_pred) * 100
    y_pred = np.argmax(prob_pred)

    predicted_label = label_encoder.classes_[y_pred]
    predicted_text = "{} : {:.0f}%".format(predicted_label, predicted_proba)

    fig_x.text(100, 200, predicted_text, horizontalalignment='center', fontproperties=font,
               verticalalignment='top', fontsize=10, color='black', fontweight='bold')

# F1 по valid dataset

In [30]:
# Run the network over the whole validation set.
#
# Every sample is fetched exactly once. Images and labels used to be collected
# in two separate passes, so each file was read and transformed twice.
val_samples = [val_dataset[idx] for idx in range(len(val_dataset))]

# predict() consumes an iterable of batches; each image becomes a batch of one.
imgs = [image.unsqueeze(0) for image, _ in val_samples]
actual_labels = [target for _, target in val_samples]

probs_ims = predict(resnet, imgs)

y_pred = np.argmax(probs_ims, -1)
preds_class = [label_encoder.classes_[i] for i in y_pred]
actual_class = [label_encoder.classes_[i] for i in actual_labels]

In [31]:
# Print the classification report for the validation predictions, with the
# true class names first and the predicted ones second.
print(classification_report(actual_class, preds_class))

# Make prediction for unknown data covid or normal

 Я сделал на valid выборке, можно потестить на другом датасете

In [None]:
from sklearn.metrics import classification_report

In [35]:
def predict(model, test_loader):
    """Return softmax class probabilities for everything in ``test_loader``.

    Args:
        model: network to run; it is switched to eval mode.
        test_loader: iterable yielding input batches only (no labels). Each
            batch is moved to the global ``device``.

    Returns:
        NumPy array with one row of class probabilities per input sample, in
        the order the loader yielded them.
    """
    # Eval mode is loop-invariant, so set it once instead of once per batch.
    model.eval()

    logits = []
    with torch.no_grad():
        for inputs in test_loader:
            inputs = inputs.to(device)
            logits.append(model(inputs).cpu())

    probs = nn.functional.softmax(torch.cat(logits), dim=-1).numpy()
    return probs

In [36]:
# Predict a class for every image in the unlabeled test folder.
test_dataset = XRAY_chest(test_files, mode="test")
test_loader = DataLoader(test_dataset, shuffle=False, batch_size=64)
probs = predict(resnet, test_loader)

# Load the label encoder from disk. The context manager closes the file handle
# deterministically; the former pickle.load(open(...)) left it open.
with open("label_encoder.pkl", 'rb') as le_file:
    label_encoder = pickle.load(le_file)

# Map the most probable class index of each sample back to its class name.
preds = label_encoder.inverse_transform(np.argmax(probs, axis=1))

In [37]:
# Count how many test images were assigned to each predicted class; the final
# expression is the notebook cell's displayed value.
unique, counts = np.unique(preds, return_counts=True)
dict(zip(unique, counts))

# Альтернативная модель

In [None]:
model = models.densenet121(pretrained=True)
# Freeze the parameters (weights) of the pretrained network.
for param in model.parameters():
    param.requires_grad = False

n_classes = 2
# Replace the classification head with our own linear classifier. Its input
# width is read from the layer being replaced instead of hard-coding 1024, so
# the cell keeps working if another DenseNet variant is substituted. The new
# layer's parameters are trainable by default, unlike the frozen ones above.
model.classifier = nn.Linear(model.classifier.in_features, n_classes)
model = model.to(device)

# Loss and optimizer; only the new classifier's parameters are optimized.
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(model.classifier.parameters(), lr=0.1)

In [None]:
# Run training for the alternative model.
# NOTE(review): neither `trained_model` nor `dataloaders` is defined in the
# visible part of this file — confirm where they come from before running.
result_model_2 = trained_model(model=model, dataloaders=dataloaders, criterion=criterion, optimizer=optimizer, epochs=10)

In [None]:
# NOTE(review): `plot_loss` and `history_2` are not defined in the visible part
# of this file — confirm where they come from before running.
plot_loss(history_2)

In [None]:
# NOTE(review): the `predict` defined in this file returns a single array of
# probabilities, not a (predictions, targets) pair, and `dataloaders` is not
# defined in the visible part of the file — verify before running.
y_pred_list, y_true_list = predict(result_model_2, dataloaders['validation'])

In [None]:
# Print the classification report for the alternative model's predictions.
print(classification_report(y_true_list, y_pred_list))