In [2]:
from __future__ import print_function

import glob
from itertools import chain

import os
import random
import zipfile

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from PIL import Image
from sklearn.model_selection import train_test_split
from torch.optim.lr_scheduler import StepLR
from torch.utils.data import DataLoader, Dataset
from torchvision import datasets, transforms
from tqdm.notebook import tqdm

from google.colab import drive
import kagglehub
import shutil
import os

import random

In [3]:
drive.mount('/content/drive')

MessageError: Error: credential propagation was unsuccessful

In [None]:
!pip install -q kaggle

!mkdir -p ~/.kaggle
!cp /content/drive/MyDrive/kaggle.json ~/.kaggle/
!chmod 600 ~/.kaggle/kaggle.json

In [None]:
print(f"Torch: {torch.__version__}")

In [None]:
import os


In [None]:


# Step 2: Define the folder in Google Drive where the datasets will be saved
drive_folder = '/content/drive/MyDrive/KaggleDatasets/'

# Create the destination folder in Google Drive if it doesn't exist
os.makedirs(drive_folder, exist_ok=True)

# Step 3: Download the first dataset (resized 2015-2019 blindness detection images)
dataset_path_1 = kagglehub.dataset_download("benjaminwarner/resized-2015-2019-blindness-detection-images")
print("First dataset downloaded to:", dataset_path_1)

# Step 4: Copy the first dataset to Google Drive
drive_path_1 = os.path.join(drive_folder, "resized-2015-2019-blindness-detection-images/")
shutil.copytree(dataset_path_1, drive_path_1, dirs_exist_ok=True)
print(f"First dataset copied to: {drive_path_1}")

# Step 5: Download the second competition dataset (aptos2019-blindness-detection)
dataset_path_2 = kagglehub.competition_download("aptos2019-blindness-detection")
print("Second competition dataset downloaded to:", dataset_path_2)

# Step 6: Copy the second dataset to Google Drive
drive_path_2 = os.path.join(drive_folder, "aptos2019-blindness-detection/")
shutil.copytree(dataset_path_2, drive_path_2, dirs_exist_ok=True)
print(f"Second dataset copied to: {drive_path_2}")



dataset_path_3 = kagglehub.dataset_download("pineapplepencil/custom-transform-blindness-2019")
print("Third competition dataset downloaded to:", dataset_path_3)

# Step 6: Copy the second dataset to Google Drive
drive_path_3 = os.path.join(drive_folder, "custom-transform-blindness-2019/")
shutil.copytree(dataset_path_2, drive_path_3, dirs_exist_ok=True)
print(f"Third dataset copied to: {drive_path_3}")

### PARAMETERS SELLECTION

In [None]:
# Training settings
batch_size = 64
epochs = 20
lr = 5e-4
gamma = 0.8
seed = 42
num_classes = 1
device = 'cuda'

In [None]:
def seed_everything(seed):
    random.seed(seed)
    os.environ['PYTHONHASHSEED'] = str(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    torch.backends.cudnn.deterministic = True

seed_everything(seed)

### PREPROCESSING

In [None]:
#The Code from: https://www.kaggle.com/ratthachat/aptos-updated-albumentation-meets-grad-cam
import cv2

def crop_image1(img,tol=7):
    # img is image data
    # tol  is tolerance

    mask = img>tol
    return img[np.ix_(mask.any(1),mask.any(0))]

def crop_image_from_gray(img,tol=7):
    if img.ndim ==2:
        mask = img>tol
        return img[np.ix_(mask.any(1),mask.any(0))]
    elif img.ndim==3:
        gray_img = cv2.cvtColor(img, cv2.COLOR_RGB2GRAY)
        mask = gray_img>tol

        check_shape = img[:,:,0][np.ix_(mask.any(1),mask.any(0))].shape[0]
        if (check_shape == 0): # image is too dark so that we crop out everything,
            return img # return original image
        else:
            img1=img[:,:,0][np.ix_(mask.any(1),mask.any(0))]
            img2=img[:,:,1][np.ix_(mask.any(1),mask.any(0))]
            img3=img[:,:,2][np.ix_(mask.any(1),mask.any(0))]
    #         print(img1.shape,img2.shape,img3.shape)
            img = np.stack([img1,img2,img3],axis=-1)
    #         print(img.shape)
        return img


### TEST IMAGE TRANSFORMATION

In [None]:
import os
import cv2
from tqdm import tqdm

# Path to the folder containing input images
inPath = '/content/drive/MyDrive/KaggleDatasets/aptos2019-blindness-detection/test_images'

# Path of the folder that will contain the transformed images
outPath = "test_images_transformed"
os.makedirs(outPath, exist_ok=True)

# Loop through the images and apply transformations
for imagePath in tqdm(os.listdir(inPath), desc="Processing images"):
    # imagePath contains name of the image
    inputPath = os.path.join(inPath, imagePath)

    image = cv2.imread(inputPath)
    image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    image = crop_image_from_gray(image)
    image = cv2.resize(image, (224, 224))
    image = cv2.addWeighted(image, 4, cv2.GaussianBlur(image, (0, 0), 30), -4, 128)

    fullOutPath = os.path.join(outPath, imagePath)
    cv2.imwrite(fullOutPath, image)


In [None]:
inPath = '/content/drive/MyDrive/KaggleDatasets/aptos2019-blindness-detection/test_images'

# path of the folder that will contain the modified image
try:
    os.mkdir("test_images_transformed")
except:
    print("path already exists")

outPath ="test_images_transformed"

for imagePath in tqdm(os.listdir(inPath)):
    # imagePath contains name of the image
    inputPath = os.path.join(inPath, imagePath)

    image = cv2.imread(inputPath)
    image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    image = crop_image_from_gray(image)
    image = cv2.resize(image, (224, 224))
    image = cv2.addWeighted (image,4, cv2.GaussianBlur( image , (0,0) , 30) ,-4 ,128)

    fullOutPath = os.path.join(outPath, imagePath)
    cv2.imwrite(fullOutPath, image)


In [None]:
train_dir = '/content/drive/MyDrive/KaggleDatasets/custom-transform-blindness-2019/train_images_transformed'
test_dir = './test_images_transformed'


In [None]:
train_list = glob.glob(os.path.join(train_dir,'*.*'))
test_list = glob.glob(os.path.join(test_dir, '*.png'))

In [None]:
print(f"Train Data: {len(train_list)}")
print(f"Test Data: {len(test_list)}")

### LABELING

In [None]:
df_train = pd.read_csv('/content/drive/MyDrive/KaggleDatasets/aptos2019-blindness-detection/train.csv')
df_train_old = pd.read_csv("/content/drive/MyDrive/KaggleDatasets/resized-2015-2019-blindness-detection-images/labels/trainLabels15.csv")
df_train_old = df_train_old.rename({"image" : "id_code", "level" : "diagnosis"}, axis=1)
df_train = df_train.append(df_train_old).reset_index(drop=True)

labels = df_train['diagnosis'].values
label_lookup = df_train.set_index('id_code')

df_test = pd.read_csv('/content/drive/MyDrive/KaggleDatasets/aptos2019-blindness-detection/test.csv')

In [None]:
class_weights = df_train['diagnosis'].value_counts()
dfs = [df_train[df_train['diagnosis'] == i].sample(class_weights[4]) for i in range(5)]
resampled = pd.concat(dfs, axis = 0)

In [None]:
resampled.diagnosis.value_counts()

In [None]:
new_train_list = (train_dir + '/' + resampled['id_code'].apply(lambda x: x + ('.jpg' if '_' in x else '.png'))).values
new_train_list

#### ENCODING

In [None]:
y_train = pd.get_dummies(df_train['diagnosis']).values

print(y_train.shape)

In [None]:
y_train_multi = np.empty(y_train.shape, dtype=y_train.dtype)
y_train_multi[:, 4] = y_train[:, 4]

for i in range(3, -1, -1):
    y_train_multi[:, i] = np.logical_or(y_train[:, i], y_train_multi[:, i+1])

print("Original y_train:", y_train.sum(axis=0))
print("Multilabel version:", y_train_multi.sum(axis=0))

In [None]:
y_train_multi

### images VISUALIZATION

In [None]:
get_index = lambda x : df_train[df_train.id_code == x].index[0]
y_train_multi[get_index('0a4e1a29ffff')]

#### examples!

In [None]:
random_idx = np.random.randint(1, len(train_list), size=9)
fig, axes = plt.subplots(3, 3, figsize=(16, 12))

for idx, ax in enumerate(axes.ravel()):
    img = Image.open(train_list[idx])
    name = train_list[idx].split("/")[-1].split(".")[0]
    ax.set_title('label = '+ str(labels[idx]) + ", file = " + name)
    ax.imshow(img)

### DATASET SPLITTING

In [None]:
train_list, valid_list = train_test_split(new_train_list,
                                          test_size=0.05,
                                          random_state=seed)

In [None]:
print(len(train_list))
print(len(valid_list))

#### AUGMENTATION

In [None]:
train_transforms = transforms.Compose(
    [
        transforms.Resize((224, 224)),
#         transforms.RandomResizedCrop(224),
        transforms.RandomHorizontalFlip(),
        transforms.ToTensor(),
    ]
)

val_transforms = transforms.Compose(
    [
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
    ]
)


test_transforms = transforms.Compose(
    [
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
    ]
)

#### Making the Dataset Class

In [None]:
class Blindness2019(Dataset):
    def __init__(self, file_list, transform=None):
        self.file_list = file_list
        self.transform = transform

    def __len__(self):
        self.filelength = len(self.file_list)
        return self.filelength

    def __getitem__(self, idx):
        img_path = self.file_list[idx]
        img = Image.open(img_path)
        img_transformed = self.transform(img)

        label = label_lookup.loc[img_path.split("/")[-1].split(".")[0]][0]
#         label = torch.tensor(label).to(torch.float32)
        image_id = img_path.split("/")[-1].split(".")[0]
#         label = y_train_multi[get_index(image_id)]
#         label = y_train_multi[random.randint(0,3000)]
        return img_transformed, label

class Blindness2019Test(Dataset):
    def __init__(self, file_list, transform=None):
        self.file_list = file_list
        self.transform = transform

    def __len__(self):
        self.filelength = len(self.file_list)
        return self.filelength

    def __getitem__(self, idx):
        img_path = self.file_list[idx]
        img = Image.open(img_path)
        img_transformed = self.transform(img)

        return img_transformed

#### Instantiating the dataset class

In [None]:
train_data = Blindness2019(train_list, transform=train_transforms)
valid_data = Blindness2019(valid_list, transform=test_transforms)
test_data = Blindness2019Test(test_list, transform=test_transforms)

In [None]:
train_loader = DataLoader(dataset = train_data, batch_size=batch_size, shuffle=True)
valid_loader = DataLoader(dataset = valid_data, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(dataset = test_data, batch_size=1, shuffle=False)

In [None]:
print(len(train_data), len(train_loader))
print(len(valid_data), len(valid_loader))

In [None]:
pip install transformers --upgrade

## Setting Up the Vision Transformer (ViT) Model

We'll be using the ViT model pre-trained on a large dataset and fine-tuning it on our chest X-ray dataset.


In [None]:
from transformers import ViTForImageClassification, ViTConfig, ViTImageProcessor

# Define the ViT configuration
config = ViTConfig.from_pretrained("google/vit-base-patch16-224-in21k")
config.num_labels = 2  # Normal and Pneumonia

# Load the pre-trained model
model = ViTForImageClassification.from_pretrained("google/vit-base-patch16-224-in21k", config=config)


## Define Loss and Optimizer

We'll use the CrossEntropy loss as it's suitable for binary classification tasks. For optimization, we'll use the Adam optimizer.


In [None]:
import torch.optim as optim
import torch.nn as nn

criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=1e-5)


## Model Training

Let's train the ViT model on our chest X-ray dataset. We'll also validate the model on the validation set after each epoch.


In [None]:
import torch
import torch.nn as nn

# Check for GPU availability and set the device accordingly
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

# Define the training function
def train_model(model, criterion, optimizer, train_loader, val_loader, num_epochs=10):
    train_losses = []  # List to store training loss for each epoch
    val_accuracies = []  # List to store validation accuracy for each epoch

    for epoch in range(num_epochs):
        model.train()
        running_loss = 0.0
        for images, labels in train_loader:
            images, labels = images.to(device), labels.to(device)
            optimizer.zero_grad()

            outputs = model(images).logits  # Get logits from model outputs
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            running_loss += loss.item() * images.size(0)

        # Validation phase
        model.eval()
        correct = 0
        total = 0
        with torch.no_grad():
            for images, labels in val_loader:
                images, labels = images.to(device), labels.to(device)
                outputs = model(images).logits
                _, predicted = outputs.max(1)
                total += labels.size(0)
                correct += (predicted == labels).sum().item()

        avg_train_loss = running_loss / len(train_loader.dataset)
        val_accuracy = 100 * correct / total

        # Append the computed values to their respective lists
        train_losses.append(avg_train_loss)
        val_accuracies.append(val_accuracy)
        print(f"Epoch {epoch+1}/{num_epochs}, Loss: {avg_train_loss:.4f}, Validation Accuracy: {val_accuracy:.2f}%")

    return model, train_losses, val_accuracies


In [None]:
trained_model, train_losses, val_accuracies = train_model(model, criterion, optimizer, train_loader, val_loader, num_epochs=10)


## Model Evaluation

Let's evaluate the trained ViT model on the test dataset.


In [None]:
# Evaluation function
def evaluate_model(model, test_loader):
    model.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for images, labels in test_loader:
            images, labels = images.to(device), labels.to(device)
            outputs = model(images).logits
            _, predicted = outputs.max(1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

    test_accuracy = 100 * correct / total
    print(f"Test Accuracy: {test_accuracy:.2f}%")

evaluate_model(trained_model, test_loader)


## Save and Load the Model

After training, it's essential to save the model weights to avoid retraining in the future.


In [None]:
# Save the model weights
torch.save(trained_model.state_dict(), "vit_chest_xray_model.pth")

# to load the model in the future
# model.load_state_dict(torch.load("vit_chest_xray_model.pth"))


## Visualization of Sample Data

Displaying a few images from both NORMAL and PNEUMONIA classes to get a feel for the data.


In [None]:
display_images(train_dir)


In [None]:
def train_model(model, criterion, optimizer, train_loader, val_loader, num_epochs=10):
    train_losses = []
    val_accuracies = []

    for epoch in range(num_epochs):
        model.train()

        # Training Phase
        running_loss = 0.0
        for images, labels in train_loader:
            images, labels = images.to(device), labels.to(device)

            optimizer.zero_grad()

            outputs = model(images).logits
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            running_loss += loss.item() * images.size(0)

        avg_train_loss = running_loss / len(train_loader.dataset)
        train_losses.append(avg_train_loss)

        # Validation Phase
        model.eval()
        correct = 0
        total = 0
        with torch.no_grad():
            for images, labels in val_loader:
                images, labels = images.to(device), labels.to(device)
                outputs = model(images).logits
                _, predicted = outputs.max(1)
                total += labels.size(0)
                correct += (predicted == labels).sum().item()

        val_accuracy = 100 * correct / total
        val_accuracies.append(val_accuracy)

        print(f"Epoch {epoch+1}/{num_epochs}, Loss: {avg_train_loss:.4f}, Validation Accuracy: {val_accuracy:.2f}%")

    return model, train_losses, val_accuracies


## Training and Validation Metrics Visualization

Plotting the training loss and validation accuracy to understand the model's learning progress.


In [None]:
plt.figure(figsize=(12, 5))

# Plotting training loss
plt.subplot(1, 2, 1)
plt.plot(train_losses, label='Training Loss')
plt.title('Training Loss over Epochs')
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.legend()

# Plotting validation accuracy
plt.subplot(1, 2, 2)
plt.plot(val_accuracies, label='Validation Accuracy', color='orange')
plt.title('Validation Accuracy over Epochs')
plt.xlabel('Epochs')
plt.ylabel('Accuracy (%)')
plt.legend()

plt.tight_layout()
plt.show()


From the above graphs, we observe that the training loss decreased over time, which is a positive sign. The validation accuracy remains high, suggesting that the model generalizes well. The dip in accuracy around the second epoch followed by consistent high accuracy indicates that the model might have overcome some initial adaptation challenges but then consistently performed well.

## Confusion Matrix

Visualizing the model's predictions using a confusion matrix to understand its performance in more detail.


In [None]:
from sklearn.metrics import confusion_matrix
import seaborn as sns
import matplotlib.pyplot as plt

def plot_confusion_matrix(y_true, y_pred, classes):
    cm = confusion_matrix(y_true, y_pred)
    plt.figure(figsize=(8,6))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
                xticklabels=classes,
                yticklabels=classes)
    plt.xlabel('Predicted Label')
    plt.ylabel('True Label')
    plt.title('Confusion Matrix')
    plt.show()

# Getting the true labels and the predicted labels
y_true = []
y_pred = []

with torch.no_grad():
    for images, labels in test_loader:
        images, labels = images.to(device), labels.to(device)
        outputs = model(images).logits
        _, predicted = outputs.max(1)
        y_true.extend(labels.cpu().numpy())
        y_pred.extend(predicted.cpu().numpy())

# Now, we'll plot the confusion matrix
labels_list = ["NORMAL", "PNEUMONIA"]
plot_confusion_matrix(y_true, y_pred, labels_list)


In the above image, the confusion matrix shows that the model correctly classified 131 patients as having pneumonia (TP) and 389 patients as not having pneumonia (TN). The model incorrectly classified 1 patient as having pneumonia (FP) and 103 patients as not having pneumonia (FN).

The overall accuracy of the model is 93.75%, which is good.
The accuracy of the model is calculated by dividing the number of true positives and true negatives by the total number of patients. In this case, the accuracy is 93.75%, which means that the model correctly classified 93.75% of the patients.

The sensitivity of the model is calculated by dividing the number of true positives by the total number of patients who actually had pneumonia. In this case, the sensitivity is 91.30%, which means that the model correctly identified 91.30% of the patients who actually had pneumonia.

## Conclusion

### Overview:
In the realm of medical imaging, the adaptation of the Vision Transformer (ViT) for chest X-ray classification showcases the vast potential of transfer learning. Drawing from its roots in natural language processing, ViT, through self-supervised learning, has successfully ventured into the domain of computer vision, offering a promising solution to detect pneumonia from X-rays.

### Key Achievements:
1. **ViT's Versatility**: The Vision Transformer's unique approach of segmenting images into patches for processing underscores its versatility. Its core design, which was originally intended for NLP tasks, has been seamlessly repurposed for intricate computer vision challenges.
  
2. **Impressive Training Dynamics**: Throughout the training phase, a consistent decline in the training loss was observed. This highlights the model's effective learning from the data, optimizing its parameters to reduce inaccuracies.
  
3. **Stellar Performance Metrics**: The model didn't just stop at learning; it showcased an exemplary generalization capability. The achieved test accuracy of 83.33% stands as a testament to the model's prowess.
  
4. **Confusion Matrix Insights**: A deep dive into the confusion matrix revealed the model's acute ability to differentiate between 'NORMAL' and 'PNEUMONIA' chest X-rays. Misclassifications were minimal, further solidifying trust in the model's predictions.

5. **Self-Supervised Excellence**: One of the crown jewels of the Vision Transformer is its proficiency in self-supervised learning. This feature enables the model to harness vast datasets without explicit labels, inherently generating supervisory signals from the data. This form of learning lays the foundation for its exceptional feature extraction capabilities.

### Looking Ahead:
The success story of the Vision Transformer in the chest X-ray classification task is a beacon of optimism. Its high accuracy, adaptability, and self-supervised learning capabilities paint a positive picture for its broader applications in the medical imaging domain. As we move forward, it's exciting to think about the myriad of challenges ViT could address, revolutionizing healthcare diagnostics.