<a href="https://colab.research.google.com/github/Amrutha0610/Crowd_Density_Estimation/blob/main/Crowd_density_estimation_project.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Crowd Counting and Density Estimation

## Cloning the CSRNet model repository from GitHub

# Initial Setup

This section handles mounting Google Drive to access files, changing the current directory to the project's working directory, cloning the necessary GitHub repository for the CSRNet model, and installing the required Python packages to run the code.

In [None]:
# Mount Google Drive so the dataset, checkpoints and logs under
# /content/drive/MyDrive/crowd_density are reachable from this runtime.
from google.colab import drive
drive.mount('/content/drive')

# Change to the project's working directory on Drive
%cd /content/drive/MyDrive/crowd_density
# List the directory contents as a quick sanity check
!ls

# Clone the CSRNet reference repository into the working directory and enter it.
# NOTE(review): the clone lands on Drive, so re-running this cell will presumably find
# the directory already present — confirm the cell behaves as intended on a second run.
!git clone https://github.com/leeyeehoo/CSRNet-pytorch.git
%cd CSRNet-pytorch

# Install required packages (versions are not pinned)
!pip install h5py opencv-python scipy pillow


### Dataset and Model Paths

This cell defines the file paths for the training and testing datasets, as well as the paths for saving the trained models in Google Drive.

In [None]:
# Dataset directories in Google Drive (ShanghaiTech, part B).
# Each split has an `images` folder of .jpg files and a `ground_truth` folder of .mat
# annotation files; CrowdDataset below pairs `<name>.jpg` with `GT_<name>.mat`.
train_image_path = '/content/drive/MyDrive/crowd_density/ShanghaiTech_Crowd_Counting_Dataset/part_B_final/train_data/images'
train_density_path = '/content/drive/MyDrive/crowd_density/ShanghaiTech_Crowd_Counting_Dataset/part_B_final/train_data/ground_truth'

test_image_path = '/content/drive/MyDrive/crowd_density/ShanghaiTech_Crowd_Counting_Dataset/part_B_final/test_data/images'
test_density_path = '/content/drive/MyDrive/crowd_density/ShanghaiTech_Crowd_Counting_Dataset/part_B_final/test_data/ground_truth'

# Model paths (currently disabled; nothing below reads these names)
#partA_model_path = '/content/drive/MyDrive/crowd_density/PartAmodel_best.pth.tar'
#partB_model_path = '/content/drive/MyDrive/crowd_density/partBmodel_best.pth.tar'



In [None]:
# Automatically replace all "xrange" with "range" in the cloned repository's model.py
# (edits the file in place on Drive).
# NOTE(review): no cell visible in this notebook imports model.py — the CSRNet class is
# defined directly in a later cell — so confirm whether this patch is still needed.
!sed -i 's/xrange/range/g' /content/drive/MyDrive/crowd_density/CSRNet-pytorch/model.py

## PyTorch dataset pipeline
CrowdDataset class: Loads and processes crowd counting images and their corresponding density maps. Supports random horizontal flipping as data augmentation.

custom_collate_fn function: Ensures all images and density maps in a batch are resized to the same dimensions for proper batch training.

DataLoaders: Prepares batches of training and test data with correct transformations and collate functions.

In [None]:
import os
import numpy as np
from scipy.io import loadmat
from PIL import Image
import torch
from torch.utils.data import Dataset, DataLoader
import torchvision.transforms as transforms
from scipy.ndimage import gaussian_filter
import random

class CrowdDataset(Dataset):
    """
    PyTorch Dataset for crowd counting.

    Every ``<name>.jpg`` in ``image_dir`` is paired with ``GT_<name>.mat`` in
    ``density_dir``. The head annotations stored in the .mat file are rasterised into a
    density map (one unit of mass per annotated head, smoothed with a Gaussian), so the
    sum of the density map equals the number of annotated people.

    Args:
        image_dir (str): Directory path containing image files.
        density_dir (str): Directory path containing the matching ground-truth .mat files.
        transform (callable, optional): Transformation applied to the input image only.
        augment (bool, optional): Whether to apply a random horizontal flip to the
            image and its density map together.
        sigma (float, optional): Standard deviation of the Gaussian used to smooth the
            point annotations. Defaults to 15.

    Returns (per item):
        Tuple[Any, Tensor]: The (transformed) image and a ``(1, H, W)`` float density map.
    """

    def __init__(self, image_dir: str, density_dir: str, transform=None, augment: bool = False,
                 sigma: float = 15):
        self.image_dir = image_dir
        self.density_dir = density_dir
        # os.listdir returns entries in arbitrary order; sorting keeps the
        # index -> file mapping stable between runs and machines.
        self.image_files = sorted(f for f in os.listdir(image_dir) if f.endswith('.jpg'))
        self.transform = transform
        self.augment = augment
        self.sigma = sigma

    def __len__(self) -> int:
        """Returns the total number of images in the dataset."""
        return len(self.image_files)

    @staticmethod
    def _build_density_map(points, height: int, width: int, sigma: float = 15) -> np.ndarray:
        """
        Rasterises (x, y) point annotations into a smoothed ``(height, width)`` density map.

        Coordinates are truncated to integers and clamped into the image, and each point
        ADDS one unit of mass, so several annotations falling on the same pixel are all
        counted. An empty annotation list yields an all-zero map.
        """
        density_map = np.zeros((height, width), dtype=np.float32)
        points = np.atleast_2d(np.asarray(points))
        if points.size:
            # Clamp on both sides: a negative index would silently wrap to the far edge.
            xs = np.clip(points[:, 0].astype(np.int64), 0, width - 1)
            ys = np.clip(points[:, 1].astype(np.int64), 0, height - 1)
            # np.add.at accumulates repeated indices (plain fancy assignment would not).
            np.add.at(density_map, (ys, xs), 1.0)
        return gaussian_filter(density_map, sigma=sigma)

    def __getitem__(self, idx: int):
        """Fetches the image and corresponding density map at the specified index."""
        image_file = self.image_files[idx]
        image_path = os.path.join(self.image_dir, image_file)

        # Ground truth for "<name>.jpg" is stored as "GT_<name>.mat"
        stem = os.path.splitext(image_file)[0]
        density_path = os.path.join(self.density_dir, f'GT_{stem}.mat')

        # Load image and convert to RGB
        image = Image.open(image_path).convert('RGB')
        width, height = image.size

        # Load ground truth points from the .mat file
        mat = loadmat(density_path)
        points = mat["image_info"][0, 0][0, 0][0]

        density_map = self._build_density_map(points, height, width, self.sigma)

        # Random horizontal flip, applied to the image and its density map together
        if self.augment and random.random() > 0.5:
            image = transforms.functional.hflip(image)
            density_map = np.fliplr(density_map)

        if self.transform:
            image = self.transform(image)

        # fliplr returns a negatively-strided view, which torch.from_numpy rejects;
        # ascontiguousarray materialises it (and is a no-op otherwise).
        density_tensor = torch.from_numpy(np.ascontiguousarray(density_map)).unsqueeze(0).float()

        return image, density_tensor


# Data preprocessing transformations applied to every input image:
#   1. ToTensor converts the PIL image to a tensor.
#   2. Normalize standardises each RGB channel with the given mean/std
#      (presumably the ImageNet statistics matching the pretrained VGG-16 frontend — confirm).
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])


def custom_collate_fn(batch, target_size=(512, 512)):
    """
    Collate function that resizes images and density maps to one fixed size and batches them.

    Interpolating a density map changes its integral (shrinking a map shrinks its sum by
    roughly the area ratio), which would corrupt the ground-truth head count. Each resized
    density map is therefore rescaled so that its sum equals the sum of the original map.

    Args:
        batch (list of tuples): Each tuple contains a ``(C, H, W)`` image tensor and its
            ``(1, H, W)`` density map tensor.
        target_size (tuple of int, optional): ``(height, width)`` every sample is resized
            to. Defaults to ``(512, 512)``.

    Returns:
        Tuple[Tensor, Tensor]: Images of shape ``(B, C, height, width)`` and density maps
        of shape ``(B, 1, height, width)``.
    """
    target_height, target_width = target_size
    size = (target_height, target_width)

    def _resize(tensor):
        # interpolate needs a batch dimension: (C, H, W) -> (1, C, H, W) -> (C, H', W').
        # antialias only has an effect when downscaling.
        return torch.nn.functional.interpolate(
            tensor.unsqueeze(0), size=size, mode='bilinear', align_corners=False, antialias=True
        ).squeeze(0)

    resized_images = []
    resized_density_maps = []

    for img, density_map in batch:
        resized_images.append(_resize(img))

        resized_density = _resize(density_map)
        # Restore the original count; skip all-zero maps to avoid dividing by zero.
        original_count = density_map.sum()
        resized_count = resized_density.sum()
        if resized_count > 0:
            resized_density = resized_density * (original_count / resized_count)
        resized_density_maps.append(resized_density)

    # Stack resized images and density maps into batches
    images = torch.stack(resized_images, 0)
    density_maps = torch.stack(resized_density_maps, 0)

    return images, density_maps


# Define DataLoaders for training and testing.
# Training data is augmented (random horizontal flip) and reshuffled every epoch.
train_dataset = CrowdDataset(train_image_path, train_density_path, transform=transform, augment=True)
train_loader = DataLoader(train_dataset, batch_size=4, shuffle=True, collate_fn=custom_collate_fn)

# Test data is neither augmented nor shuffled; both loaders rely on custom_collate_fn
# to bring every sample in a batch to a common size.
test_dataset = CrowdDataset(test_image_path, test_density_path, transform=transform, augment=False)
test_loader = DataLoader(test_dataset, batch_size=4, shuffle=False, collate_fn=custom_collate_fn)


### Install optuna for hyperparameter tuning

In [None]:
# Optuna is used by the objective function and the study cell further down (version not pinned).
!pip install optuna

Output for SGD optimizer

In [None]:
import torch

# Pick the compute device once: CUDA when a GPU is visible, otherwise the CPU.
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')
print(f'Using device: {device}')

## CSRNet Architecture for Crowd Counting with Pretrained VGG-16 Frontend and Dilated Backend

The make_layers function builds convolutional layers based on the VGG-16 structure. The frontend uses pretrained VGG-16 layers for feature extraction, while the backend uses dilated convolutions to preserve spatial detail. The output layer generates a single-channel density map, with backend and output weights initialized randomly.

In [None]:
import torch.nn as nn
import torchvision.models as models

def make_layers(cfg: list, in_channels: int = 3, batch_norm: bool = False, dilation: bool = False) -> nn.Sequential:
    """
    Builds a sequence of convolutional and pooling layers based on the provided configuration.

    Args:
        cfg (list): Layer specification. An integer adds a 3x3 convolution with that many
            output channels followed by ReLU; 'M' adds a 2x2 max-pooling layer.
        in_channels (int): Number of input channels. Default is 3 for RGB images.
        batch_norm (bool): If True, insert ``nn.BatchNorm2d`` between every convolution
            and its ReLU. Default False (layer layout identical to plain VGG).
        dilation (bool): If True, convolutions use dilation 2 (with padding 2) instead of
            dilation 1 (padding 1); either way the spatial size is preserved.

    Returns:
        nn.Sequential: Sequential model consisting of the specified layers.
    """
    # Padding equals the dilation rate so a 3x3 kernel keeps the feature-map size.
    d_rate = 2 if dilation else 1
    layers = []

    for v in cfg:
        if v == 'M':
            layers.append(nn.MaxPool2d(kernel_size=2, stride=2))
            continue

        layers.append(nn.Conv2d(in_channels, v, kernel_size=3, padding=d_rate, dilation=d_rate))
        if batch_norm:
            layers.append(nn.BatchNorm2d(v))
        layers.append(nn.ReLU(inplace=True))
        in_channels = v  # The next convolution consumes this layer's output channels

    return nn.Sequential(*layers)

class CSRNet(nn.Module):
    """
    CSRNet: A convolutional neural network for crowd counting using a VGG-16 based frontend.

    The model consists of:
    - A frontend with the layout of the first ten VGG-16 convolutions (three max-pools,
      so the output is 1/8 of the input resolution).
    - A backend of dilated convolutions that keeps that resolution.
    - A 1x1 output convolution that produces the single-channel density map.

    Args:
        pretrained_frontend (bool): If True (default), copy pretrained VGG-16 weights
            into the frontend. Pass False when a checkpoint is loaded right after
            construction, which avoids fetching the VGG-16 weights for nothing.
    """

    def __init__(self, pretrained_frontend: bool = True):
        super().__init__()

        # Configuration for frontend and backend layers
        self.frontend_feat = [64, 64, 'M', 128, 128, 'M', 256, 256, 256, 'M', 512, 512, 512]
        self.backend_feat = [512, 512, 512, 256, 128, 64]

        # Build frontend and backend
        self.frontend = make_layers(self.frontend_feat)
        self.backend = make_layers(self.backend_feat, in_channels=512, dilation=True)

        # Final output layer to produce the density map (single-channel output)
        self.output_layer = nn.Conv2d(64, 1, kernel_size=1)

        # Random init for every convolution; the frontend is overwritten just below
        # when pretrained weights are requested.
        self._initialize_weights()

        if pretrained_frontend:
            vgg = self._load_pretrained_vgg16()
            # Positional transfer: the frontend's state-dict entries line up one-to-one
            # with the leading entries of VGG-16's state dict; zip stops at the shorter
            # (frontend) list. state_dict() tensors share storage with the parameters,
            # so the in-place copy updates the model.
            frontend_tensors = self.frontend.state_dict().values()
            vgg_tensors = vgg.state_dict().values()
            for target, source in zip(frontend_tensors, vgg_tensors):
                target.copy_(source)

    @staticmethod
    def _load_pretrained_vgg16():
        """Returns an ImageNet-pretrained VGG-16, using the ``weights=`` API when available."""
        weights_enum = getattr(models, 'VGG16_Weights', None)
        if weights_enum is not None:
            return models.vgg16(weights=weights_enum.IMAGENET1K_V1)
        # Older torchvision releases only understand the legacy flag.
        return models.vgg16(pretrained=True)

    def forward(self, x):
        """
        Forward pass through the CSRNet model.

        Args:
            x (Tensor): Input image tensor.

        Returns:
            Tensor: Predicted density map (single channel).
        """
        x = self.frontend(x)        # Feature extraction
        x = self.backend(x)         # Contextual refinement using dilated convolutions
        x = self.output_layer(x)    # Density map prediction
        return x

    def _initialize_weights(self):
        """
        Initializes every Conv2d in the model: weights ~ N(0, 0.01^2), biases set to 0.
        """
        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                nn.init.normal_(m.weight, std=0.01)
                if m.bias is not None:
                    nn.init.constant_(m.bias, 0)


This function evaluates the trained model by calculating the Mean Absolute Error (MAE) and Root Mean Squared Error (RMSE) between the predicted and actual crowd counts. It sums the total error across all batches and averages it. The function returns MAE and RMSE as performance metrics for the model.

In [None]:
def evaluate_model(model: torch.nn.Module, data_loader: torch.utils.data.DataLoader) -> tuple:
    """
    Evaluates the model on a dataset using per-image MAE and RMSE of the crowd count.

    The predicted count of an image is the sum of its predicted density map. When the
    model output is smaller than the ground-truth map, the output is first upsampled to
    the ground-truth size with the same bilinear interpolation the training loop applies
    before its loss, so that counts are measured at the scale the model was trained at.
    Errors are computed per image (not per batch, where over- and under-counts would
    cancel) and averaged over the number of images.

    Args:
        model (torch.nn.Module): The crowd counting model.
        data_loader (torch.utils.data.DataLoader): Iterable of ``(images, densities)`` batches.

    Returns:
        tuple: Mean Absolute Error (MAE) and Root Mean Squared Error (RMSE) of the counts.

    Raises:
        ValueError: If ``data_loader`` yields no samples.
    """
    model.eval()  # Set model to evaluation mode

    # Run on whatever device the model lives on; parameter-free modules fall back to CPU.
    first_param = next(model.parameters(), None)
    model_device = first_param.device if first_param is not None else torch.device('cpu')

    abs_error_sum = 0.0
    sq_error_sum = 0.0
    num_images = 0

    with torch.no_grad():  # Disable gradient tracking for evaluation
        for images, densities in data_loader:
            images = images.to(model_device)
            densities = densities.to(model_device)

            outputs = model(images)
            if outputs.shape[2:] != densities.shape[2:]:
                outputs = torch.nn.functional.interpolate(
                    outputs, size=densities.shape[2:], mode='bilinear', align_corners=False
                )

            batch_size = outputs.shape[0]
            predicted_counts = outputs.reshape(batch_size, -1).sum(dim=1)
            true_counts = densities.reshape(batch_size, -1).sum(dim=1)
            count_errors = predicted_counts - true_counts

            abs_error_sum += count_errors.abs().sum().item()
            sq_error_sum += (count_errors ** 2).sum().item()
            num_images += batch_size

    if num_images == 0:
        raise ValueError("evaluate_model: data_loader yielded no samples")

    mae = abs_error_sum / num_images
    rmse = (sq_error_sum / num_images) ** 0.5

    return mae, rmse


## Model Training
This function trains the CSRNet model using either SGD or Adam optimizers based on the trial’s hyperparameters. It logs training loss, MAE, and RMSE to TensorBoard while handling NaN losses safely. The best-performing model (with the lowest MAE) is saved and returned after validation.

In [None]:
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.tensorboard import SummaryWriter
import torch
from torch.utils.data import DataLoader
from typing import Tuple

def get_writer(trial_number: int) -> SummaryWriter:
    """
    Creates the TensorBoard SummaryWriter used to log one trial.

    Args:
        trial_number (int): Trial identifier; it becomes the name of the log subdirectory.

    Returns:
        SummaryWriter: Writer logging to ``.../crowd_density/runs/trial_<trial_number>``.
    """
    # One subdirectory per trial on Google Drive
    return SummaryWriter(log_dir=f'/content/drive/MyDrive/crowd_density/runs/trial_{trial_number}')


def train_model(train_loader: DataLoader, val_loader: DataLoader, lr: float, optimizer_name: str, num_epochs: int, trial_number: int) -> Tuple[float, torch.nn.Module]:
    """
    Trains the CSRNet model using the specified hyperparameters and logs metrics to TensorBoard.

    After every epoch the model is evaluated on ``val_loader``; the weights with the
    lowest MAE seen so far (including the untrained starting point) are snapshotted and
    restored into the model before it is returned.

    Args:
        train_loader (DataLoader): DataLoader for the training dataset.
        val_loader (DataLoader): DataLoader for the validation dataset.
        lr (float): Learning rate.
        optimizer_name (str): 'SGD' selects SGD with momentum 0.95; any other value selects Adam.
        num_epochs (int): Number of training epochs.
        trial_number (int): Current trial number, used for log messages and the log directory.

    Returns:
        Tuple[float, torch.nn.Module]: The best MAE achieved and the model carrying the
        corresponding weights.
    """
    import copy  # local import: used only to snapshot the best weights

    model = CSRNet().to(device)
    criterion = torch.nn.MSELoss(reduction='sum')  # Sum reduction to calculate total density error

    # Select optimizer based on trial parameters
    if optimizer_name == 'SGD':
        optimizer = optim.SGD(model.parameters(), lr=lr, momentum=0.95)
    else:  # Use Adam optimizer
        optimizer = optim.Adam(model.parameters(), lr=lr)

    # Initial evaluation to set the starting best MAE
    initial_mae, _ = evaluate_model(model, val_loader)
    best_mae = initial_mae
    # state_dict() hands back tensors that alias the live parameters, and dict.copy()
    # is shallow, so a deep copy is required or the snapshot would keep changing as
    # training continues.
    best_model_state_dict = copy.deepcopy(model.state_dict())

    writer = get_writer(trial_number)  # TensorBoard writer

    try:
        for epoch in range(num_epochs):
            model.train()
            epoch_loss = 0.0  # Track cumulative loss for the epoch

            for batch_idx, (images, densities) in enumerate(train_loader):
                images = images.to(device)
                densities = densities.to(device)

                outputs = model(images)

                # Resize outputs to match density map size
                outputs_resized = F.interpolate(outputs, size=densities.shape[2:], mode='bilinear', align_corners=False)

                loss = criterion(outputs_resized, densities)

                # Handle NaN losses to avoid breaking training
                if not torch.isnan(loss):
                    optimizer.zero_grad()
                    loss.backward()
                    optimizer.step()
                    epoch_loss += loss.item()
                else:
                    print(f"Warning: NaN loss encountered in Trial {trial_number}, Epoch {epoch + 1}, Batch {batch_idx}. Skipping backpropagation.")

            # Evaluate the model on validation data after each epoch
            mae, rmse = evaluate_model(model, val_loader)

            print(f'Trial {trial_number}, Epoch [{epoch + 1}/{num_epochs}], Loss: {epoch_loss:.2f}, MAE: {mae:.2f}, RMSE: {rmse:.2f}')

            # Log training metrics to TensorBoard
            writer.add_scalar('Loss/train', epoch_loss, epoch)
            writer.add_scalar('MAE/val', mae, epoch)
            writer.add_scalar('RMSE/val', rmse, epoch)

            # Keep an independent snapshot of the best weights (lowest MAE)
            if mae < best_mae:
                best_mae = mae
                best_model_state_dict = copy.deepcopy(model.state_dict())
    finally:
        # Flush and release the log file even if training is interrupted or fails.
        writer.close()

    # Load best model weights before returning
    model.load_state_dict(best_model_state_dict)
    return best_mae, model



## Optuna objective
This function defines the Optuna objective, where learning rate and optimizer type are sampled to find the best hyperparameters for training. It trains the CSRNet model, saves the best model, evaluates it on the test set, and logs all trial details for future tracking. The returned MAE guides Optuna in minimizing the crowd counting error.

In [None]:
import optuna
import torch.optim as optim
import torch.nn.functional as F
import torch
from typing import Any

def objective(trial: optuna.trial.Trial) -> float:
    """
    Optuna objective: trains CSRNet with sampled hyperparameters and reports its MAE.

    The trial's best weights are saved to Google Drive and a short summary is appended
    to ``optuna_best_trials.txt``.

    NOTE(review): ``test_loader`` is used both for model selection inside train_model
    and for the value returned here, so the reported MAE is not a held-out estimate.

    Args:
        trial (optuna.trial.Trial): An Optuna trial object for sampling hyperparameters.

    Returns:
        float: The Mean Absolute Error (MAE) on the test set, which Optuna will minimize.
    """
    # Learning rate sampled log-uniformly from [1e-5, 1e-3]
    lr = trial.suggest_float('lr', 1e-5, 1e-3, log=True)

    # Single-choice categorical: Adam is the only optimizer explored in this setup
    optimizer_name = trial.suggest_categorical('optimizer', ['Adam'])

    # Epoch budget per trial
    num_epochs = 50

    # Train, getting back the best validation MAE and the model restored to those weights
    best_mae_in_trial, trained_model = train_model(
        train_loader, test_loader, lr, optimizer_name, num_epochs, trial.number
    )

    # Persist this trial's weights to Google Drive
    best_model_path = f'/content/drive/MyDrive/crowd_density/best_model_trial_{trial.number}.pth'
    torch.save(trained_model.state_dict(), best_model_path)

    # Final evaluation on the test set (only the MAE is used)
    mae, _ = evaluate_model(trained_model, test_loader)

    # Append a summary of the trial for future reference
    with open('/content/drive/MyDrive/crowd_density/optuna_best_trials.txt', 'a') as f:
        summary_parts = [
            f"Trial {trial.number}:\n",
            f"  Value (MAE): {mae:.2f}\n",
            f"  Params: {trial.params}\n",
            f"  Best MAE during training: {best_mae_in_trial:.2f}\n",
            f"  Model path: {best_model_path}\n\n",
        ]
        f.write("".join(summary_parts))

    return mae

## Optuna study for hyperparameter optimization
This block initializes an Optuna study to minimize the Mean Absolute Error (MAE) by tuning hyperparameters. It runs the specified number of trials and automatically finds the best combination of hyperparameters. After completion, it prints the best trial's MAE and the corresponding hyperparameters.



In [None]:
import optuna

# Create Optuna study for hyperparameter optimization.
# direction='minimize' because objective() returns an MAE (lower is better).
study: optuna.study.Study = optuna.create_study(direction='minimize')  # We aim to minimize MAE

# Run the optimization process with the specified number of trials.
# Each trial calls objective(), which trains a full model, so this cell is long-running.
study.optimize(objective, n_trials=50, show_progress_bar=True)  # can increase n_trials for better tuning

# Display the best trial results
print("Best trial:")
print(f"  Value: {study.best_trial.value:.2f} (MAE)")  # Best MAE achieved across all trials
print("  Params: ")
for key, value in study.best_trial.params.items():
    print(f"    {key}: {value}")  # Display the best hyperparameters found

### Visualize results with TensorBoard

Use TensorBoard to visualize the training loss and the validation MAE/RMSE for each trial. The logs are saved to Google Drive in the `crowd_density/runs` directory.

In [None]:
# Load the TensorBoard notebook extension and point it at the log root under which
# get_writer() creates one `trial_<n>` subdirectory per trial.
%load_ext tensorboard
%tensorboard --logdir /content/drive/MyDrive/crowd_density/runs

# Crowd Count Prediction and Density Map Visualization

This function loads a pre-trained CSRNet model and predicts the crowd count for a given input image. To prevent GPU memory overflow, the image is resized to a manageable size before inference. The function then visualizes both the original image and its corresponding predicted density map side-by-side, providing a clear comparison. The predicted crowd count is calculated by summing all the values in the density map, which serves as an approximation of the total number of people present in the image.

In [None]:
import matplotlib.pyplot as plt
from torchvision import transforms
from PIL import Image
import torch

def predict_image_with_density_map(image_path: str, model_path: str) -> int:
    """
    Loads a trained CSRNet model, performs inference on a given image, and visualizes the predicted density map.

    The image is resized to 512x512 before inference. The network output has a lower
    spatial resolution than its input, and during training it is bilinearly upsampled to
    the ground-truth size before the loss is computed; the same upsampling is applied
    here so that the sum of the density map is on the scale the model was trained for.

    Args:
        image_path (str): Path to the input image.
        model_path (str): Path to the saved model weights (a state dict).

    Returns:
        int: The predicted crowd count in the input image (density sum, rounded).
    """

    # Load the trained model
    model = CSRNet().to(device)
    # NOTE(review): torch.load unpickles the file — only load checkpoints you trust.
    model.load_state_dict(torch.load(model_path, map_location=device))
    model.eval()  # Set model to evaluation mode

    # Load and prepare the input image
    img = Image.open(image_path).convert('RGB')

    # Fixed inference size, which also bounds GPU memory use
    target_height, target_width = 512, 512
    resize_transform = transforms.Compose([
        transforms.Resize((target_height, target_width)),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ])

    # Apply transformations and add batch dimension
    img_transformed = resize_transform(img).unsqueeze(0).to(device)

    # Forward pass (inference) without gradient computation
    with torch.no_grad():
        output = model(img_transformed)
        # Same upsampling as the training loop applies before its loss
        density = torch.nn.functional.interpolate(
            output, size=(target_height, target_width), mode='bilinear', align_corners=False
        )

    # Sum of density map = estimated crowd count; round instead of truncating
    predicted_count = int(round(density.sum().item()))

    # Convert the density map to a 2D NumPy array for visualization
    density_map = density.squeeze(0).squeeze(0).cpu().numpy()

    # Visualization: Original image and predicted density map side by side
    plt.figure(figsize=(16, 8))

    plt.subplot(1, 2, 1)
    plt.imshow(img)
    plt.title('Input Image')
    plt.axis('off')

    plt.subplot(1, 2, 2)
    plt.imshow(density_map, cmap='jet')
    plt.title(f'Predicted Density Map (Count: {predicted_count})')
    plt.axis('off')

    plt.show()

    return predicted_count

In [None]:
# Example usage (update with your image path and best model path).
# The single image gets its own name: `test_image_path` already holds the test-set image
# DIRECTORY used to build `test_dataset`, and overwriting it would break a re-run of that cell.
sample_image_path = '/content/drive/MyDrive/crowd_density/test_image1.jpg'
best_model_path = '/content/drive/MyDrive/crowd_density/best_model_trial_1.pth'

predicted_count = predict_image_with_density_map(sample_image_path, best_model_path)
print(f'Predicted Crowd Count: {predicted_count}')

In [None]:
# Example usage (update with your image path and best model path).
# The single image gets its own name: `test_image_path` already holds the test-set image
# DIRECTORY used to build `test_dataset`, and overwriting it would break a re-run of that cell.
sample_image_path = '/content/drive/MyDrive/crowd_density/test_image2.jpg'
best_model_path = '/content/drive/MyDrive/crowd_density/best_model_trial_1.pth'

predicted_count = predict_image_with_density_map(sample_image_path, best_model_path)
print(f'Predicted Crowd Count: {predicted_count}')

In [None]:
# Example usage.
# The single image gets its own name: `test_image_path` already holds the test-set image
# DIRECTORY used to build `test_dataset`, and overwriting it would break a re-run of that cell.
sample_image_path = '/content/drive/MyDrive/crowd_density/test_image3.jpg'
best_model_path = '/content/drive/MyDrive/crowd_density/best_model_trial_1.pth'

predicted_count = predict_image_with_density_map(sample_image_path, best_model_path)
print(f'Predicted Crowd Count: {predicted_count}')