### *Chest X-Ray Images (Pneumonia) Detector Using CNN and PyTorch*

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torchvision import datasets, transforms
from torchvision.models import resnet34
from torch.utils.data import random_split
from torch.utils.data import WeightedRandomSampler
from sklearn.metrics import precision_score, recall_score, f1_score
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
from tqdm import tqdm
import time
import random
import os
import matplotlib.pyplot as plt
import numpy as np
import pprint
import timm
import albumentations as A
from albumentations.pytorch import ToTensorV2

# Hyperparameters shared by every stage of the pipeline (data, model, training).
# The dict is passed to each helper as its first argument and printed for the record.
h = {
    "num_epochs": 10,  # number of training epochs; also T_max of the cosine scheduler
    "batch_size": 256,  # mini-batch size used by the train, validation and test loaders
    "image_size": 224,  # images are resized to image_size x image_size
    "fc1_size": 512,  # hidden layer width of the "fc" model
    "lr": 0.001,  # initial learning rate given to Adam
    "model": "efficientnetv2",  # handled by create_model: "efficientnetv2", "fc", "cnn", "resnet34"
    "scheduler": "CosineAnnealingLR10",  # handled by create_scheduler: "", "CosineAnnealingLR10", "ReduceLROnPlateau5"
    "balance": True,  # when true the training loader draws samples through a class-weighted sampler
    "early_stopping_patience": float("inf")  # epochs without validation improvement before stopping; inf never stops early
}

def extract_patient_ids(filename):
    """Return the patient id encoded at the start of a file name.

    The id is the text before the first underscore with every occurrence of
    the substring "person" removed, e.g. "person12_bacteria_3.jpeg" -> "12".

    Args:
        filename: Bare file name (no directory part).

    Returns:
        The patient id as a string.
    """
    prefix, _, _ = filename.partition('_')
    return prefix.replace("person", "")

def split_file_names(input_folder, val_split_perc):
    """Split the image files under *input_folder* into train and validation sets.

    Pneumonia file names carry a patient id, so those files are split by
    patient: every image of a given patient lands on the same side, which
    avoids leaking a patient from training into validation. Normal file names
    carry no patient information and are split file by file.

    Candidates are sorted before sampling so that the split depends only on
    the state of the `random` module, not on directory listing order or on
    set iteration order.

    Args:
        input_folder: Directory containing the 'PNEUMONIA' and 'NORMAL'
            sub-directories.
        val_split_perc: Fraction (0..1) of pneumonia patients and of normal
            files to put into the validation set.

    Returns:
        A tuple ``(train_filenames, val_filenames)`` of lists of file paths,
        each path built as ``os.path.join(input_folder, <class>, <file name>)``.
    """
    pneumonia_dir = os.path.join(input_folder, 'PNEUMONIA')
    normal_dir = os.path.join(input_folder, 'NORMAL')

    # List the directory once and reuse the result for both passes.
    pneumonia_files = sorted(os.listdir(pneumonia_dir))
    patient_ids = sorted({extract_patient_ids(fn) for fn in pneumonia_files})
    # A set gives O(1) membership tests in the loop below.
    val_patient_ids = set(
        random.sample(patient_ids, int(val_split_perc * len(patient_ids)))
    )

    pneumonia_val_filenames = []
    pneumonia_train_filenames = []
    for filename in pneumonia_files:
        path = os.path.join(pneumonia_dir, filename)
        if extract_patient_ids(filename) in val_patient_ids:
            pneumonia_val_filenames.append(path)
        else:
            pneumonia_train_filenames.append(path)

    # Normal (by file, no patient information in file names)
    normal_filenames = [
        os.path.join(normal_dir, fn) for fn in sorted(os.listdir(normal_dir))
    ]
    normal_val_filenames = random.sample(
        normal_filenames, int(val_split_perc * len(normal_filenames))
    )
    held_out = set(normal_val_filenames)
    normal_train_filenames = [p for p in normal_filenames if p not in held_out]

    train_filenames = pneumonia_train_filenames + normal_train_filenames
    val_filenames = pneumonia_val_filenames + normal_val_filenames

    return train_filenames, val_filenames

def create_weighted_sampler(h, dataset):
    """Build a sampler that draws every class with equal total probability.

    Each sample is weighted by the inverse of its class frequency, and the
    sampler draws as many samples per epoch as the dataset contains.

    Args:
        h: Hyperparameter dict (not read by this function).
        dataset: Object exposing ``targets``, a sequence of integer class
            labels, one per sample.

    Returns:
        A ``WeightedRandomSampler`` over the dataset's samples.
    """
    targets = dataset.targets
    inverse_frequency = 1.0 / np.bincount(targets)
    # Fancy-index the per-class weights by label to get one weight per sample.
    label_index = np.asarray(targets, dtype=np.intp)
    sample_weights = inverse_frequency[label_index].tolist()
    return WeightedRandomSampler(sample_weights, len(sample_weights))

class CustomImageFolder(torch.utils.data.Dataset):
    """ImageFolder wrapper whose transform is called Albumentations-style.

    The wrapped ``datasets.ImageFolder`` is created without a transform; this
    class converts each loaded image to a NumPy array, calls
    ``transform(image=array)`` and returns the ``"image"`` entry of the result.
    """

    def __init__(self, root, transform=None, is_valid_file=None):
        """Wrap the image folder at *root*.

        Args:
            root: Directory passed to ``datasets.ImageFolder``.
            transform: Optional callable invoked as ``transform(image=array)``
                and returning a mapping with an ``"image"`` key.
            is_valid_file: Optional predicate forwarded to ``ImageFolder`` to
                select which files belong to the dataset.
        """
        self.transform = transform
        self.dataset = datasets.ImageFolder(root, is_valid_file=is_valid_file)
        # Re-exported so callers can read the labels without loading images.
        self.targets = self.dataset.targets

    def __len__(self):
        """Return the number of samples in the wrapped dataset."""
        return len(self.dataset)

    def __getitem__(self, index):
        """Return ``(image, label)`` for *index*, transformed when a transform is set."""
        image, label = self.dataset[index]
        if not self.transform:
            return image, label
        transformed = self.transform(image=np.array(image))
        return transformed["image"], label

def prepare_data(h):
    """Build the train, validation and test data loaders.

    The training folder is split into train and validation file lists by
    ``split_file_names`` (20 % validation). Training images are augmented;
    validation and test images are only resized and normalised, in the same
    resize-then-normalise order as the training pipeline.

    Args:
        h: Hyperparameter dict. Reads ``"image_size"``, ``"batch_size"`` and
            ``"balance"``. Optionally reads ``"train_dir"``, ``"test_dir"``
            (dataset locations; default to the original hard-coded paths) and
            ``"num_workers"`` (DataLoader worker processes; defaults to 4).

    Returns:
        A tuple ``(train_loader, val_loader, test_loader)``.
    """
    image_size = h["image_size"]
    train_dir = h.get(
        "train_dir",
        "C:\\Users\\Омар\\PycharmProjects\\FinalMLProject\\chest_xray\\chest_xray\\train\\",
    )
    test_dir = h.get(
        "test_dir",
        "C:\\Users\\Омар\\PycharmProjects\\FinalMLProject\\chest_xray\\chest_xray\\test\\",
    )
    num_workers = h.get("num_workers", 4)

    channel_mean = [0.485, 0.456, 0.406]
    channel_std = [0.229, 0.224, 0.225]

    data_transforms_train_alb = A.Compose([
        A.Rotate(
            limit=20
        ),
        A.HorizontalFlip(p=0.5),
        A.ColorJitter(
            brightness=0.1,
            contrast=0.1,
            saturation=0.1,
            hue=0.1,
            p=1
        ),
        A.ShiftScaleRotate(
            shift_limit=0.1,
            scale_limit=0,
            rotate_limit=0,
            p=0.5
        ),
        A.Perspective(
            scale=(0.05, 0.15),
            keep_size=True,
            p=0.5
        ),
        A.Resize(
            height=image_size,
            width=image_size
        ),
        A.Normalize(
            mean=channel_mean,
            std=channel_std
        ),
        ToTensorV2()
    ])

    # Resize first, then normalise, so evaluation images go through the same
    # final two steps as training images.
    data_transforms_val_alb = A.Compose([
        A.Resize(
            height=image_size,
            width=image_size
        ),
        A.Normalize(
            mean=channel_mean,
            std=channel_std
        ),
        ToTensorV2(),
    ])

    # Define the validation split percentage
    val_split = 0.2
    train_filenames, val_filenames = split_file_names(train_dir, val_split)

    # ImageFolder calls is_valid_file once per file, so use sets for O(1)
    # lookups; normalising both sides keeps the match independent of
    # separator details in how each path was joined.
    train_paths = {os.path.normpath(p) for p in train_filenames}
    val_paths = {os.path.normpath(p) for p in val_filenames}

    # Load the datasets
    train_dataset = CustomImageFolder(
        train_dir,
        transform=data_transforms_train_alb,
        is_valid_file=lambda path: os.path.normpath(path) in train_paths
    )
    val_dataset = CustomImageFolder(
        train_dir,
        transform=data_transforms_val_alb,
        is_valid_file=lambda path: os.path.normpath(path) in val_paths
    )
    test_dataset = CustomImageFolder(
        test_dir,
        transform=data_transforms_val_alb
    )

    # Create the data loaders for train, validation, and test sets
    if h["balance"]:
        # A sampler and shuffle=True are mutually exclusive; the weighted
        # sampler already randomises the order.
        train_loader = torch.utils.data.DataLoader(
            train_dataset,
            batch_size=h["batch_size"],
            sampler=create_weighted_sampler(h, train_dataset),
            num_workers=num_workers
        )
    else:
        train_loader = torch.utils.data.DataLoader(
            train_dataset,
            batch_size=h["batch_size"],
            shuffle=True,
            num_workers=num_workers
        )

    # Evaluation metrics do not depend on sample order, so no shuffling.
    val_loader = torch.utils.data.DataLoader(
        val_dataset,
        batch_size=h["batch_size"],
        shuffle=False,
        num_workers=num_workers
    )
    test_loader = torch.utils.data.DataLoader(
        test_dataset,
        batch_size=h["batch_size"],
        shuffle=False,
        num_workers=num_workers
    )

    return train_loader, val_loader, test_loader

def create_model(h, device):
    """Create the two-class classifier selected by ``h["model"]``.

    Supported names:
        "efficientnetv2": timm ``tf_efficientnetv2_b0`` with pretrained weights.
        "fc": flatten -> linear(``h["fc1_size"]``) -> ReLU -> linear(2).
        "cnn": three conv/ReLU/max-pool stages followed by a two-layer head.
        "resnet34": torchvision ResNet-34 with pretrained weights and a new
            two-output final layer.

    Args:
        h: Hyperparameter dict. Reads ``"model"``; ``"image_size"`` for the
            "fc" and "cnn" models; ``"fc1_size"`` for the "fc" model.
        device: Device the model is moved to.

    Returns:
        The model, already moved to *device*.

    Raises:
        ValueError: If ``h["model"]`` is not one of the supported names.
    """
    name = h["model"]

    if name == "efficientnetv2":
        model = timm.create_model(
            "tf_efficientnetv2_b0",
            pretrained=True,
            num_classes=2
        )
    elif name == "fc":
        model = nn.Sequential(
            nn.Flatten(),
            nn.Linear(3 * h["image_size"] * h["image_size"], h["fc1_size"]),
            nn.ReLU(),
            nn.Linear(h["fc1_size"], 2)
        )
    elif name == "cnn":
        # Three 2x2 max-pools shrink each spatial side by a factor of 8.
        pooled_side = h["image_size"] // 8
        model = nn.Sequential(
            nn.Conv2d(3, 16, 3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2, 2),
            nn.Conv2d(16, 32, 3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2, 2),
            nn.Conv2d(32, 64, 3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2, 2),
            nn.Flatten(),
            nn.Dropout(0.25),
            nn.Linear(64 * pooled_side * pooled_side, 512),
            nn.ReLU(),
            nn.Dropout(0.25),
            nn.Linear(512, 2)
        )
    elif name == "resnet34":
        # NOTE(review): newer torchvision releases prefer the `weights=`
        # argument over `pretrained=True`; kept as-is for compatibility with
        # the installed version - confirm before changing.
        model = resnet34(pretrained=True)
        model.fc = nn.Linear(model.fc.in_features, 2)
    else:
        # Fail here with a clear message instead of returning None and
        # crashing later when the caller asks for model.parameters().
        raise ValueError(f"Unknown model name '{name}'")

    return model.to(device)

def train_model(h, model, train_loader, val_loader, optimizer, criterion, scheduler, device):
    """Train *model* and track the per-epoch train and validation losses.

    After every epoch the model is evaluated on *val_loader*. Whenever the
    validation loss improves, the weights are saved to
    'best_model_weights.pth'. Training stops early once the validation loss
    has failed to improve for ``h["early_stopping_patience"]`` consecutive
    epochs. The saved best weights are loaded back into the model only when
    early stopping is enabled (patience is not infinite); otherwise the model
    keeps the weights of the last epoch.

    Args:
        h: Hyperparameter dict. Reads ``"num_epochs"`` and
            ``"early_stopping_patience"``.
        model: Network to train (already on *device*).
        train_loader: Iterable of ``(inputs, labels)`` training batches.
        val_loader: Iterable of ``(inputs, labels)`` validation batches.
        optimizer: Optimizer updating the model's parameters.
        criterion: Loss function called as ``criterion(outputs, labels)``.
        scheduler: Learning-rate scheduler, or None for a constant rate.
        device: Device the batches are moved to.

    Returns:
        A tuple ``(train_loss_history, val_loss_history)`` with one entry per
        completed epoch.
    """
    train_loss_history = []
    val_loss_history = []

    best_val_loss = float('inf')
    patience_counter = 0
    patience = h["early_stopping_patience"]  # The patience threshold for early stopping

    start_time = time.time()
    num_epochs = h["num_epochs"]

    for epoch in range(num_epochs):
        model.train()
        running_loss = 0.0

        progress_bar = tqdm(
            train_loader,
            desc=f"Training epoch {epoch + 1}/{num_epochs}",
            leave=False,
            unit="mini-batch"
        )
        for inputs, labels in progress_bar:
            inputs, labels = inputs.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            # Read the scalar once; each .item() call copies from the device.
            batch_loss = loss.item()
            running_loss += batch_loss
            progress_bar.set_postfix(loss=batch_loss)

        val_loss, _, _, _ = evaluate_model(h, model, val_loader, criterion, device)

        # Only ReduceLROnPlateau takes the monitored metric. For the other
        # schedulers a positional argument to step() is an epoch index, so
        # passing the loss there would produce a wrong learning rate.
        if scheduler is not None:
            if isinstance(scheduler, torch.optim.lr_scheduler.ReduceLROnPlateau):
                scheduler.step(val_loss)
            else:
                scheduler.step()

        # Store the loss history
        train_loss = running_loss / len(train_loader)
        train_loss_history.append(train_loss)
        val_loss_history.append(val_loss)

        # Early stop check
        if val_loss < best_val_loss:
            best_val_loss = val_loss
            torch.save(model.state_dict(), 'best_model_weights.pth')
            patience_counter = 0
        else:
            patience_counter += 1

        if patience_counter >= patience:
            print(f"Early stopping at epoch {epoch + 1}.")
            break

        # Estimate the remaining time from the average epoch duration so far
        elapsed_time = time.time() - start_time
        avg_time_per_epoch = elapsed_time / (epoch + 1)
        remaining_epochs = num_epochs - (epoch + 1)
        remaining_time = avg_time_per_epoch * remaining_epochs

        # Convert remaining time to minutes and seconds
        remaining_time_min, remaining_time_sec = divmod(remaining_time, 60)

        print(f"Epoch [{epoch + 1}/{num_epochs}]: Train Loss: {train_loss:.4f}, Val Loss: {val_loss:.4f}, Remaining Time: {remaining_time_min:.0f}m {remaining_time_sec:.0f}s")

    # Restore the best checkpoint only when early stopping is in use
    if patience != float("inf"):
        model.load_state_dict(torch.load('best_model_weights.pth'))

    return train_loss_history, val_loss_history

def evaluate_model(h, model, data_loader, criterion, device):
    """Evaluate *model* on every batch of *data_loader* without gradients.

    Args:
        h: Hyperparameter dict (not read by this function).
        model: Network to evaluate; it is switched to eval mode.
        data_loader: Sized iterable of ``(inputs, labels)`` batches.
        criterion: Loss function called as ``criterion(outputs, labels)``.
        device: Device the batches are moved to.

    Returns:
        A tuple ``(loss, accuracy, true_labels, predicted_labels)`` where
        ``loss`` is the mean of the per-batch losses, ``accuracy`` is the
        fraction of correctly classified samples, and the two lists hold one
        label per sample in iteration order.
    """
    model.eval()

    loss_sum = 0.0
    num_correct = 0
    num_seen = 0
    true_labels, predicted_labels = [], []

    with torch.no_grad():
        for batch_inputs, batch_labels in data_loader:
            batch_inputs = batch_inputs.to(device)
            batch_labels = batch_labels.to(device)

            logits = model(batch_inputs)
            loss_sum += criterion(logits, batch_labels).item()

            # Index of the largest logit along the class dimension.
            predicted = torch.max(logits.data, 1)[1]
            num_correct += (predicted == batch_labels).sum().item()
            num_seen += batch_labels.size(0)

            true_labels.extend(batch_labels.cpu().numpy())
            predicted_labels.extend(predicted.cpu().numpy())

    mean_loss = loss_sum / len(data_loader)
    accuracy = num_correct / num_seen

    return mean_loss, accuracy, true_labels, predicted_labels
def plot_metrics(h, train_loss_history, val_loss_history, test_loss, test_accuracy, true_labels, predicted_labels):
    """Print the test metrics and plot the confusion matrix and loss curves.

    Args:
        h: Hyperparameter dict (not read by this function).
        train_loss_history: Per-epoch training losses.
        val_loss_history: Per-epoch validation losses.
        test_loss: Loss on the test set (accepted but not reported).
        test_accuracy: Accuracy on the test set, as a fraction.
        true_labels: Ground-truth class labels of the test samples.
        predicted_labels: Predicted class labels of the test samples.
    """
    print(f"Accuracy on the test set: {test_accuracy:.2%}")

    # Calculate precision, recall, and F1 score using the accumulated true labels and predictions
    precision = precision_score(true_labels, predicted_labels)
    recall = recall_score(true_labels, predicted_labels)
    f1 = f1_score(true_labels, predicted_labels)
    print(f"Precision: {precision:.2f}, Recall: {recall:.2f}, F1 score: {f1:.2f}")

    # Pin the label set so the matrix is always 2x2 and matches the two
    # display labels even if one class is absent from both inputs.
    # NOTE(review): assumes class index 0 is NORMAL and 1 is PNEUMONIA - confirm
    # against the dataset's class ordering.
    cm = confusion_matrix(true_labels, predicted_labels, labels=[0, 1])

    # Visualize the confusion matrix. Pass an explicit axes:
    # ConfusionMatrixDisplay.plot() opens its own figure when none is given,
    # so a bare plt.figure() before it would be left empty.
    _, cm_ax = plt.subplots()
    disp = ConfusionMatrixDisplay(
        confusion_matrix=cm,
        display_labels=["Normal", "Pneumonia"]
    )
    disp.plot(ax=cm_ax)

    # Plot the learning curves
    plt.figure()
    plt.plot(train_loss_history, label='Train Loss')
    plt.plot(val_loss_history, label='Validation Loss')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.title('Loss history')
    plt.legend()
    plt.show()

def create_scheduler(h, optimizer, lr):
    """Create the learning-rate scheduler selected by ``h["scheduler"]``.

    Supported names:
        "": no scheduler.
        "CosineAnnealingLR10": cosine annealing over ``h["num_epochs"]``
            epochs down to one tenth of *lr*.
        "ReduceLROnPlateau5": multiply the rate by 0.1 after 5 epochs without
            improvement of the monitored (minimised) metric.

    Args:
        h: Hyperparameter dict. Reads ``"scheduler"`` and, for cosine
            annealing, ``"num_epochs"``.
        optimizer: Optimizer whose learning rate is scheduled.
        lr: Initial learning rate, used to derive the cosine floor.

    Returns:
        The scheduler, or None when no scheduler is requested or the name is
        unknown (an error message is printed in the latter case).
    """
    scheduler_name = h["scheduler"]
    if scheduler_name == "":
        return None
    if scheduler_name == "CosineAnnealingLR10":
        return torch.optim.lr_scheduler.CosineAnnealingLR(
            optimizer,
            T_max=h["num_epochs"],
            eta_min=lr * 0.1
        )
    if scheduler_name == "ReduceLROnPlateau5":
        return torch.optim.lr_scheduler.ReduceLROnPlateau(
            optimizer,
            mode='min',
            factor=0.1,
            patience=5,
            verbose=True
        )
    # f-string so the offending name is actually interpolated into the message.
    print(f"Error. Unknown scheduler name '{scheduler_name}'")
    return None

def check_solution(h, device, verbose):
    """Run one full experiment: build data and model, train, then test.

    Args:
        h: Hyperparameter dict forwarded to every helper; ``"lr"`` is read
            here for the Adam optimizer and the scheduler.
        device: Device used for the model and the batches.
        verbose: When true, print and plot the test metrics.

    Returns:
        A tuple ``(f1, test_accuracy)`` measured on the test set.
    """
    train_loader, val_loader, test_loader = prepare_data(h)

    net = create_model(h, device)
    adam = torch.optim.Adam(net.parameters(), lr=h["lr"])
    lr_schedule = create_scheduler(h, adam, h["lr"])
    loss_fn = nn.CrossEntropyLoss()

    train_curve, val_curve = train_model(
        h, net, train_loader, val_loader, adam, loss_fn, lr_schedule, device
    )
    test_loss, test_accuracy, y_true, y_pred = evaluate_model(
        h, net, test_loader, loss_fn, device
    )

    if verbose:
        plot_metrics(h, train_curve, val_curve, test_loss, test_accuracy, y_true, y_pred)

    return f1_score(y_true, y_pred), test_accuracy

# Entry point: repeat the whole experiment several times and report the mean
# and spread of F1 and accuracy. Guarded so that a process which merely
# imports this module (as DataLoader worker processes do when they are
# started with the spawn method) does not re-run the experiment.
if __name__ == "__main__":
    # Check if GPU is available and set the device accordingly
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print(f"Device: {device}")

    # Get number of CPU cores
    num_cpu_cores = os.cpu_count()
    print(f"Number of CPU cores: {num_cpu_cores}")

    # Get GPU name
    if torch.cuda.is_available():
        gpu_name = torch.cuda.get_device_name(0)
        print(f"GPU name: {gpu_name}")
    else:
        print("No GPU available")

    # Print hyperparameters for records
    print("Hyperparameters:")
    pprint.pprint(h)

    # Collect in plain lists; np.append would copy the whole array each time.
    f1_scores = []
    accuracies = []
    start_time = time.time()

    repeats = 10

    for i in range(repeats):
        print(f"Running solution {i+1}/{repeats}")
        # Only the first run prints and plots its detailed metrics.
        f1, accuracy = check_solution(h, device, verbose=(i==0))
        print(f"F1 = {f1:.2f}, accuracy = {accuracy:.2f} ")
        f1_scores.append(f1)
        accuracies.append(accuracy)

    f1_array = np.array(f1_scores)
    accuracy_array = np.array(accuracies)

    # Calculate the average wall-clock time of one repeat
    repeat_time = (time.time() - start_time) / repeats
    repeat_time_min, repeat_time_sec = divmod(repeat_time, 60)

    # Printing final results
    print("Results")
    print(f"F1 List: {f1_array}")
    print(f"Accuracy List: {accuracy_array}")
    print(f"F1: {np.mean(f1_array):.1%} (+-{np.std(f1_array):.1%})")
    print(f"Accuracy: {np.mean(accuracy_array):.1%} (+-{np.std(accuracy_array):.1%})")
    print(f"Time of one solution: {repeat_time_min:.0f}m {repeat_time_sec:.0f}s")

    # Same summary as a single table row
    print(f" | {np.mean(f1_array):.1%} (+-{np.std(f1_array):.1%}) | {np.mean(accuracy_array):.1%} (+-{np.std(accuracy_array):.1%}) | {repeat_time_min:.0f}m {repeat_time_sec:.0f}s")

    # Print hyperparameters again as a reminder of what the final results are for
    print("Hyperparameters:")
    pprint.pprint(h)

  from .autonotebook import tqdm as notebook_tqdm


Device: cpu
Number of CPU cores: 8
No GPU available
Hyperparameters:
{'balance': True,
 'batch_size': 256,
 'early_stopping_patience': inf,
 'fc1_size': 512,
 'image_size': 224,
 'lr': 0.001,
 'model': 'efficientnetv2',
 'num_epochs': 10,
 'scheduler': 'CosineAnnealingLR10'}
Running solution 1/10


Training epoch 1/10:   0%|          | 0/17 [00:00<?, ?mini-batch/s]