In [22]:
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import datasets, models, transforms
from torchvision.transforms import v2
from sklearn.model_selection import train_test_split
from torch.utils.data import DataLoader, Dataset
import os 
import pandas as pd 
import numpy as np
from PIL import Image
# from transformers import ViTFeatureExtractor, ViTForImageClassification
# import requests



Data Loading

In [23]:
TRAIN_DIR = './data/Covid19-dataset/train'
TEST_DIR = './data/Covid19-dataset/test'

def load_data(data_path):
    filepaths = []
    labels = []

    folds = os.listdir(data_path)
    for fold in folds:
        foldpath = os.path.join(data_path, fold)
        filelist = os.listdir(foldpath)
        for file in filelist:
            fpath = os.path.join(foldpath, file)
            filepaths.append(fpath)
            labels.append(fold)

    # Concatenate data paths with labels into one dataframe
    Fseries = pd.Series(filepaths, name= 'filepaths')
    Lseries = pd.Series(labels, name='labels')
    df = pd.concat([Fseries, Lseries], axis= 1)
    return df

MAPPING = {'Normal':0,
           'Viral Pneumonia': 1,
           'Covid':2}

train_data = load_data(TRAIN_DIR)
# val_data = train_data.iloc[-int(len(train_data) * 0.3):-1, :]
test_data = load_data(TEST_DIR)
train_data.iloc[:,1] = train_data.iloc[:,1].apply(lambda x: MAPPING[x])
test_data.iloc[:,1] = test_data.iloc[:,1].apply(lambda x: MAPPING[x])
# data = pd.concat([train_data, test_data])

# data['true_label'] = data['labels'].apply(lambda x: MAPPING[x])
# train_data = train_data[train_data.labels != 'Viral Pneumonia']


Data Preprocessing

In [32]:
class CustomImageDataset(Dataset):
    def __init__(self, dataframe, transform=None):
        self.data_frame = dataframe
        self.transform = transform

    def __len__(self):
        return len(self.data_frame)

    def __getitem__(self, idx):
        img_name = self.data_frame.iloc[idx, 0]
        image = Image.open(img_name).convert('RGB')
        label = self.data_frame.iloc[idx, 1]
        label = torch.tensor(label, dtype=torch.float32)
        
        if self.transform:
            image = self.transform(image)
        
        return image, label
    

# Define the transformations
data_transforms = {
    'train': v2.Compose([
        v2.Resize((224, 224)),
        v2.ToTensor(),
        # v2.RandomHorizontalFlip(p=0.5),
        # transforms.Normalize(mean=0, std=1)
        # transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ]),
    'val': v2.Compose([
        v2.Resize((224, 224)),
        v2.ToTensor(),
        # transforms.Normalize(mean=0, std=1)
        # transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ])
}

train_dataset = CustomImageDataset(train_data, transform=data_transforms['train'])
val_dataset = CustomImageDataset(test_data, transform=data_transforms['val'])

train_loader = DataLoader(train_dataset, batch_size=5, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=5, shuffle=True)

dataloaders = {'train': train_loader, 'val': val_loader}
dataset_sizes = {'train': len(train_dataset), 'val': len(val_dataset)}

Model Definition

In [33]:
model = models.densenet121(pretrained=True)

device = torch.device('cuda' if torch.cuda.is_available() else "cpu")
# Modify the model for 3 classes
num_ftrs = model.classifier.in_features
model.classifier = nn.Linear(num_ftrs, 3)

criterion = nn.CrossEntropyLoss()
lr = 0.001
optimizer = torch.optim.Adam(model.parameters(), lr = lr)
num_epochs = 5


Training 

In [34]:
# Training loop
for epoch in range(num_epochs):
    for phase in ['train', 'val']:
        if phase == 'train':
            model.train()
        else:
            model.eval()
        
        running_loss = 0.0
        running_corrects = 0

        for inputs, labels in dataloaders[phase]:
            inputs = inputs.to(device)
            # labels = labels.to(device).float().unsqueeze(1)
            labels = labels.to(device).long()
    
            optimizer.zero_grad()
            
            with torch.set_grad_enabled(phase == 'train'):
                outputs = model(inputs)
                _, preds = torch.max(outputs, 1)
                loss = criterion(outputs, labels)
                
                if phase == 'train':
                    loss.backward()
                    optimizer.step()
            
            running_loss += loss.item() * inputs.size(0)
            running_corrects += torch.sum(preds == labels.data)
        
        epoch_loss = running_loss / dataset_sizes[phase]
        epoch_acc = running_corrects.double() / dataset_sizes[phase]
        
        print(f'{phase} Loss: {epoch_loss:.4f} Acc: {epoch_acc:.4f}')
    
    print()

# # Save the model
torch.save(model.state_dict(), 'covid_classifier.pth')


train Loss: 0.6254 Acc: 0.7610
val Loss: 0.6815 Acc: 0.6667

train Loss: 0.5519 Acc: 0.8048
val Loss: 2.1882 Acc: 0.4242

train Loss: 0.3731 Acc: 0.8685
val Loss: 0.4766 Acc: 0.7727

train Loss: 0.4256 Acc: 0.8048
val Loss: 2.7340 Acc: 0.5758

train Loss: 0.2588 Acc: 0.9084
val Loss: 0.1648 Acc: 0.9545



Model loading and prediction


In [39]:
def load_model(model_path, device):
    model = models.densenet121(pretrained=True)
    num_ftrs = model.classifier.in_features
    model.classifier = nn.Linear(num_ftrs, 3)
    model.load_state_dict(torch.load(model_path, map_location=device))
    model = model.to(device)
    model.eval()
    return model

def predict(model, dataloader, device):
    model.eval()
    all_labels = []
    with torch.no_grad():
        for inputs, labels in dataloader:
            inputs = inputs.to(device)
            outputs = model(inputs)
            _, preds = torch.max(outputs, 1)
         
            all_labels.extend(preds.numpy())
    
    return all_labels

model_path = 'covid_classifier.pth'
model = load_model(model_path, device)


test_dataloader = DataLoader(val_dataset, batch_size=5, shuffle=False)

test_labels = predict(model, test_dataloader, device)



In [43]:
test_data['forecasted_label'] = test_labels

In [44]:
test_data[test_data.labels != test_data.forecasted_label]

Unnamed: 0,filepaths,labels,fc,forecasted_label
27,./data/Covid19-dataset/test/Normal/0116.jpeg,0,1,1
34,./data/Covid19-dataset/test/Normal/0118.jpeg,0,1,1
39,./data/Covid19-dataset/test/Normal/0112.jpeg,0,1,1


In [47]:
if not os.path.exists('output'):
    os.makedirs('output')


test_data.to_csv('output/predictions.csv')