# Objective
The goal of this assignment is to develop a deep learning model using DenseNet121 architecture to classify chest X-ray images.

<img src="https://miro.medium.com/v2/resize:fit:1400/format:webp/1*4kPpyvHv73ypzTvo6y8J9w.png" height=300px>

Read these articles before solving the assignment

Here are some useful resources to help you create a DenseNet-121 model from scratch in PyTorch:

1. Official DenseNet documentation - [Official DenseNet documentation](https://pytorch.org/vision/master/models/densenet.html)


2. GitHub - [DenseNet Implementation](https://github.com/bamos/densenet.pytorch)

This GitHub repository contains a PyTorch implementation of DenseNet. The code is well-documented and can serve as a reference for building and training DenseNet models from scratch.
Check the repository using the link above.

3. DenseNet from scratch- [DenseNet Explained](https://www.geeksforgeeks.org/densenet-explained/)

4. [A nice article on DenseNet by our intern](https://medium.com/deepkapha-notes/implementing-densenet-121-in-pytorch-a-step-by-step-guide-c0c2625c2a60)


## Dataset:
Use the Chest X-ray dataset (Pneumonia vs Normal) provided in the folder.


## Part 1: Data Preparation and Exploration

### Data Loading and Preprocessing

- Load the Chest X-ray dataset.
- Perform data augmentation (e.g., rotation, zoom, horizontal flip) to increase the variability of the training data.
- Normalize the image pixel values.

### Data Visualization

- Visualize a few examples from each class (Normal and Pneumonia) to understand the dataset.

### Splitting the Dataset

- Split the dataset into training, validation, and test sets.


In [None]:
# Mount Google Drive inside the Colab filesystem at /content/drive/
from google.colab import drive
import os
drive.mount('/content/drive/')

Drive already mounted at /content/drive/; to attempt to forcibly remount, call drive.mount("/content/drive/", force_remount=True).


In [None]:
# NOTE: every `!` command runs in its own subshell, so this `cd` does not
# change the notebook's working directory; the `ls` below still lists the
# kernel's current directory. Use `%cd` or os.chdir() for a lasting change.
!cd '/content/drive/'
!ls

drive  sample_data


In [None]:
# Make the dataset root the working directory so that relative paths such as
# 'train', 'val' and 'test' resolve inside it.
os.chdir('/content/drive/MyDrive/Week3/chest_xray')

In [None]:
# List the current working directory (the dataset root after the chdir above)
!ls

test  train  val


In [None]:
# Dataset split directories, given relative to the current working directory
train_dir, val_dir, test_dir = 'train', 'val', 'test'

In [None]:
def create_transforms(is_train=True): #[1 point]
    """Build the torchvision preprocessing pipeline for one dataset split.

    Args:
        is_train: When truthy, return the augmenting pipeline (resize to
            256x256, random resized crop to 224, random rotation of up to
            15 degrees, random horizontal flip). Otherwise return the
            deterministic pipeline that only resizes to 224x224.

    Returns:
        A ``transforms.Compose`` that ends with ``ToTensor`` followed by
        per-channel ``Normalize``.
    """
    if is_train:
        # Augmentations are applied to training samples only
        spatial_steps = [
            transforms.Resize((256, 256)),
            transforms.RandomResizedCrop(224),
            transforms.RandomRotation(15),
            transforms.RandomHorizontalFlip(),
        ]
    else:
        # Validation/test images are only brought to the network input size
        spatial_steps = [transforms.Resize((224, 224))]

    # Shared tail of both pipelines. The mean/std values look like the
    # standard ImageNet channel statistics -- confirm if that matters.
    tensor_steps = [
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]),
    ]
    return transforms.Compose(spatial_steps + tensor_steps)


In [None]:
from torchvision import datasets
from torch.utils.data import DataLoader
from torchvision import transforms
import torch
# Load and preprocess the data.
# Only the training split is augmented; validation and test share the
# deterministic pipeline.
train_transforms = create_transforms(is_train=True)
val_transforms = create_transforms(is_train=False)
test_transforms = create_transforms(is_train=False)

# One ImageFolder per split directory
train_dataset = datasets.ImageFolder(train_dir, transform=train_transforms)
val_dataset = datasets.ImageFolder(val_dir, transform=val_transforms)
test_dataset = datasets.ImageFolder(test_dir, transform=test_transforms)

batch_size = 32

# Shuffle the training data only: evaluation results do not depend on sample
# order, and a fixed order keeps validation/test runs repeatable and keeps
# predictions aligned with the dataset order.
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

In [None]:
def plot_images(dataset, class_names, num_images=5): #[1 point]
    """
    Visualize a few examples from the dataset along with their class labels.

    Args:
    - dataset (torchvision.datasets.ImageFolder): The dataset from which to sample images.
      Items are expected to be (image, label) pairs whose image is a
      channel-first tensor normalized with the mean/std used below.
    - class_names (list of str): List of class names corresponding to the dataset classes.
    - num_images (int): Number of images to display. Capped at ``len(dataset)``;
      nothing is drawn when the result is zero.
    """
    import numpy as np  # local import: numpy is not imported in the visible cells

    # NOTE(review): `plt` (matplotlib.pyplot) is not imported in the visible
    # cells; it is presumably imported elsewhere in the notebook -- confirm.

    # Sampling without replacement cannot return more items than exist
    num_images = min(num_images, len(dataset))
    if num_images <= 0:
        return

    # Get a random sample of distinct dataset positions
    indices = np.random.choice(len(dataset), num_images, replace=False)

    # Constants that undo the Normalize step applied by the transforms
    mean = torch.tensor([0.485, 0.456, 0.406])
    std = torch.tensor([0.229, 0.224, 0.225])

    # squeeze=False keeps `axes` two-dimensional even for a single image
    fig, axes = plt.subplots(1, num_images, figsize=(15, 5), squeeze=False)

    for ax, idx in zip(axes[0], indices):
        img, label = dataset[int(idx)]
        img = img.permute(1, 2, 0)  # CHW -> HWC for plotting
        img = (img * std + mean).clamp(0, 1)  # Unnormalize, keep values in [0, 1]

        ax.imshow(img.numpy())
        ax.set_title(class_names[label])
        ax.axis('off')

    # Show once, after every panel has been drawn
    plt.show()

In [None]:
# Use the GPU when CUDA is available; otherwise fall back to the CPU
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

 # 2. Construct the Model
 <br>
 The DenseNet class is constructed using the Bottleneck and Transition layers. The densenet121 function creates an instance of the DenseNet-121 model.

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import math

class Bottleneck(nn.Module): # [2 points]
    """DenseNet bottleneck layer: BN-ReLU-Conv(1x1) then BN-ReLU-Conv(3x3).

    The 1x1 convolution maps the input to ``4 * growth_rate`` channels and the
    3x3 convolution (padding 1, so height and width are kept) produces
    ``growth_rate`` new feature maps. ``forward`` returns those new maps
    concatenated in front of the input, i.e. ``growth_rate + in_channels``
    channels.
    """

    def __init__(self, in_channels, growth_rate):
        super().__init__()
        bottleneck_width = 4 * growth_rate
        self.bn1 = nn.BatchNorm2d(in_channels)
        self.conv1 = nn.Conv2d(in_channels, bottleneck_width, kernel_size=1, bias=False)
        self.bn2 = nn.BatchNorm2d(bottleneck_width)
        self.conv2 = nn.Conv2d(bottleneck_width, growth_rate, kernel_size=3, padding=1, bias=False)

    def forward(self, x):
        # Batch norm -> ReLU -> 1x1 convolution
        hidden = self.bn1(x)
        hidden = F.relu(hidden)
        hidden = self.conv1(hidden)

        # Batch norm -> ReLU -> 3x3 convolution
        new_features = self.bn2(hidden)
        new_features = F.relu(new_features)
        new_features = self.conv2(new_features)

        # Dense connectivity: stack new features and input along the channel axis
        return torch.cat([new_features, x], 1)

In [None]:
# Transition layer placed between the dense blocks of DenseNet
class Transition(nn.Module): #[2 points]
    """BN-ReLU-Conv(1x1) followed by 2x2 average pooling with stride 2.

    The convolution changes the channel count from ``in_channels`` to
    ``out_channels``; the pooling halves height and width.
    """

    def __init__(self, in_channels, out_channels):
        super().__init__()
        self.bn = nn.BatchNorm2d(in_channels)
        self.conv = nn.Conv2d(in_channels, out_channels, kernel_size=1, bias=False)
        self.pool = nn.AvgPool2d(kernel_size=2, stride=2)

    def forward(self, x):
        # Normalize and activate, then compress channels and downsample
        activated = F.relu(self.bn(x))
        compressed = self.conv(activated)
        return self.pool(compressed)

In [None]:
# Define the DenseNet architecture
class DenseNet(nn.Module):
    """DenseNet classifier: a convolutional stem, four dense stages joined by
    three transition layers, global average pooling and a linear head.

    Args:
        block: Layer class used inside every dense stage. It is called as
            ``block(in_channels, growth_rate)`` and its output must have
            ``in_channels + growth_rate`` channels (e.g. ``Bottleneck``).
        nblocks: Sequence of four ints: number of ``block`` layers per stage.
        growth_rate: Channels added by each ``block`` layer.
        reduction: Fraction of channels kept by each transition layer.
        num_classes: Number of outputs of the final linear layer.
    """

    def __init__(self, block, nblocks, growth_rate=32, reduction=0.5, num_classes=1000):
        super().__init__()
        self.growth_rate = growth_rate  # Growth rate for DenseNet

        # Stem: 7x7 stride-2 convolution, batch norm, 3x3 stride-2 max pool
        num_planes = 2 * growth_rate  # Initial number of filters
        self.conv1 = nn.Conv2d(3, num_planes, kernel_size=7, stride=2, padding=3, bias=False)
        self.bn1 = nn.BatchNorm2d(num_planes)
        self.pool1 = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)

        # First dense stage; its transition keeps `reduction` of the channels
        self.dense1 = self._make_dense_layers(block, num_planes, nblocks[0])
        num_planes += nblocks[0] * growth_rate
        out_planes = int(math.floor(num_planes * reduction))
        self.trans1 = Transition(num_planes, out_planes)
        num_planes = out_planes

        # Second dense stage and transition
        self.dense2 = self._make_dense_layers(block, num_planes, nblocks[1])
        num_planes += nblocks[1] * growth_rate
        out_planes = int(math.floor(num_planes * reduction))
        self.trans2 = Transition(num_planes, out_planes)
        num_planes = out_planes

        # Third dense stage and transition
        self.dense3 = self._make_dense_layers(block, num_planes, nblocks[2])
        num_planes += nblocks[2] * growth_rate
        out_planes = int(math.floor(num_planes * reduction))
        self.trans3 = Transition(num_planes, out_planes)
        num_planes = out_planes

        # Fourth dense stage (no transition afterwards)
        self.dense4 = self._make_dense_layers(block, num_planes, nblocks[3])
        num_planes += nblocks[3] * growth_rate

        self.bn2 = nn.BatchNorm2d(num_planes)  # Final batch normalization
        self.linear = nn.Linear(num_planes, num_classes)  # Classification head

    def _make_dense_layers(self, block, in_channels, nblocks):
        """Stack ``nblocks`` layers; each one sees ``growth_rate`` more input
        channels than the previous because ``block`` concatenates its input."""
        layers = []
        for _ in range(nblocks):
            layers.append(block(in_channels, self.growth_rate))
            in_channels += self.growth_rate
        return nn.Sequential(*layers)

    def forward(self, x):
        """Return class logits of shape ``(batch, num_classes)`` for a batch of
        3-channel images."""
        out = self.pool1(F.relu(self.bn1(self.conv1(x))))  # Stem
        out = self.trans1(self.dense1(out))  # First dense stage and transition
        out = self.trans2(self.dense2(out))  # Second dense stage and transition
        out = self.trans3(self.dense3(out))  # Third dense stage and transition
        out = self.dense4(out)  # Fourth dense stage
        out = F.relu(self.bn2(out))  # Final batch norm and ReLU
        # Global average pooling. A fixed 4x4 window only matches a 4x4 final
        # feature map; 224x224 inputs give a 7x7 map, of which a fixed window
        # would average a single corner. Adaptive pooling always averages the
        # whole map and works for any input size.
        out = F.adaptive_avg_pool2d(out, 1)
        out = torch.flatten(out, 1)  # (batch, channels)
        out = self.linear(out)
        return out

In [None]:
def densenet121(num_classes=1000):
    """Build a DenseNet-121: ``Bottleneck`` layers in four dense stages of
    6, 12, 24 and 16 layers.

    Args:
        num_classes: Number of outputs of the final linear layer. Defaults to
            1000, the ``DenseNet`` default, so ``densenet121()`` builds the
            same network as before; pass e.g. ``num_classes=2`` for a
            two-class task.

    Returns:
        A freshly initialized ``DenseNet`` instance.
    """
    return DenseNet(Bottleneck, [6, 12, 24, 16], num_classes=num_classes)


In [None]:
# Instantiate DenseNet-121, place it on the selected device and print its layers
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = densenet121().to(device)
print(model)

DenseNet(
  (conv1): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
  (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (pool1): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
  (dense1): Sequential(
    (0): Bottleneck(
      (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (conv1): Conv2d(64, 128, kernel_size=(1, 1), stride=(1, 1), bias=False)
      (bn2): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (conv2): Conv2d(128, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
    )
    (1): Bottleneck(
      (bn1): BatchNorm2d(96, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (conv1): Conv2d(96, 128, kernel_size=(1, 1), stride=(1, 1), bias=False)
      (bn2): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (conv2): Conv2d(128, 32, 

# 3. Model Compilation and Training

In [None]:
### Model Compilation
#[1 point]
# Loss function and optimizer used for training

import torch.optim as optim

# Cross-entropy over the class logits
criterion = nn.CrossEntropyLoss()
# Adam over all model parameters, learning rate 0.001
optimizer = optim.Adam(params=model.parameters(), lr=1e-3)


In [None]:
def train_model(model, criterion, optimizer, train_loader, val_loader, num_epochs=10): # [1 point]
    """Train ``model`` in place and print the training/validation loss per epoch.

    Args:
        model: Network to optimize; batches are moved to the device of its
            parameters.
        criterion: Loss called as ``criterion(outputs, labels)``. The per-batch
            value is weighted by batch size, which assumes a mean-reduced loss.
        optimizer: Optimizer already bound to ``model``'s parameters.
        train_loader: Iterable of ``(inputs, labels)`` training batches.
        val_loader: Batches passed to ``evaluate_model`` after every epoch.
        num_epochs: Number of passes over ``train_loader``.

    Returns:
        None.
    """
    model_device = next(model.parameters()).device

    for epoch in range(num_epochs):
        model.train()
        running_loss = 0.0
        num_samples = 0
        # Exactly one pass over the training data per epoch
        for inputs, labels in train_loader:
            inputs, labels = inputs.to(model_device), labels.to(model_device)
            optimizer.zero_grad()  # Zero the parameter gradients
            outputs = model(inputs)  # Forward pass
            loss = criterion(outputs, labels)  # Compute the loss
            loss.backward()  # Backward pass (compute gradients)
            optimizer.step()  # Update model parameters

            batch_size = inputs.size(0)
            running_loss += loss.item() * batch_size  # Undo the batch mean
            num_samples += batch_size

        # Average over the samples actually seen this epoch
        epoch_loss = running_loss / max(num_samples, 1)

        # Evaluate on validation set
        val_loss = evaluate_model(model, criterion, val_loader)

        # Print epoch statistics
        print(f'Epoch {epoch + 1}/{num_epochs}, Training Loss: {epoch_loss:.4f}, Validation Loss: {val_loss:.4f}')


In [None]:
def evaluate_model(model, criterion, data_loader): # [1 point]
    """Return the average per-sample loss of ``model`` over ``data_loader``.

    The model is switched to evaluation mode and gradients are disabled.

    Args:
        model: Network to evaluate; batches are moved to the device of its
            parameters (CPU when it has none).
        criterion: Loss called as ``criterion(outputs, labels)``. The per-batch
            value is weighted by batch size, which assumes a mean-reduced loss.
        data_loader: Iterable of ``(inputs, labels)`` batches.

    Returns:
        Sum of batch losses weighted by batch size, divided by the number of
        samples seen; ``nan`` when ``data_loader`` yields nothing.
    """
    first_param = next(model.parameters(), None)
    model_device = first_param.device if first_param is not None else torch.device("cpu")

    model.eval()
    running_loss = 0.0
    num_samples = 0
    with torch.no_grad():
        for inputs, labels in data_loader:
            inputs, labels = inputs.to(model_device), labels.to(model_device)
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            batch_size = inputs.size(0)
            running_loss += loss.item() * batch_size
            num_samples += batch_size

    # Count what was evaluated instead of trusting len(dataset): the two
    # differ when the loader uses a sampler or drops the last batch.
    if num_samples == 0:
        return float("nan")
    return running_loss / num_samples

# 4. Hyperparameter Tuning

In [None]:
### Hyperparameter Tuning

learning_rates = [0.001, 0.0001]
num_epochs_list = [10, 20]
# Start below any real accuracy so the first finished run is always recorded
best_accuracy = float("-inf")
best_hyperparams = {}


def evaluate_model(model, criterion, data_loader):
    """Return ``(average loss, accuracy)`` of ``model`` over ``data_loader``.

    The model is put in evaluation mode and gradients are disabled. Both
    values are averaged over the samples actually seen; ``(nan, 0.0)`` is
    returned when the loader yields nothing.
    """
    first_param = next(model.parameters(), None)
    model_device = first_param.device if first_param is not None else torch.device("cpu")

    model.eval()
    running_loss = 0.0
    correct = 0
    total = 0
    with torch.no_grad():
        for inputs, labels in data_loader:
            inputs, labels = inputs.to(model_device), labels.to(model_device)
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            running_loss += loss.item() * inputs.size(0)
            predicted = outputs.argmax(dim=1)  # Index of the largest logit
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
    if total == 0:
        return float("nan"), 0.0
    return running_loss / total, correct / total


def train_model(model, criterion, optimizer, train_loader, val_loader, num_epochs):
    """Train ``model`` in place for ``num_epochs`` epochs.

    Prints training loss, validation loss and validation accuracy after every
    epoch.

    Returns:
        The highest validation accuracy observed over the epochs
        (``-inf`` when ``num_epochs`` is 0).
    """
    model_device = next(model.parameters()).device
    best_val_acc = float("-inf")

    for epoch in range(num_epochs):
        model.train()
        running_loss = 0.0
        num_samples = 0
        for inputs, labels in train_loader:
            inputs, labels = inputs.to(model_device), labels.to(model_device)
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            running_loss += loss.item() * inputs.size(0)
            num_samples += inputs.size(0)

        epoch_loss = running_loss / max(num_samples, 1)
        val_loss, val_acc = evaluate_model(model, criterion, val_loader)
        print(f'Epoch {epoch+1}/{num_epochs}, Training Loss: {epoch_loss:.4f}, Validation Loss: {val_loss:.4f}, Validation Accuracy: {val_acc:.4f}')

        best_val_acc = max(best_val_acc, val_acc)

    return best_val_acc


for lr in learning_rates:
    for num_epochs in num_epochs_list:
        print(f"Training with learning rate: {lr} and number of epochs: {num_epochs}")

        # Fresh model, criterion and optimizer for every configuration
        model = densenet121().to(device)
        criterion = nn.CrossEntropyLoss()
        optimizer = optim.Adam(model.parameters(), lr=lr)

        run_accuracy = train_model(model, criterion, optimizer, train_loader, val_loader, num_epochs)

        # Save best hyperparameters. The comparison lives at module level so
        # the assignments update these globals rather than function locals.
        if run_accuracy > best_accuracy:
            best_accuracy = run_accuracy
            best_hyperparams = {'learning_rate': lr, 'num_epochs': num_epochs}

print(f"Best Hyperparameters: {best_hyperparams}")

Training with learning rate: 0.001 and number of epochs: 10


TypeError: conv2d() received an invalid combination of arguments - got (NoneType, Parameter, NoneType, tuple, tuple, tuple, int), but expected one of:
 * (Tensor input, Tensor weight, Tensor bias = None, tuple of ints stride = 1, tuple of ints padding = 0, tuple of ints dilation = 1, int groups = 1)
      didn't match because some of the arguments have invalid types: (!NoneType!, !Parameter!, !NoneType!, !tuple of (int, int)!, !tuple of (int, int)!, !tuple of (int, int)!, !int!)
 * (Tensor input, Tensor weight, Tensor bias = None, tuple of ints stride = 1, str padding = "valid", tuple of ints dilation = 1, int groups = 1)
      didn't match because some of the arguments have invalid types: (!NoneType!, !Parameter!, !NoneType!, !tuple of (int, int)!, !tuple of (int, int)!, !tuple of (int, int)!, !int!)


In [None]:
# Rebuild the network from scratch and set up its loss and optimizer
criterion = nn.CrossEntropyLoss()
model = densenet121()
model = model.to(device)
optimizer = optim.Adam(params=model.parameters(), lr=best_hyperparams['learning_rate'])

# Train the model with the best hyperparameters [1 point]
train_model(
    model, criterion, optimizer, train_loader, val_loader,
    best_hyperparams['num_epochs'],
)


In [None]:
def test_model(model, test_loader): #[1 points]
    """
    Evaluate the model on the test dataset and return the true and predicted labels.

    Args:
    - model: The trained PyTorch model to be evaluated. Batches are moved to
      the device of its parameters (CPU when it has none).
    - test_loader: DataLoader for the test dataset, yielding (inputs, labels).

    Returns:
    - y_true: Numpy array of true labels.
    - y_pred: Numpy array of predicted labels (index of the largest output).
    """
    import numpy as np  # local import: numpy is not imported in the visible cells

    # nn.Module has no `.device` attribute; read it from the parameters
    first_param = next(model.parameters(), None)
    model_device = first_param.device if first_param is not None else torch.device("cpu")

    # Set the model to evaluation mode
    model.eval()

    # Lists to store true labels and predictions
    y_true = []
    y_pred = []

    # Disable gradient calculation for inference
    with torch.no_grad():
        # Every step below runs once per batch
        for inputs, labels in test_loader:
            # Forward pass: compute the model outputs
            outputs = model(inputs.to(model_device))

            # Predicted class = index of the largest output along dim 1
            preds = outputs.argmax(dim=1)

            # Append true labels and predictions to the respective lists
            y_true.extend(labels.cpu().numpy())
            y_pred.extend(preds.cpu().numpy())

    # Convert lists to numpy arrays and return
    return np.array(y_true), np.array(y_pred)


# The name starts with `test_`; mark it so pytest-style collectors do not
# mistake this evaluation helper for a test case.
test_model.__test__ = False

import numpy as np  # needed for np.mean below; not imported in the visible cells

# Evaluate the model on the test dataset.
# train_model updates `model` in place, so the trained network is `model`
# itself; no separate `trained_model` object is defined anywhere.
y_true, y_pred = test_model(model, test_loader)

# Calculate the test accuracy (fraction of matching labels)
test_acc = np.mean(y_pred == y_true)
print(f'Test Accuracy: {test_acc:.2f}')

In [None]:
import numpy as np  # not imported in the visible cells

# NOTE(review): `sns`, `plt`, `roc_curve` and `auc` are not imported in the
# visible cells. This cell presumably expects `import seaborn as sns`,
# `import matplotlib.pyplot as plt` and
# `from sklearn.metrics import roc_curve, auc` to have run earlier -- confirm.

# Class names in label-index order, as exposed by ImageFolder
class_names = test_dataset.classes
num_classes = len(class_names)

# Collect true labels and class probabilities over the whole test set
model.eval()
label_batches = []
prob_batches = []
with torch.no_grad():
    for inputs, labels in test_loader:
        logits = model(inputs.to(device))
        # The head may have more outputs than the dataset has classes
        # (densenet121() defaults to 1000); only the first `num_classes`
        # logits correspond to real labels, so normalize over those.
        prob_batches.append(torch.softmax(logits[:, :num_classes], dim=1).cpu())
        label_batches.append(labels.cpu())

eval_true = torch.cat(label_batches).numpy()
predicted_probabilities = torch.cat(prob_batches).numpy()
eval_pred = predicted_probabilities.argmax(axis=1)

### Confusion Matrix

# Rows are true classes, columns are predicted classes
cm = np.zeros((num_classes, num_classes), dtype=int)
for true_label, pred_label in zip(eval_true, eval_pred):
    cm[true_label, pred_label] += 1

sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', xticklabels=class_names, yticklabels=class_names)
plt.xlabel('Predicted')
plt.ylabel('True')
plt.title('Confusion Matrix')
plt.show()

### ROC Curve ###

# For binary classification, the score is the probability of the class with
# label index 1 (presumably the pneumonia class -- check `class_names`)
y_pred_prob = predicted_probabilities[:, 1]

# Compute ROC curve and AUC
fpr, tpr, _ = roc_curve(eval_true, y_pred_prob)
roc_auc = auc(fpr, tpr)

# Plot ROC curve; the dashed diagonal marks a random classifier
plt.figure()
plt.plot(fpr, tpr, color='darkorange', lw=2, label=f'ROC curve (area = {roc_auc:.2f})')
plt.plot([0, 1], [0, 1], color='navy', lw=2, linestyle='--')
plt.xlim([0.0, 1.0])
plt.ylim([0.0, 1.0])
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('Receiver Operating Characteristic')
plt.legend(loc="lower right")
plt.show()

# 5. Save the model

In [None]:
def save_model(model, path='model.pth'):
    """
    Write the model's ``state_dict`` to a file and print a confirmation.

    Args:
    - model: The PyTorch model whose parameters and buffers are saved.
    - path: Destination file path.
    """
    weights = model.state_dict()
    torch.save(weights, path)
    print(f"Model saved to {path}")

# Persist the trained network's weights to disk
save_model(model, path='best_densenet_model.pth')