# **Setup**

### First time executing

Set the ***first_time_executing*** variable to ***True*** if this is the first time you are executing the notebook.

Once executed, the cell will ask you to upload a file that is needed to download the dataset directly into Google Colab. To obtain that file:

*   Go to [Kaggle.com](https://www.kaggle.com/)
*   Go to your account
*   In the "API" section, select "*Create new Token*"

This will download a *kaggle.json* file. Please upload that file in order to continue.

<br/>

**More info:**
The file contains your personal Kaggle API key: it allows the dataset to be downloaded directly into Google Colab, without downloading it to Google Drive beforehand (and without mounting the Drive).

The dataset will be unzipped directly in the current working directory (/content).

In [None]:
first_time_executing = True

if first_time_executing:

  from google.colab import files
  import os

  !pip install -q kaggle

  # Please upload the kaggle.json file you downlad from going to kaggle.com > Your account > API > Create new Token
  files.upload()

  !mkdir ~/.kaggle
  !cp kaggle.json ~/.kaggle/
  !chmod 600 ~/.kaggle/kaggle.json

  dataset_full_name = "tawsifurrahman/covid19-radiography-database" # The dataset name must be: Username/Dataset_name
  !kaggle datasets download {dataset_full_name}

  dataset_name = os.path.basename(os.path.normpath(dataset_full_name))
  !unzip {dataset_name}.zip

### Imports and Parameters

In [None]:
import torch
import os
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from tqdm import tqdm
import shutil
import random
import torch.nn as nn
import torch.nn.functional as F
import csv
from sklearn.model_selection import train_test_split
from PIL import Image
from sklearn.metrics import precision_recall_fscore_support
from sklearn.metrics import confusion_matrix
import seaborn as sns
from sklearn.model_selection import GridSearchCV

# --- Paths (all relative to the current working directory) ---
dataset_path = 'COVID-19_Radiography_Dataset'
balanced_dataset_path = 'COVID-19_Radiography_Dataset_Balanced'
new_dataset_name = 'Covid_Dataset'
train_csv_path = 'train.csv'
test_csv_path = 'test.csv'
train_dir = os.path.join(new_dataset_name, 'train')
test_dir = os.path.join(new_dataset_name, 'test')

# --- Classes ---
# classes = ['COVID', 'Normal', 'Viral Pneumonia']
classes = ['COVID', 'Normal']
# Derived from `classes` so the two can never get out of sync when the list is edited
NUM_CLASSES = len(classes)

# --- Training hyper-parameters ---
BATCH_SIZE = 32
NUM_EPOCHS = 10
LEARNING_RATE = 0.09
TEST_SIZE = 0.2
SEED = 42
DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# When True, the "Dataset Normalization" cell computes mean/std and rebuilds the transform with Normalize
normalize = False

# Set is_conv to True to train the convolutional network, or to False to train the fully connected one
is_conv = True

# Seed the torch and Python RNGs for reproducibility
torch.manual_seed(SEED)
random.seed(SEED)

# **Preprocessing**

### Data Balancing

##### Delete Lung Opacity Class

In [None]:
# Work on a copy so the original download stays untouched.
# shutil.copytree raises FileExistsError when the destination already exists,
# so skip the copy on re-runs of this cell.
if os.path.isdir(balanced_dataset_path):
  print(f"{balanced_dataset_path} already exists, skipping copy")
else:
  shutil.copytree(dataset_path, balanced_dataset_path)

lung_opacity_dir = os.path.join(balanced_dataset_path, 'Lung_Opacity')

# Best-effort removal: a missing folder is reported, not fatal
try:
  shutil.rmtree(lung_opacity_dir)
  print(f"Successfully deleted: {lung_opacity_dir}")
except OSError as e:
  print(f"Error: {lung_opacity_dir} - {e}")

##### Balance Normal and Covid class

In [None]:
# Undersample the Normal class so it ends up with as many images as the COVID class.
normal_images_path = os.path.join(balanced_dataset_path, 'Normal', 'images')
covid_images_path = os.path.join(balanced_dataset_path, 'COVID', 'images')
num_covid_images = len(os.listdir(covid_images_path))
# os.listdir returns entries in arbitrary order; sorting makes the seeded
# random.sample below pick the same files on every run.
normal_image_files = sorted(os.listdir(normal_images_path))

print(f'Before: {num_covid_images} Covid images / {len(normal_image_files)} Normal images')

# Clamp at zero: random.sample raises ValueError on a negative sample size,
# which would happen if Normal had fewer images than COVID.
num_images_to_delete = max(0, len(normal_image_files) - num_covid_images)
for image_file in random.sample(normal_image_files, num_images_to_delete):
  os.remove(os.path.join(normal_images_path, image_file))

print(f'After: {num_covid_images} Covid images / {len(os.listdir(normal_images_path))} Normal images')

##### CSV creation, Create the new dataset folder, Convert all images to grayscale

In [None]:
# For every image of every class in the balanced dataset: register its name and encoded class
# in the train or test CSV, convert it to grayscale in place when needed (required to compute
# a single-channel mean and std for normalization) and copy it into the new train/test folders.

def _ensure_grayscale(image_path):
  """Rewrite the image at image_path as single-channel 8-bit ('L') if it is not already."""
  # The context manager closes the file handle; convert() returns an independent in-memory image
  with Image.open(image_path) as img:
    if img.mode == 'L':
      return
    grayscale_image = img.convert('L')
  grayscale_image.save(image_path)
  print('Modified: ', image_path)


def _export_split(image_names, class_dir, class_label, csv_writer, destination_dir):
  """Write one CSV row per image and copy the (grayscale) image into destination_dir."""
  for image in image_names:
    csv_writer.writerow([image, class_label])

    source_path = os.path.join(class_dir, image)
    _ensure_grayscale(source_path)
    shutil.copy(source_path, os.path.join(destination_dir, image))


os.makedirs(train_dir, exist_ok=True)
os.makedirs(test_dir, exist_ok=True)

# Map each class name to its integer label (its position in `classes`)
class_encoding = {class_name: idx for idx, class_name in enumerate(classes)}

with open(train_csv_path, 'w', newline='') as train_csv_file, open(test_csv_path, 'w', newline='') as test_csv_file:
  train_csv_writer = csv.writer(train_csv_file)
  test_csv_writer = csv.writer(test_csv_file)

  # Header
  train_csv_writer.writerow(['Index_Name', 'Class'])
  test_csv_writer.writerow(['Index_Name', 'Class'])

  for class_name in classes:
    class_dir = os.path.join(balanced_dataset_path, class_name, 'images')
    # Sorted so that the seeded split below is the same on every run
    images = sorted(os.listdir(class_dir))
    class_label = class_encoding[class_name]

    # The split is done per class, so both splits keep the class proportions
    train_images, test_images = train_test_split(images, test_size=TEST_SIZE, random_state=SEED)

    _export_split(train_images, class_dir, class_label, train_csv_writer, train_dir)
    _export_split(test_images, class_dir, class_label, test_csv_writer, test_dir)

Check if there is a non-grayscale image

In [None]:
from PIL import Image
import os

def is_grayscale(image_path):
    """Return True when the image at image_path is stored in PIL mode 'L'."""
    # Use a context manager so the file handle is released right away;
    # this function is called once per file in the dataset.
    with Image.open(image_path) as img:
        return img.mode == 'L'

# Walk the exported dataset and fail on the first file that is not a grayscale .jpg/.png.
# The loop variables are deliberately not called `files`: at notebook top level that would
# overwrite the `google.colab.files` module imported by the setup cell.
for root, _dirs, file_names in os.walk(new_dataset_name):
    for file_name in file_names:
        if not file_name.endswith(('.jpg', '.png')):
            raise Exception('Error: File format not supported')

        if not is_grayscale(os.path.join(root, file_name)):
            raise Exception('Error: Multiple channels detected')
print("All the images are grayscale")


### Checkings

##### Plot class occurrences

In [None]:
import pandas as pd

# Load both split CSVs and count how many rows each encoded class has
train_df = pd.read_csv(train_csv_path)
test_df = pd.read_csv(test_csv_path)

train_counts = train_df['Class'].value_counts().sort_index()
test_counts = test_df['Class'].value_counts().sort_index()

plt.figure(figsize=(10, 6))

# Both series are drawn at the same x positions; alpha keeps the overlapped bars visible
for counts, bar_color, split_label in (
  (train_counts, 'blue', 'Train'),
  (test_counts, 'orange', 'Test'),
):
  plt.bar(counts.index, counts.values, color=bar_color, alpha=0.7, label=split_label)

plt.xlabel('Class')
plt.ylabel('Number of Images')
plt.title('Number of Images per Class in Train and Test Sets')
plt.legend()
plt.show()

##### Check image dimensions

In [None]:
# All images should be 299 x 299. This script doesn't check the number of channels.

from PIL import Image

def get_image_dimensions(image_path):
    """Return the `size` attribute PIL reports for the image stored at image_path."""
    with Image.open(image_path) as opened_image:
        image_size = opened_image.size
    return image_size

# Loop through each image in the training folder
# For each split folder, print an image size every time it differs from the size of the
# previously inspected image (so a uniform folder prints exactly one line).
for message_prefix, split_dir in (
  ("Train image dimensions: ", train_dir),
  ("Test image dimensions: ", test_dir),
):
  last_seen_dim = (0, 0)
  for file_name in os.listdir(split_dir):
    # Only .jpg / .png files are inspected
    if not (file_name.endswith(".jpg") or file_name.endswith(".png")):
      continue

    current_dim = get_image_dimensions(os.path.join(split_dir, file_name))
    if current_dim != last_seen_dim:
      print(message_prefix, current_dim)
      last_seen_dim = current_dim

# **Prepare Dataset**

### Dataset creation

In [None]:
# from torchvision.transforms import v2
import torchvision.transforms as v2
import cv2
from PIL import Image
from torch.utils.data import Dataset, DataLoader

# Transform applied to every image before it reaches a network.
# The resize is unconditional: FCNet flattens to 150*150 features and ConvNet's first
# linear layer expects 16*36*36 features, which is what two (3x3 conv + 2x2 pool) stages
# produce from a 150x150 input (150 -> 148 -> 74 -> 72 -> 36).
# NOTE(review): the previous version tried to resize only when is_conv was False, using an
# `if` statement inside the list literal (a syntax error) — confirm that resizing for both
# networks is the intended behaviour.
resizedTransform = v2.Compose([
  v2.ToTensor(), # Converts a PIL Image or numpy.ndarray (H x W x C) in the range [0, 255] to a torch.FloatTensor of shape (C x H x W) in the range [0.0, 1.0]
  v2.Resize(size=(150,150)),
])

class CovidDataset(Dataset):
  """Dataset over the exported train/test image folders, indexed by their CSV files.

  Each CSV row holds an image file name and its encoded class. Depending on `train`,
  items are read from the train or from the test CSV/folder.
  """

  def __init__(self, train_images_folder, test_images_folder, path_to_train_csv, path_to_test_csv, train=True, transform=None):
    """
    Args:
      train_images_folder: folder containing the training images.
      test_images_folder: folder containing the test images.
      path_to_train_csv: CSV listing the training images (file name, class).
      path_to_test_csv: CSV listing the test images (file name, class).
      train: when True the dataset serves the training split, otherwise the test split.
      transform: optional callable applied to the image array before it is returned.
    """
    super().__init__()
    self.train_data = pd.read_csv(path_to_train_csv)
    self.test_data = pd.read_csv(path_to_test_csv)
    self.train_images_folder = train_images_folder
    self.test_images_folder = test_images_folder
    # Not used by this class; kept in case other code reads the attribute
    self.images_files = os.listdir(train_images_folder)
    self.transform = transform
    self.train = train

  def __len__(self):
    """Return the number of rows of the CSV of the active split."""
    return self.train_data.shape[0] if self.train else self.test_data.shape[0]

  def __getitem__(self, index):
    """Return (image, label, image_file) for the row at `index` of the active split."""
    if self.train:
      data, images_folder = self.train_data, self.train_images_folder
    else:
      data, images_folder = self.test_data, self.test_images_folder

    image_file, label = data.iloc[index]

    # Copy the pixels into an array inside the context manager so the file handle
    # is closed immediately instead of lingering until garbage collection
    with Image.open(os.path.join(images_folder, image_file)) as opened_image:
      image = np.array(opened_image)

    if self.transform:
      image = self.transform(image)

    return image, label, image_file

### Dataset Normalization

In [None]:
def _dataset_mean_std(loader):
  """Return (mean, std) over all pixels of the images served by `loader`, in two passes.

  Each batch is (images, labels, image names), as returned by CovidDataset; only the
  images (batch[0]) are used. The pixel count is taken from the batches themselves, so
  the result stays correct whatever size the transform resizes the images to.
  """
  total_sum = 0
  num_of_pixels = 0
  for batch in loader:
    total_sum += batch[0].sum()
    num_of_pixels += batch[0].numel()
  mean = total_sum / num_of_pixels

  sum_of_squared_error = 0
  for batch in loader:
    sum_of_squared_error += ((batch[0] - mean).pow(2)).sum()
  std = torch.sqrt(sum_of_squared_error / num_of_pixels)

  return mean, std


def _plot_pixel_histogram(loader, mean, num_bins=50):
  """Plot the histogram of all pixel values served by `loader`, with a line at `mean`."""
  hist_sum = np.zeros(num_bins)
  # Fixed bin edges over [-2, 2] so the plots before and after normalization are comparable
  bin_edges = np.linspace(-2, 2, num_bins + 1)
  for batch in loader:
    hist_batch, _ = np.histogram(batch[0].numpy(), bins=bin_edges)
    hist_sum += hist_batch

  # x holds the left edge of every bin, hence align='edge' (the default would centre the bars on it)
  plt.bar(bin_edges[:-1], hist_sum, width=(bin_edges[1] - bin_edges[0]), align='edge')
  plt.axvline(float(mean))
  plt.show()


if normalize:

  # Compute mean and std of the data.
  # If you think the whole training set can fit in your memory, change it_fits to True
  it_fits = False

  dataset = CovidDataset(
    train_images_folder = train_dir,
    test_images_folder = test_dir,
    path_to_train_csv = train_csv_path,
    path_to_test_csv = test_csv_path,
    transform = resizedTransform
  )

  if it_fits:
    # A single batch holding the whole training set
    loader = DataLoader(
      dataset=dataset, batch_size=len(dataset), num_workers=1, shuffle=True, pin_memory=True
    )
    data = next(iter(loader))
    mean = data[0].mean()
    std = data[0].std()
    print('Mean: ', mean, '\nStd: ', std)

  else:

    loader = DataLoader(
      dataset=dataset, batch_size=1000, num_workers=0, shuffle=True, pin_memory=True
    )
    mean, std = _dataset_mean_std(loader)

    print('Mean pre-normalization: ', mean)
    print('Standard Deviation pre-normalization: ', std)

  # Plot not normalized data
  _plot_pixel_histogram(loader, mean)

  # Normalize the data

  resizedTransform = v2.Compose([
    v2.ToTensor(), # Converts a PIL Image or numpy.ndarray (H x W x C) in the range [0, 255] to a torch.FloatTensor of shape (C x H x W) in the range [0.0, 1.0]
    v2.Resize(size=(150,150)),
    # One value per channel (the images have a single channel)
    v2.Normalize(mean=[float(mean)], std=[float(std)])
  ])

  normDataset = CovidDataset(
    train_images_folder = train_dir,
    test_images_folder = test_dir,
    path_to_train_csv = train_csv_path,
    path_to_test_csv = test_csv_path,
    transform = resizedTransform
  )

  normLoader = DataLoader(
    dataset=normDataset, batch_size=1000, num_workers=0, shuffle=True, pin_memory=True
  )

  # Sanity check: after normalization the mean should be close to 0 and the std close to 1
  mean, std = _dataset_mean_std(normLoader)

  print('Mean post-normalization: ', mean)
  print('Standard Deviation post-normalization: ', std)

  # Plot normalized data
  _plot_pixel_histogram(normLoader, mean)

# **Create the Networks**

### Setup

##### Train and Test data loading

In [None]:
# Both datasets point at the same folders/CSVs and share the transform;
# they only differ in which split they serve.
_common_dataset_args = dict(
  train_images_folder=train_dir,
  test_images_folder=test_dir,
  path_to_train_csv=train_csv_path,
  path_to_test_csv=test_csv_path,
  transform=resizedTransform,
)

train_dataset = CovidDataset(**_common_dataset_args)
test_dataset = CovidDataset(train=False, **_common_dataset_args)

# Training batches are reshuffled every epoch; test batches keep the CSV order
train_loader = DataLoader(
  dataset=train_dataset,
  batch_size=BATCH_SIZE,
  num_workers=0,
  shuffle=True,
  pin_memory=True,
)

test_loader = DataLoader(
  dataset=test_dataset,
  batch_size=BATCH_SIZE,
  num_workers=0,
  shuffle=False,
  pin_memory=True,
)

##### Networks creation

In [None]:
class ConvNet(nn.Module):
  """Small CNN: two (3x3 conv + ReLU + 2x2 max-pool) stages followed by two linear layers.

  The first linear layer expects 16*36*36 features, i.e. single-channel 150x150 inputs
  (150 -> 148 -> 74 -> 72 -> 36 through the two stages).
  """

  def __init__(self):
    super(ConvNet, self).__init__()
    self.conv1 = nn.Conv2d(1, 8, 3)
    self.pool = nn.MaxPool2d(2, 2) # kernel size 2 and stride 2
    self.conv2 = nn.Conv2d(8, 16, 3)

    self.fc1 = nn.Linear(16*36*36, 5000)
    self.fc2 = nn.Linear(5000, NUM_CLASSES)


  def forward(self, x):
    """Return the raw class scores (logits) for a batch of images."""
    x = self.pool(F.relu(self.conv1(x)))
    x = self.pool(F.relu(self.conv2(x)))

    # Flatten everything except the batch dimension. Unlike view(-1, 16*36*36), this can
    # never silently move features between samples when the input size is unexpected:
    # a wrong size makes fc1 fail with a shape mismatch instead.
    x = torch.flatten(x, 1)
    x = F.relu(self.fc1(x))
    x = self.fc2(x)
    return x


class FCNet(nn.Module):
  """Fully connected network over flattened 150x150 single-channel images."""

  def __init__(self):
    super(FCNet, self).__init__()
    self.fc1 = nn.Linear(150*150, 5000)
    self.fc2 = nn.Linear(5000, 100)
    self.fc3 = nn.Linear(100, NUM_CLASSES)
    # Element-wise dropout: the activations it is applied to are 2-D (batch, features),
    # which is not a valid input for the channel-wise Dropout2d.
    self.dropout1 = nn.Dropout(0.25)
    self.dropout2 = nn.Dropout(0.5)


  def forward(self, x):
    """Return the raw class scores (logits) for a batch of images."""
    # Flatten everything except the batch dimension
    x = torch.flatten(x, 1)
    x = F.relu(self.fc1(x))
    x = self.dropout1(x)
    x = F.relu(self.fc2(x))
    x = self.dropout2(x)
    x = self.fc3(x)
    return x

### Train and Test

#### Cross validation

To enable cross validation, we need to merge the train and test folders so that we can partition the train and test sets iteratively. Then we will create another dataset from this merged folder and partition it into train and test sets using PyTorch's `Subset`.

In [None]:
import os
import shutil

# Derive the folders from new_dataset_name instead of repeating the literal 'Covid_Dataset',
# so they keep matching the folders created by the preprocessing cells if the name changes.
train_dir = os.path.join(new_dataset_name, 'train')
test_dir = os.path.join(new_dataset_name, 'test')
# Folder that will hold the train and test images merged together
full_dir = os.path.join(new_dataset_name, 'full')

def copy_files(src_dir, dst_dir):
    """Copy every entry of src_dir into dst_dir, never overwriting an existing file.

    dst_dir is created when missing. Entries whose name already exists in dst_dir
    are left untouched and reported on stdout.
    """
    if not os.path.exists(dst_dir):
        os.makedirs(dst_dir)

    for entry in os.listdir(src_dir):
        target = os.path.join(dst_dir, entry)

        # Guard clause: skip (and report) names that are already present
        if os.path.exists(target):
            print(f"File {entry} already exists in the destination directory.")
            continue

        shutil.copy(os.path.join(src_dir, entry), target)

# Copy files from train and test directories to the full directory
copy_files(train_dir, full_dir)
copy_files(test_dir, full_dir)

print("Files have been copied to the 'full' directory.")



In [None]:
import os
from torchvision import transforms
from torch.utils.data import Dataset
from PIL import Image

class CombinedDataset(Dataset):
    """Dataset over every image found (recursively) in a single folder.

    The label of an image is inferred from its file name: the text before the first '-'
    is looked up in the class encoding (e.g. a file named 'COVID-1.png' -> class 'COVID').
    """

    # File extensions (lower case) that are treated as images
    IMAGE_EXTENSIONS = ('.png', '.jpg', '.jpeg', '.bmp', '.gif')

    def __init__(self, folder, transform=None, class_names=None):
        """
        Args:
            folder: root folder that is walked recursively for image files.
            transform: optional callable applied to the PIL image before it is returned.
            class_names: class names in label order; defaults to the module-level `classes`.
        """
        self.folder = folder
        self.transform = transform

        if class_names is None:
            class_names = classes
        # Built once here instead of on every __getitem__ call
        self.class_encoding = {class_name: idx for idx, class_name in enumerate(class_names)}

        # Sorted so that an index always refers to the same file: os.walk gives no ordering
        # guarantee, and the K-fold split is made over these indices.
        self.image_paths = sorted(
            os.path.join(dirpath, filename)
            for dirpath, _dirnames, filenames in os.walk(folder)
            for filename in filenames
            if os.path.splitext(filename)[1].lower() in self.IMAGE_EXTENSIONS
        )

    def __len__(self):
        """Return the number of image files found."""
        return len(self.image_paths)

    def __getitem__(self, idx):
        """Return (image, label) for the image at position `idx`.

        Raises:
            KeyError: when the file name prefix is not a known class name.
        """
        img_name = self.image_paths[idx]

        # Convert the file name prefix into the encoded label
        label_str = os.path.basename(img_name).split('-')[0]
        try:
            label = self.class_encoding[label_str]
        except KeyError:
            raise KeyError(f"Cannot infer a known class from file name {img_name!r}") from None

        # Load the pixels while the file is open so the handle can be closed right away
        with Image.open(img_name) as image:
            image.load()

        if self.transform:
            image = self.transform(image)

        return image, label

In [None]:
from sklearn.model_selection import KFold
from torch.utils.data import SubsetRandomSampler, DataLoader
import numpy as np
import torch

def _evaluate(model, loader):
    """Run `model` in eval mode over `loader`.

    Returns (accuracy, predictions, labels), the last two being flat numpy arrays that
    cover every sample served by the loader.
    """
    batch_predictions = []
    batch_labels = []

    model.eval()
    with torch.no_grad():
        for images, labels in loader:
            outputs = model(images.to(DEVICE))
            # The predicted class is the index of the highest score
            _, predicted = torch.max(outputs, 1)

            batch_predictions.append(predicted.cpu().numpy())
            batch_labels.append(labels.cpu().numpy())

    predictions = np.concatenate(batch_predictions).ravel()
    labels = np.concatenate(batch_labels).ravel()
    accuracy = int((predictions == labels).sum()) / len(labels)
    return accuracy, predictions, labels


full_dataset = CombinedDataset(folder=full_dir, transform=resizedTransform)

k_folds = 5
kfold = KFold(n_splits=k_folds, shuffle=True, random_state=SEED)

# Used to label the printed metrics with the network that is actually being trained
model_tag = 'Conv' if is_conv else 'FC'

# List of accuracies of each fold. Needed to calculate its average at the end
fold_accuracies = []

# K-Fold Cross Validation model evaluation (the split is made over sample indices)
for fold, (train_ids, val_ids) in enumerate(kfold.split(np.arange(len(full_dataset)))):
    print(f'FOLD {fold}')
    print('--------------------------------')

    # Create subsets
    train_subset = torch.utils.data.Subset(full_dataset, train_ids)
    val_subset = torch.utils.data.Subset(full_dataset, val_ids)

    # Define data loaders
    train_loader = DataLoader(
        train_subset,
        batch_size=BATCH_SIZE,
        shuffle = True
    )
    val_loader = DataLoader(
        val_subset,
        batch_size=BATCH_SIZE,
        shuffle = False
    )

    # A fresh model and optimizer for every fold, so folds do not share learned weights
    model = ConvNet() if is_conv else FCNet()

    model.to(DEVICE)

    loss_fun = nn.CrossEntropyLoss()
    optimizer = torch.optim.SGD(model.parameters(), lr=LEARNING_RATE)

    training_accuracy_list = []
    testing_accuracy_list = []

    for epoch in range(NUM_EPOCHS):

        model.train()
        for images, labels in tqdm(train_loader):
            images = images.to(DEVICE)
            labels = labels.to(DEVICE)

            outputs = model(images)
            loss = loss_fun(outputs, labels)

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
        print('TRAINING END')

        # Calculate accuracy of training and testing at the end of the training epoch
        training_accuracy, _, _ = _evaluate(model, train_loader)
        training_accuracy_list.append(training_accuracy)

        # prediction_list / labels_list are overwritten every epoch, so after the loop
        # they hold the results of the last epoch (used for the confusion matrix)
        testing_accuracy, prediction_list, labels_list = _evaluate(model, val_loader)
        testing_accuracy_list.append(testing_accuracy)

        # Calculate Precision and Recall on the validation fold
        precision, recall, _, __ = precision_recall_fscore_support(labels_list, prediction_list, average='binary')
        print(f'\n{model_tag} Precision: ', precision)
        print(f'{model_tag} Recall: ', recall)


    # Append this fold's accuracy (validation accuracy after the last epoch)
    fold_accuracies.append(testing_accuracy)
    print(fold_accuracies)

    # The confusion matrix has information of only the last epoch
    plt.figure(figsize=(8, 6))
    conf_matrix = confusion_matrix(labels_list, prediction_list)
    sns.heatmap(conf_matrix, annot=True, fmt='d', cmap='Blues', xticklabels=['Class 0', 'Class 1'], yticklabels=['Class 0', 'Class 1'])
    plt.xlabel('Predicted')
    plt.ylabel('Actual')
    plt.title('Confusion Matrix')
    plt.show()

    # The accuracy charts take into account the accuracy on all the epochs
    epochs = range(1, NUM_EPOCHS + 1)

    plt.figure(figsize=(8, 6))
    plt.plot(epochs, training_accuracy_list, label='Training Accuracy')
    plt.plot(epochs, testing_accuracy_list, label='Testing Accuracy')
    plt.title('Training and Testing Accuracy')
    plt.xlabel('Epochs')
    plt.ylabel('Accuracy')
    plt.legend()
    plt.show()

# Average over the folds that were actually run instead of a hard-coded 5
print('FINAL ACCURACY: ', sum(fold_accuracies)/len(fold_accuracies))

