In [2]:
import os
import random
from pathlib import Path
from PIL import Image
import pandas as pd

# Data Cleaning

### A. Get Paths

In [3]:
# Locate the dataset: everything lives under Data/XRAY_DATA
data = Path("Data")
image_path = data.joinpath("XRAY_DATA")

# One sub-directory per split
train_dir, test_dir = (image_path.joinpath(split) for split in ("train", "test"))

In [4]:
# Get all image paths
IMAGE_EXTENSIONS = ("png", "jpeg", "jpg")


def find_class_images(split_dir):
    """Return the image files stored exactly one folder below ``split_dir``.

    Every extension in ``IMAGE_EXTENSIONS`` is searched with the same
    ``*/*.<ext>`` pattern. The original train search used ``*.jpg`` (top level
    only) for one extension, which skipped ``.jpg`` files inside the class
    folders and disagreed with the test search.

    Args:
        split_dir: ``pathlib.Path`` of a split directory (e.g. the train folder).

    Returns:
        A list of ``pathlib.Path`` objects, grouped by extension in the order
        of ``IMAGE_EXTENSIONS``.
    """
    found = []
    for extension in IMAGE_EXTENSIONS:
        found.extend(split_dir.glob(f"*/*.{extension}"))
    return found


train_image_paths = find_class_images(train_dir)
test_image_paths = find_class_images(test_dir)

### B. Get Data Labels

In [5]:
# Read the image metadata; it supplies the label for each X-ray file
df = pd.read_csv(data / "Metadata.csv")

# Discard the columns that are not used below
df = df.drop(columns=['Unnamed: 0',
                      'Label_2_Virus_category',
                      'Label_1_Virus_category'])

### C. Sort Data into Classes based on labels

In [6]:
# Sort image into folders by label and delete files not in the dataframe
# WARNING: this cell permanently deletes image files that have no metadata row.

# Class folder names. They are compared verbatim against df['Label'], so the
# spelling is kept exactly as in the original comparisons.
CLASS_LABELS = ("Normal", "Pnemonia")

# File name -> label lookup built once, instead of scanning the dataframe for
# every image. drop_duplicates keeps the first row per file name, matching the
# original `.values[0]` choice.
label_by_name = (
    df.drop_duplicates(subset='X_ray_image_name')
      .set_index('X_ray_image_name')['Label']
      .to_dict()
)


def sort_images_by_label(paths, split_dir, label_by_name):
    """Move each image into ``split_dir/<label>/`` and delete unlabeled images.

    Args:
        paths: iterable of ``pathlib.Path`` objects pointing at image files.
        split_dir: ``pathlib.Path`` of the split directory that holds the
            class folders.
        label_by_name: dict mapping an image file name to its label.

    Behavior per file:
        * already gone (e.g. the cell is re-run with a stale path list): skipped;
        * file name missing from ``label_by_name``: the file is deleted;
        * label in ``CLASS_LABELS``: moved to ``split_dir/<label>/<name>``;
        * any other label: left where it is.
    """
    for src in paths:
        if not src.exists():
            # Removed or moved by an earlier run of this cell.
            continue

        name = src.name
        if name not in label_by_name:
            os.remove(src)
            continue

        label = label_by_name[name]
        if label not in CLASS_LABELS:
            continue

        dest = split_dir / label / name
        if dest == src:
            continue
        # Create the class folder only when a file is about to land in it, so
        # no empty class folders are left behind.
        dest.parent.mkdir(parents=True, exist_ok=True)
        os.rename(src, dest)


# A dedicated helper keeps the loop variable local, so the dataset directory
# stored in `image_path` is no longer overwritten by this cell.
sort_images_by_label(train_image_paths, train_dir, label_by_name)
sort_images_by_label(test_image_paths, test_dir, label_by_name)

# Transform and Load Data with Torchvision

In [7]:
import torch
from torch import nn
from torchvision import models, transforms, datasets
from torch.utils.data import DataLoader

In [8]:
# Bring every image to one common resolution, then convert it to a tensor
_transform_steps = [
    transforms.Resize([512, 512]),
    transforms.ToTensor(),
]
img_transforms = transforms.Compose(_transform_steps)

In [9]:
# Build one ImageFolder dataset per split; both use the same image transform
train_data, test_data = (
    datasets.ImageFolder(root=split_dir, transform=img_transforms)
    for split_dir in (train_dir, test_dir)
)

In [10]:
# Wrap the datasets in batched loaders
BATCH_SIZE = 32
NUM_WORKERS = 4

# Settings shared by both loaders
_loader_options = dict(batch_size=BATCH_SIZE, num_workers=NUM_WORKERS)

# Only the training loader shuffles; the test loader keeps dataset order
train_dataloader = DataLoader(train_data, shuffle=True, **_loader_options)
test_dataloader = DataLoader(test_data, shuffle=False, **_loader_options)

# Create CNN Model

In [15]:
# Use the GPU when CUDA is available, otherwise fall back to the CPU
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")

In [11]:
class CNN_model(nn.Module):
    """Three-stage convolutional classifier.

    Each stage is ``Conv2d -> ReLU -> Conv2d -> ReLU -> MaxPool2d(2)``. The
    first stage uses 7x7 kernels with stride 2; the other two use 3x3 kernels
    with stride 1. No convolution is padded. The classifier flattens the last
    feature map and applies a single linear layer.

    Args:
        input_shape: number of channels of the input images.
        hidden_dim: number of feature maps produced by every convolution.
        output_shape: number of output units (one per class).
        image_size: spatial size of the inputs the model will receive, either
            an int (square images) or a ``(height, width)`` pair. Defaults to
            512, which reproduces the original fixed ``hidden_dim * 12 * 12``
            classifier input.

    Raises:
        Whatever torch raises (typically ``RuntimeError``) when ``image_size``
        is too small to pass through the convolution stack.
    """

    def __init__(self, input_shape, hidden_dim, output_shape, image_size=512):
        super().__init__()
        self.conv_block_1 = self._make_conv_block(input_shape, hidden_dim,
                                                  kernel_size=7, stride=2)
        self.conv_block_2 = self._make_conv_block(hidden_dim, hidden_dim,
                                                  kernel_size=3, stride=1)
        self.conv_block_3 = self._make_conv_block(hidden_dim, hidden_dim,
                                                  kernel_size=3, stride=1)

        # Derive the classifier input size from the conv stack itself instead
        # of hard-coding 12*12 (which is only right for 512x512 inputs).
        if isinstance(image_size, int):
            height, width = image_size, image_size
        else:
            height, width = image_size
        with torch.no_grad():
            probe = torch.zeros(1, input_shape, height, width)
            flat_features = self._conv_features(probe).flatten(start_dim=1).shape[1]

        self.classifier = nn.Sequential(
            nn.Flatten(),
            nn.Linear(in_features=flat_features,
                      out_features=output_shape)
        )

    @staticmethod
    def _make_conv_block(in_channels, out_channels, kernel_size, stride):
        """Build one Conv-ReLU-Conv-ReLU-MaxPool stage (layer order fixes the state_dict keys)."""
        return nn.Sequential(
            nn.Conv2d(in_channels=in_channels,
                      out_channels=out_channels,
                      kernel_size=kernel_size,
                      stride=stride,
                      padding=0),
            nn.ReLU(),
            nn.Conv2d(in_channels=out_channels,
                      out_channels=out_channels,
                      kernel_size=kernel_size,
                      stride=stride,
                      padding=0),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2,
                         stride=2)
        )

    def _conv_features(self, x):
        """Run the three convolutional stages and return the final feature map."""
        x = self.conv_block_1(x)
        x = self.conv_block_2(x)
        x = self.conv_block_3(x)
        return x

    def forward(self, x):
        """Return one raw score (logit) per output unit for every image in ``x``."""
        return self.classifier(self._conv_features(x))

In [46]:
# Build the network: 3 input channels, 10 feature maps per convolution and
# one output unit per class folder found in the training set
model = CNN_model(
    input_shape=3,
    hidden_dim=10,
    output_shape=len(train_data.classes),
)
# Place the parameters on the selected device
model = model.to(device)

In [47]:
# Create train_step()

def train_step(model: torch.nn.Module,
               dataloader: torch.utils.data.DataLoader,
               loss_fn: torch.nn.Module,
               optimizer: torch.optim.Optimizer,
               device=None
               ):
    """Train ``model`` for one pass over ``dataloader``.

    Args:
        model: network to optimize; it is switched to train mode.
        dataloader: yields ``(inputs, targets)`` batches.
        loss_fn: callable ``loss_fn(predictions, targets)`` returning a scalar
            tensor.
        optimizer: optimizer that owns ``model``'s parameters.
        device: device the batches are moved to. When omitted, the device of
            the model's first parameter is used (CPU if the model has none),
            so the function no longer relies on a notebook-level global.

    Returns:
        ``(train_loss, train_acc)``: the loss and the accuracy averaged over
        all samples seen. Batch losses are weighted by batch size, which gives
        the exact per-sample mean when ``loss_fn`` uses mean reduction (the
        ``nn.CrossEntropyLoss`` default); a short final batch therefore no
        longer counts as much as a full one.

    Raises:
        ValueError: if ``dataloader`` yields no samples.
    """
    if device is None:
        first_param = next(model.parameters(), None)
        device = first_param.device if first_param is not None else torch.device("cpu")

    # Put the model in train mode
    model.train()

    loss_sum, correct, seen = 0.0, 0, 0

    for X, y in dataloader:
        # Move data to device
        X, y = X.to(device), y.to(device)

        # 1. Forward pass
        y_pred = model(X)

        # 2. Calculate the loss
        loss = loss_fn(y_pred, y)

        # 3. Reset gradients, 4. backpropagate, 5. update the parameters
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # Accumulate per-sample totals. argmax over the raw scores picks the
        # same class as argmax over their softmax, so softmax is skipped.
        batch_size = len(y)
        loss_sum += loss.item() * batch_size
        correct += (y_pred.argmax(dim=1) == y).sum().item()
        seen += batch_size

    if seen == 0:
        raise ValueError("train_step received a dataloader that yields no samples")

    return loss_sum / seen, correct / seen

In [48]:
# Create test_step()

def test_step(model: torch.nn.Module,
              dataloader: torch.utils.data.DataLoader,
              loss_fn: torch.nn.Module
              ):
    
    # Put the model in eval mode
    model.eval()

    with torch.inference_mode():
        # Setup test loss and test accuracy value
        test_loss, test_acc = 0, 0

        # Loop through data loader data batches
        for batch, (X, y) in enumerate(dataloader):
            # Move data to device
            X, y = X.to(device), y.to(device)

            # 1. Forward Pass
            y_pred = model(X)

            # 2. Calculate the loss
            loss = loss_fn(y_pred, y)
            test_loss += loss.item()

            # Calculate accuracy metric
            y_pred_class = torch.argmax(torch.softmax(y_pred, dim=1), dim=1)
            test_acc += (y_pred_class==y).sum().item()/len(y_pred)

        # Adjust metrics to get average loss and accuracy per batch
        test_loss /= len(dataloader)
        test_acc /= len(dataloader)
    return test_loss, test_acc

In [49]:
# Cross-entropy objective, optimized with Adam over all model parameters
loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

In [None]:
EPOCHS = 10

# Each epoch: one optimization pass over the training set, then one
# evaluation pass over the test set, then a one-line summary
for epoch in range(EPOCHS):
    train_loss, train_acc = train_step(model, train_dataloader, loss_fn, optimizer)
    test_loss, test_acc = test_step(model, test_dataloader, loss_fn)

    print(f"Epoch: {epoch} | Train Loss: {train_loss:.2f} | Train Acc: {train_acc:.2f} | Test Loss: {test_loss:.2f} | Test Acc: {test_acc:.2f}")

Epoch: 0 | Train Loss: 0.5707263377416565 | Train Acc: 0.7435366465863453 | Test Loss: 0.6399790181054009 | Test Acc: 0.6822916666666666
Epoch: 1 | Train Loss: 0.5680739904742643 | Train Acc: 0.7465486947791165 | Test Loss: 0.6296020613776313 | Test Acc: 0.6822916666666666
Epoch: 2 | Train Loss: 0.5666707755930452 | Train Acc: 0.7473644578313253 | Test Loss: 0.629071788655387 | Test Acc: 0.6822916666666666
Epoch: 3 | Train Loss: 0.5675142143505165 | Train Acc: 0.7465486947791165 | Test Loss: 0.6594715093572935 | Test Acc: 0.6822916666666666
Epoch: 4 | Train Loss: 0.5681091907871775 | Train Acc: 0.7465486947791165 | Test Loss: 0.6320347471369637 | Test Acc: 0.6822916666666666
Epoch: 5 | Train Loss: 0.5634685397507196 | Train Acc: 0.7465486947791165 | Test Loss: 0.8868400640785694 | Test Acc: 0.6822916666666666
Epoch: 6 | Train Loss: 0.24273062284183072 | Train Acc: 0.9056852409638554 | Test Loss: 0.41233713469571537 | Test Acc: 0.819264846743295
Epoch: 7 | Train Loss: 0.1318722134680453

### Extract Features and Use SVM Classifier

In [51]:
def extract_features(model, dataloader, device=None):
    """Run ``model`` over ``dataloader`` and collect its outputs with the labels.

    NOTE(review): the "features" returned here are whatever ``model(X)``
    returns, i.e. the model's final outputs rather than an intermediate
    activation. Pass a truncated model if earlier-layer features are wanted.

    Args:
        model: network to run; it is switched to eval mode.
        dataloader: yields ``(inputs, labels)`` batches.
        device: device the inputs are moved to. When omitted, the device of
            the model's first parameter is used (CPU if the model has none),
            so the function no longer relies on a notebook-level global.

    Returns:
        ``(features, labels)``: two CPU tensors, the per-batch outputs and
        labels concatenated in the order the dataloader produced them (rows
        stay aligned even when the dataloader shuffles).
    """
    if device is None:
        first_param = next(model.parameters(), None)
        device = first_param.device if first_param is not None else torch.device("cpu")

    model.eval()
    features, labels = [], []
    with torch.inference_mode():
        for X, y in dataloader:
            # Only the inputs need to be on the model's device; the labels are
            # just collected, so they go straight to the CPU.
            outputs = model(X.to(device))
            features.append(outputs.cpu())
            labels.append(y.cpu())
    return torch.cat(features), torch.cat(labels)

In [52]:
# Collect the trained model's outputs and the matching labels for both splits
train_features, train_labels = extract_features(model=model,
                                                dataloader=train_dataloader)
test_features, test_labels = extract_features(model=model,
                                              dataloader=test_dataloader)

In [53]:
from sklearn.svm import SVC
import numpy as np

# Train SVM
def train_svm(features, labels):
    """Fit a support vector classifier with default settings.

    Args:
        features: training inputs handed to ``SVC.fit``.
        labels: training targets handed to ``SVC.fit``.

    Returns:
        The fitted ``SVC`` instance.
    """
    # fit() returns the estimator itself, so build and train in one expression
    return SVC().fit(features, labels)

In [None]:
# Fit the SVM on the training features, then predict the test set
clf = train_svm(train_features, train_labels)
y_pred = clf.predict(test_features)

# Error rate: share of test samples whose prediction differs from the label
mismatches = y_pred != test_labels.numpy()
error = mismatches.mean()
print(f"Error: {error:.2f}")

Error: 0.21
