
    # Pneumonia Detection Using Machine Learning and Deep Learning

    This notebook contains the complete pipeline for training a deep learning model to classify chest X-ray images (DICOM files, with a fallback to standard image formats) into 'NORMAL' and 'PNEUMONIA' categories.
    
    ## Steps:
    - Preprocessing DICOM images
    - Loading models (CNN, ViT)
    - Training and evaluating the models
    - Dataset Insights
    

In [None]:

    # Import necessary libraries
    import os
    import pydicom
    import torch
    from torch.utils.data import Dataset, DataLoader
    import cv2
    import torch.optim as optim
    from sklearn.metrics import classification_report
    import torch.nn as nn
    import torchvision.models as models
    from PIL import Image
    import numpy as np
    import matplotlib.pyplot as plt
    from transformers import ViTForImageClassification, ViTFeatureExtractor

    # Set device: use the first CUDA GPU when PyTorch can see one, otherwise the CPU.
    # `device` is read by the training cell further down.
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print(f"Using device: {device}")

    # Define hyperparameters (module-level globals read by later cells)
    batch_size = 32  # samples per mini-batch handed to get_dataloaders
    learning_rate = 0.001  # step size passed to the Adam optimizer
    num_epochs = 10  # number of full passes over the training loader
    # NOTE(review): presumably the number of output classes (NORMAL / PNEUMONIA);
    # it is not referenced by any other cell visible in this notebook - confirm.
    num_features = 2
    

In [None]:
class PneumoniaDataset(Dataset):
    """
    Map-style dataset of chest X-ray images stored in one sub-directory per class.

    Expected layout: ``data_dir/<label>/<image file>`` for every entry of ``labels``.
    Each item is a ``(3, img_size, img_size)`` float32 tensor with values in [0, 1]
    together with the integer class index (the position of the label in ``labels``).

    Parameters:
    - data_dir: str, directory that contains one sub-directory per label.
    - labels: sequence of str, class folder names; the position gives the class index.
    - transform: optional callable applied to the image tensor before it is returned.
    - img_size: int, side length (in pixels) of the square image that is returned.
    """

    def __init__(self, data_dir, labels, transform=None, img_size=128):
        self.data_dir = data_dir
        self.labels = labels
        self.transform = transform
        self.img_size = img_size
        self.data = []  # list of (image path, class index) pairs

        # Index the image paths of every class folder
        for class_idx, label in enumerate(labels):
            label_dir = os.path.join(data_dir, label)
            print(label_dir)

            # os.listdir returns entries in arbitrary order; sorting keeps the
            # sample order reproducible between runs and machines.
            for img_file in sorted(os.listdir(label_dir)):
                img_path = os.path.join(label_dir, img_file)
                # Skip hidden files (e.g. .DS_Store) and sub-directories: they are
                # not images and would only fail later, in the middle of an epoch.
                if img_file.startswith('.') or not os.path.isfile(img_path):
                    continue
                self.data.append((img_path, class_idx))

    def __len__(self):
        """Return the number of indexed images."""
        return len(self.data)

    @staticmethod
    def _scale_to_unit_range(pixels):
        """Scale a pixel array to floats in [0, 1].

        8-bit images are divided by 255. Any other dtype (DICOM pixel data is
        commonly stored with more than 8 bits) is min-max scaled per image,
        because dividing by 255 would leave values far above 1.
        """
        if pixels.dtype == np.uint8:
            return pixels / 255.0

        pixels = pixels.astype(np.float64)
        lowest, highest = pixels.min(), pixels.max()
        if highest <= lowest:
            # Constant image: avoid dividing by zero
            return np.zeros_like(pixels)
        return (pixels - lowest) / (highest - lowest)

    def __getitem__(self, idx):
        """
        Load, resize and normalize the image at position ``idx``.

        Returns:
        - (img_tensor, label): float32 tensor of shape (3, img_size, img_size) and the int class index.

        Raises:
        - ValueError: if the file can be read neither as DICOM nor as a standard
          image, or if the decoded pixel data is not a single-channel 2-D image.
        """
        img_path, label = self.data[idx]

        try:
            # Attempt to read as a DICOM file
            img = pydicom.dcmread(img_path).pixel_array
        except Exception as e:
            # Deliberately broad: any DICOM parsing/decoding failure falls back
            # to standard image reading.
            img = cv2.imread(img_path, cv2.IMREAD_GRAYSCALE)
            if img is None:
                raise ValueError(f"Failed to read image file {img_path}: {e}") from e

        if img.ndim != 2:
            # Colour or multi-frame pixel data would otherwise fail further down
            # with an obscure shape error.
            raise ValueError(
                f"Expected a single-channel 2-D image but got shape {img.shape} for {img_path}"
            )

        # Resize and normalize
        resized_image = cv2.resize(img, (self.img_size, self.img_size))
        img_normalized = self._scale_to_unit_range(resized_image)

        # Repeat the single grayscale channel three times, channels first: (3, H, W)
        img_rgb = np.stack([img_normalized] * 3, axis=0)

        # Convert to tensor
        img_tensor = torch.tensor(img_rgb).float()

        if self.transform:
            img_tensor = self.transform(img_tensor)

        return img_tensor, label


In [None]:
def get_dataloaders(data_dir, labels, batch_size):
    """
    Build one DataLoader for each of the 'train', 'test' and 'val' folders under data_dir.

    Parameters:
    - data_dir: str, directory containing the 'train', 'test' and 'val' sub-directories.
    - labels: list of class folder names, forwarded to PneumoniaDataset.
    - batch_size: int, batch size used by all three loaders.

    Returns:
    - (train_loader, test_loader, val_loader): note the order, test comes before val.
    """
    split_names = ('train', 'test', 'val')

    # Build every dataset first, in the order train -> test -> val
    split_datasets = [
        PneumoniaDataset(data_dir=os.path.join(data_dir, split), labels=labels)
        for split in split_names
    ]

    # Only the training split is shuffled; test and val keep their on-disk order
    loaders = [
        DataLoader(split_dataset, batch_size=batch_size, shuffle=(split == 'train'))
        for split, split_dataset in zip(split_names, split_datasets)
    ]

    return tuple(loaders)


In [None]:
def dataset_insights(data_dir):
    """
    Print the number of directory entries found for each class (NORMAL and PNEUMONIA)
    in the train, test and validation splits located under data_dir.

    Parameters:
    - data_dir: str, directory containing the 'train', 'test' and 'val' sub-directories.
    """
    class_names = ('NORMAL', 'PNEUMONIA')
    # (folder name on disk, wording used in the printed report)
    splits = (('train', 'train'), ('test', 'test'), ('val', 'validation'))

    for folder, shown_as in splits:
        for class_name in class_names:
            entries = os.listdir(os.path.join(data_dir, folder, class_name))
            print(f"Number of {class_name} images in {shown_as} set: {len(entries)}")


In [None]:

        # Get insights about the dataset: print the per-class entry counts of the
        # train / test / val folders found under the Kaggle input directory below.
        dataset_insights("/kaggle/input/chest-xray-pneumonia/chest_xray")
        

In [None]:
def initiate_model(model_type='resnet101', num_classes=2):
    """
    Initialize a torchvision ResNet with random weights and a fresh classification head.

    Parameters:
    - model_type: str, 'resnet101' or 'resnet50'. Any other value falls back to
      resnet18 (kept for backward compatibility, so a typo silently selects resnet18).
    - num_classes: int, number of outputs of the final fully connected layer
      (default 2: Pneumonia/Normal).

    Returns:
    - model: Initialized model.
    """
    builders = {
        'resnet101': models.resnet101,
        'resnet50': models.resnet50,
    }
    build = builders.get(model_type, models.resnet18)
    model = build(weights=None)  # No pre-trained weights

    # Replace the last fully connected layer so it outputs one logit per class
    model.fc = nn.Linear(model.fc.in_features, num_classes)

    return model


In [None]:
def train_model(model, train_loader, val_loader, criterion, optimizer, num_epochs, device):
    """
    Trains the model using the provided data and optimizes its parameters.

    After every epoch the training loss/accuracy are printed and validate_model
    is run on val_loader.

    Parameters:
    - model: The PyTorch model to train.
    - train_loader: DataLoader for training data.
    - val_loader: DataLoader for validation data.
    - criterion: Loss function.
    - optimizer: Optimizer for model parameters.
    - num_epochs: Number of epochs for training.
    - device: Device to run the model on ('cuda' or 'cpu').

    Returns:
    - history: list with one (train_loss, train_accuracy) tuple per epoch.
    """
    model.to(device)
    history = []

    for epoch in range(num_epochs):
        model.train()
        running_loss, running_corrects, samples_seen = 0.0, 0, 0

        for imgs, labels in train_loader:
            imgs, labels = imgs.to(device), labels.to(device)

            optimizer.zero_grad()
            outputs = model(imgs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            batch_len = imgs.size(0)
            _, preds = torch.max(outputs, 1)
            # criterion returns the batch mean, so weight it by the batch size
            running_loss += loss.item() * batch_len
            running_corrects += torch.sum(preds == labels).item()
            samples_seen += batch_len

        # Average over the samples actually processed: len(loader.dataset) is the
        # wrong denominator when the loader drops the last batch or uses a sampler.
        if samples_seen:
            epoch_loss = running_loss / samples_seen
            epoch_acc = running_corrects / samples_seen
        else:
            # Empty loader: report NaN instead of raising ZeroDivisionError
            epoch_loss = epoch_acc = float('nan')

        print(f'Epoch {epoch+1}/{num_epochs}, Loss: {epoch_loss:.4f}, Accuracy: {epoch_acc:.4f}')
        history.append((epoch_loss, epoch_acc))

        # Validation at the end of each epoch
        validate_model(model, val_loader, criterion, device)

    return history


In [None]:
def validate_model(model, val_loader, criterion, device):
    """
    Evaluates the model on the given loader without updating its parameters.

    Parameters:
    - model: PyTorch model.
    - val_loader: DataLoader for validation data.
    - criterion: Loss function.
    - device: Device to run the model on ('cuda' or 'cpu').

    Returns:
    - (loss, accuracy): averages over the samples that were processed
      (both NaN when the loader yields no samples).
    """
    # Make sure the model lives on the same device as the batches; the caller may
    # evaluate a model that was never passed through the training loop.
    model.to(device)
    model.eval()
    running_loss, running_corrects, samples_seen = 0.0, 0, 0

    with torch.no_grad():
        for imgs, labels in val_loader:
            imgs, labels = imgs.to(device), labels.to(device)
            outputs = model(imgs)
            loss = criterion(outputs, labels)

            batch_len = imgs.size(0)
            _, preds = torch.max(outputs, 1)
            # criterion returns the batch mean, so weight it by the batch size
            running_loss += loss.item() * batch_len
            running_corrects += torch.sum(preds == labels).item()
            samples_seen += batch_len

    # Average over the samples actually processed: len(loader.dataset) is the
    # wrong denominator when the loader drops the last batch or uses a sampler.
    if samples_seen:
        epoch_loss = running_loss / samples_seen
        epoch_acc = running_corrects / samples_seen
    else:
        # Empty loader: report NaN instead of raising ZeroDivisionError
        epoch_loss = epoch_acc = float('nan')

    print(f'Validation Loss: {epoch_loss:.4f}, Accuracy: {epoch_acc:.4f}')

    return epoch_loss, epoch_acc


In [None]:
def save_model(model, path):
    """
    Saves the model's state_dict to the specified path.

    Missing parent directories are created first, so a finished training run is
    not lost because the output folder does not exist yet.

    Parameters:
    - model: PyTorch model.
    - path: str or os.PathLike, path where the model should be saved. Other
      targets (e.g. file-like objects) are handed to torch.save unchanged.
    """
    if isinstance(path, (str, os.PathLike)):
        parent_dir = os.path.dirname(os.fspath(path))
        if parent_dir:
            os.makedirs(parent_dir, exist_ok=True)

    torch.save(model.state_dict(), path)


In [None]:
def process(action, model, model_type, train_loader, val_loader, test_loader, criterion, optimizer, device, num_epochs, save_dir="/kaggle/working"):
    """
    Handles the training and evaluation processes based on the action parameter.

    Parameters:
    - action: str, 'train' or 'evaluate'.
    - model: PyTorch model to be trained or evaluated.
    - model_type: str, type of model (e.g., 'resnet101'); used in log messages and in the saved file name.
    - train_loader: DataLoader for training data.
    - val_loader: DataLoader for validation data.
    - test_loader: DataLoader for test data.
    - criterion: Loss function.
    - optimizer: Optimizer for model parameters.
    - device: Device to run the model on ('cuda' or 'cpu').
    - num_epochs: Number of epochs for training.
    - save_dir: str, directory the trained weights are written to as
      '<model_type>_model.pth' (default: the Kaggle working directory).

    Raises:
    - ValueError: if action is neither 'train' nor 'evaluate'.
    """
    # Reject unknown actions up front: silently doing nothing hides typos such as 'eval'
    if action not in ('train', 'evaluate'):
        raise ValueError(f"Unknown action {action!r}: expected 'train' or 'evaluate'")

    if action == 'train':
        # Start the training process
        print(f"Starting training for {model_type}...")
        train_model(model, train_loader, val_loader, criterion, optimizer, num_epochs, device)

        # Save the model after training
        model_save_path = os.path.join(save_dir, f"{model_type}_model.pth")
        print(f"Training completed. Saving model to {model_save_path}...")
        save_model(model, model_save_path)
        print(f"Model saved successfully to {model_save_path}.")

    else:
        # Evaluate the model on the test data (validate_model is reused, so the
        # metrics line it prints is still labelled "Validation")
        print(f"Evaluating {model_type} on the test set...")
        validate_model(model, test_loader, criterion, device)
        print(f"Evaluation completed for {model_type}.")


In [None]:

        # Build a randomly initialized ResNet-101 whose final layer outputs two classes
        model = initiate_model('resnet101')

        # Define loss function and optimizer
        # (cross-entropy over the class logits; Adam with the global learning_rate)
        criterion = torch.nn.CrossEntropyLoss()
        optimizer = optim.Adam(model.parameters(), lr=learning_rate)

        # Load the dataset
        # get_dataloaders returns the loaders in (train, test, val) order, which is
        # the order they are unpacked in here; the label list fixes NORMAL -> 0, PNEUMONIA -> 1.
        train_loader, test_loader, val_loader = get_dataloaders('/kaggle/input/chest-xray-pneumonia/chest_xray', ['NORMAL', 'PNEUMONIA'], batch_size)

        # Train the model
        # (the 'train' action runs num_epochs epochs with per-epoch validation and then
        # saves the weights to /kaggle/working/resnet101_model.pth)
        process('train', model, 'resnet101', train_loader, val_loader, test_loader, criterion, optimizer, device, num_epochs)
        

In [None]:
def validate_model(model, val_loader, criterion, device):
    """
    Evaluates the model on the given loader without updating its parameters.

    NOTE: validate_model is also defined in an earlier cell of this notebook; when
    the cells run in order, this later definition is the one that stays in effect.

    Parameters:
    - model: PyTorch model.
    - val_loader: DataLoader for validation data.
    - criterion: Loss function.
    - device: Device to run the model on ('cuda' or 'cpu').

    Returns:
    - (loss, accuracy): averages over the samples that were processed
      (both NaN when the loader yields no samples).
    """
    # Make sure the model lives on the same device as the batches; the caller may
    # evaluate a model that was never passed through the training loop.
    model.to(device)
    model.eval()
    running_loss, running_corrects, samples_seen = 0.0, 0, 0

    with torch.no_grad():
        for imgs, labels in val_loader:
            imgs, labels = imgs.to(device), labels.to(device)
            outputs = model(imgs)
            loss = criterion(outputs, labels)

            batch_len = imgs.size(0)
            _, preds = torch.max(outputs, 1)
            # criterion returns the batch mean, so weight it by the batch size
            running_loss += loss.item() * batch_len
            running_corrects += torch.sum(preds == labels).item()
            samples_seen += batch_len

    # Average over the samples actually processed: len(loader.dataset) is the
    # wrong denominator when the loader drops the last batch or uses a sampler.
    if samples_seen:
        epoch_loss = running_loss / samples_seen
        epoch_acc = running_corrects / samples_seen
    else:
        # Empty loader: report NaN instead of raising ZeroDivisionError
        epoch_loss = epoch_acc = float('nan')

    print(f'Validation Loss: {epoch_loss:.4f}, Accuracy: {epoch_acc:.4f}')

    return epoch_loss, epoch_acc
