In [None]:
import os

# Location of the ShanghaiTech dataset inside the Kaggle input mount.
data_path = "/kaggle/input/shanghaitech/"

# List the top-level entries to confirm the dataset is attached.
dataset_entries = os.listdir(data_path)
print(dataset_entries)


In [None]:
# Importing required libraries.

#GENERAL
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import random

#PATH PROCESS
import os
from pathlib import Path
import glob
from scipy.io import loadmat

# image processing
import cv2
from scipy.ndimage import gaussian_filter

# Neural Network
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import torchvision

In [None]:
import os
from pathlib import Path
import cv2
import numpy as np
import matplotlib.pyplot as plt

# Sample image from the ShanghaiTech Part B training split (Kaggle input mount).
path_img_sample = r"/kaggle/input/shanghaitech/ShanghaiTech/part_B/train_data/images/IMG_115.jpg"

p = Path(path_img_sample)
if not p.exists():
    raise FileNotFoundError(f"Image not found: {p.resolve()}")

# First attempt: let OpenCV open the file directly (returns None on failure).
img = cv2.imread(str(p), cv2.IMREAD_COLOR)
if img is None:
    # Fallback: read the raw bytes ourselves and decode them in memory.
    # (The original note says this helps with unicode paths on Windows.)
    data = np.fromfile(str(p), dtype=np.uint8)
    img = cv2.imdecode(data, cv2.IMREAD_COLOR) if data.size else None

if img is None:
    raise ValueError(f"Failed to read image: {p}")

# OpenCV decodes to BGR; matplotlib expects RGB.
img_rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)

# Show the sample without axis ticks.
plt.imshow(img_rgb)
plt.title("Sample Crowd Image")
plt.axis('off')
plt.show()


In [None]:
# Ground truth corresponding to the sample image.
# The annotation file must belong to the SAME image that was loaded into `p`
# (IMG_<n>.jpg -> GT_IMG_<n>.mat); a hard-coded file name would make the head
# coordinates drawn later come from a different photo.
gt_dir = p.parent.parent / "ground-truth"
target_filename = f"GT_{p.stem}.mat"
gt_candidate = gt_dir / target_filename

if gt_candidate.exists():
    gt_path = str(gt_candidate)
elif gt_dir.exists():
    # Best-effort fallback: keep the notebook running with some annotation
    # file, but say loudly that it does not describe the sample image.
    available = sorted(gt_dir.glob("GT_*.mat"))
    if not available:
        raise FileNotFoundError(f"No GT .mat files found in {gt_dir}")
    gt_path = str(available[0])
    print(
        f"WARNING: {gt_candidate} not found. Using first available GT file: {gt_path} "
        "(its annotations do NOT match the sample image)"
    )
else:
    raise FileNotFoundError(
        f"Ground-truth directory not found: {gt_dir}\n"
        f"Tried: {gt_candidate}"
    )

gt_sample = loadmat(gt_path)
print('Loaded:', gt_path)
print('type:', type(gt_sample))
print(list(gt_sample.keys()))
# Show a short preview instead of dumping all items.
items_preview = list(gt_sample.items())[:5]
print("Items preview:", items_preview)

In [None]:
# Keys of the dict returned by loadmat for the sample annotation file.
print(gt_sample.keys())

In [None]:
# Pull the annotated head coordinates out of the nested 'image_info' entry.
# The chain of [0] lookups unwraps the nested arrays that loadmat returned.
image_info = gt_sample.get('image_info')
gt_coor_sample = image_info[0][0][0][0][0]

print('Shape of coordinates: ', gt_coor_sample.shape)

In [None]:
# Overlay the annotated head positions on the sample image.
figure = plt.figure(figsize=(5, 5))

# Draw on a copy so the clean RGB image (img_rgb) stays untouched.
image_sample = img_rgb.copy()

for point in gt_coor_sample:
    x_cor, y_cor = point
    marker_pos = (int(x_cor), int(y_cor))  # OpenCV wants integer (x, y) pixels
    cv2.drawMarker(
        image_sample,
        marker_pos,
        (255, 0, 0),
        markerType=cv2.MARKER_CROSS,
        markerSize=10,
        thickness=3,
    )

plt.imshow(image_sample)
plt.title("Image and Coordinate")
plt.axis('off')
plt.show()

In [None]:
def gen_density_map_gaussian(image, coords, sigma=5):
    """Build a crowd density map by blurring one unit impulse per annotated head.

    Args:
        image: Array whose first two dimensions are (height, width); only the
            shape is used, the pixel values are ignored.
        coords: Iterable of ``(x, y)`` head positions in pixel coordinates.
        sigma: Standard deviation (in pixels) of the Gaussian kernel.

    Returns:
        ``float32`` array of shape ``(height, width)``.

    Notes:
        * Coordinates are clamped to the image bounds. Annotations that sit on
          or just outside the border would otherwise raise ``IndexError``
          (too large) or silently wrap to the opposite side (negative index).
        * Impulses are accumulated (``+=``) so two heads that fall on the same
          pixel both contribute to the count.
    """
    height, width = image.shape[:2]
    point_map = np.zeros((height, width), dtype=np.float32)

    for x_cor, y_cor in coords:
        row = min(max(int(y_cor), 0), height - 1)
        col = min(max(int(x_cor), 0), width - 1)
        point_map[row, col] += 1

    # truncate=25 cuts the kernel off at 25 standard deviations, i.e. the
    # Gaussian is effectively not truncated.
    density_map = gaussian_filter(point_map, sigma=sigma, truncate=5*5)

    return density_map

In [None]:
# Build the density map for the sample image. Only the image's height/width
# are used by gen_density_map_gaussian, so the marker overlay does not matter.
density_map_sample = gen_density_map_gaussian(image_sample, gt_coor_sample, 5)

# Scale to [0, 1] floats for display. The tensor is stored under a NEW name so
# `image_sample` keeps its original NumPy form and this cell can be re-run
# without dividing by 255 a second time.
image_sample_tensor = torch.tensor(image_sample / 255, dtype=torch.float)

figure = plt.figure(figsize=(10, 5))

# Left panel: the image with the ground-truth markers and the annotated count.
plt.subplot(1, 2, 1)
plt.xlabel(image_sample_tensor.shape)
plt.title('GT: ' + str(gt_coor_sample.shape[0]))
plt.imshow(image_sample_tensor)

# Right panel: the generated density map; its sum should approximate the count.
plt.subplot(1, 2, 2)
plt.xlabel(density_map_sample.shape)
plt.title('DM: ' + str(np.sum(density_map_sample)))
plt.imshow(density_map_sample, cmap="jet")

plt.show()

In [None]:

class CrowdDataset(Dataset):
    """Crowd-counting dataset yielding (image, density map, head count) triples.

    ``root_dir`` must contain ``images/*.jpg`` and, for every image,
    ``ground-truth/GT_<image stem>.mat``. Density maps are generated once in
    ``__init__`` (at full image resolution) and cached in memory; images are
    re-read from disk on every ``__getitem__`` call.

    Args:
        root_dir: Directory holding the ``images`` and ``ground-truth`` folders.
        gt_downsample: Factor by which the density map is smaller than the
            (cropped-by-resize) input image.
        shuffle: If True, shuffle the file list once at construction time
            using the global ``random`` state.
    """

    def __init__(self, root_dir, gt_downsample=4, shuffle=False):
        self.root_dir = root_dir
        self.gt_downsample = gt_downsample
        self.shuffle = shuffle

        self.img_names = [
            filename
            for filename in os.listdir(os.path.join(root_dir, 'images'))
            if filename.endswith('.jpg')
        ]

        if self.shuffle:
            random.shuffle(self.img_names)

        # Both caches are keyed by the full image path.
        self.n_people = {}
        self.DMs = {}
        for image_filename in self.img_names:
            img_path = os.path.join(root_dir, 'images', image_filename)
            # splitext keeps file names that contain extra dots intact.
            stem = os.path.splitext(image_filename)[0]
            path_GT = os.path.join(root_dir, 'ground-truth', 'GT_' + stem + '.mat')
            GT = loadmat(path_GT)['image_info'][0][0][0][0][0]
            img = self._read_rgb(img_path)
            self.DMs[img_path] = gen_density_map_gaussian(img, GT, 5)
            self.n_people[img_path] = GT.shape[0]

    @staticmethod
    def _read_rgb(img_path):
        """Read an image as an RGB array, failing loudly if it cannot be decoded."""
        img_bgr = cv2.imread(img_path)
        if img_bgr is None:
            # cv2.imread signals failure with None instead of raising.
            raise ValueError(f"Failed to read image: {img_path}")
        return cv2.cvtColor(img_bgr, cv2.COLOR_BGR2RGB)

    def __len__(self):
        """Number of images found under ``root_dir/images``."""
        return len(self.img_names)

    def __getitem__(self, index):
        """Return ``(img_tensor, dm_tensor, gt_n_people)`` for one sample.

        ``img_tensor`` is a float tensor in (channel, rows, cols) order scaled
        to [0, 1]; ``dm_tensor`` has shape (1, rows / gt_downsample,
        cols / gt_downsample); ``gt_n_people`` is the annotated head count.
        """
        img_path = os.path.join(self.root_dir, 'images', self.img_names[index])
        img = self._read_rgb(img_path)
        gt_density_map = self.DMs[img_path]
        gt_n_people = self.n_people[img_path]

        # Resize the image to a multiple of the downsample factor and shrink
        # the density map to the matching output resolution.
        ds_rows = int(img.shape[0] // self.gt_downsample)
        ds_cols = int(img.shape[1] // self.gt_downsample)
        img = cv2.resize(img, (ds_cols*self.gt_downsample, ds_rows*self.gt_downsample))
        gt_density_map = cv2.resize(gt_density_map, (ds_cols, ds_rows))
        # Each output pixel stands for gt_downsample**2 input pixels; rescale
        # so the map's sum stays close to the head count.
        gt_density_map = gt_density_map[np.newaxis, :, :] * self.gt_downsample * self.gt_downsample

        img = img.transpose((2,0,1)) # convert to order (channel, rows, cols)
        img_tensor = torch.tensor(img/255, dtype=torch.float)
        dm_tensor = torch.tensor(gt_density_map, dtype=torch.float)

        return img_tensor, dm_tensor, gt_n_people

In [None]:
import torch
from torch.utils.data import DataLoader, random_split

# Build a dataset from the Part B test_data folder; it is used below only to
# eyeball a couple of samples. shuffle=True randomises the file order once.
root_dir = "../input/shanghaitech/ShanghaiTech/part_B/test_data/"
dataset = CrowdDataset(root_dir, gt_downsample=4, shuffle=True)

In [None]:
# Sanity check: show the first two dataset samples next to their density maps.
for i, (img, gt_dmap, n_people) in enumerate(dataset):
    fig, (ax_img, ax_dm) = plt.subplots(1, 2, figsize=(10, 5))

    # Left: the image (channels moved last for matplotlib) with its head count.
    ax_img.set_xlabel(img.shape)
    ax_img.set_title('GT: ' + str(n_people))
    ax_img.imshow(img.permute(1, 2, 0))

    # Right: the density map; its sum should be close to the head count.
    ax_dm.set_xlabel(gt_dmap.shape)
    ax_dm.set_title('DM: ' + str(np.sum(gt_dmap.numpy())))
    ax_dm.imshow(gt_dmap.permute(1, 2, 0), cmap="jet")
    plt.show()

    if i == 0:
        continue

    # After the second sample, report the types/shapes once and stop.
    print('type of img: ', type(img))
    print('type of dmap: ', type(gt_dmap))
    print('shape of img: ', img.shape)
    break

In [None]:
class MC_CNN(nn.Module):
    """Three-column CNN that maps an RGB image to a one-channel density map.

    Every column has five 'same'-padded convolutions (each followed by ReLU)
    and two 2x2 max-pools, so the output is 1/4 of the input resolution. The
    columns differ in kernel size and width; their outputs (8 + 10 + 12 = 30
    channels) are concatenated and fused by a 1x1 convolution.
    """

    def __init__(self):
        super().__init__()
        # (base width, first kernel size, remaining kernel size) per column.
        self.column1 = self._make_column(8, 9, 7)
        self.column2 = self._make_column(10, 7, 5)
        self.column3 = self._make_column(12, 5, 3)

        # 1x1 convolution merging the 30 concatenated channels; no activation
        # is applied to the output.
        self.fusion_layer = nn.Sequential(
            nn.Conv2d(30, 1, 1, padding=0),
        )

    @staticmethod
    def _make_column(base, first_kernel, kernel):
        """Build one column: widths 3 -> b -> 2b -> 4b -> 2b -> b, pooling after the first two convs."""
        widths = [3, base, 2 * base, 4 * base, 2 * base, base]
        kernels = [first_kernel] + [kernel] * 4

        layers = []
        for idx, k in enumerate(kernels):
            layers.append(nn.Conv2d(widths[idx], widths[idx + 1], k, padding='same'))
            layers.append(nn.ReLU())
            if idx < 2:
                layers.append(nn.MaxPool2d(2))
        return nn.Sequential(*layers)

    def forward(self, img_tensor):
        """Run all three columns on ``img_tensor`` and fuse them into a density map."""
        column_outputs = (
            self.column1(img_tensor),
            self.column2(img_tensor),
            self.column3(img_tensor),
        )
        fused_input = torch.cat(column_outputs, 1)  # concatenate along channels
        return self.fusion_layer(fused_input)

In [None]:
!pip install torchsummary

In [None]:
# Dummy input: batch of 1, 3 colour channels (RGB), 768x1024 pixels.
img = torch.rand(1, 3, 768, 1024, dtype=torch.float)

# Fresh, untrained MC-CNN.
mcnn = MC_CNN()

# One forward pass to confirm the output resolution of the density map.
out_dmap = mcnn(img)
print(out_dmap.shape)


In [None]:
from torchsummary import summary

# Use the GPU when one is available, otherwise stay on the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

# Put both the model and the dummy input on that device.
mcnn, img = mcnn.to(device), img.to(device)

# Layer-by-layer summary for a single 3x768x1024 input.
summary(mcnn, input_size=(3, 768, 1024))


In [None]:
from torch.utils.data import DataLoader, Subset, random_split

batch_size = 8
device = 'cuda:0' if torch.cuda.is_available() else 'cpu'

train_root_dir = "../input/shanghaitech/ShanghaiTech/part_B/train_data/"
test_root_dir = "../input/shanghaitech/ShanghaiTech/part_B/test_data/"

# Full training set; ordering is left to random_split / the DataLoader.
full_train_dataset = CrowdDataset(train_root_dir, gt_downsample=4)

# Split 90% for training and 10% for validation.
train_size = int(0.9 * len(full_train_dataset))
val_size = len(full_train_dataset) - train_size
# Seed the split so the same images land in the validation set on every run;
# otherwise validation metrics and the "best" checkpoint are not comparable
# across kernel restarts.
split_generator = torch.Generator().manual_seed(42)
train_dataset, val_dataset = random_split(
    full_train_dataset, [train_size, val_size], generator=split_generator
)

# Create DataLoaders (shuffle only for training).
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)

# Test dataset and loader.
test_dataset = CrowdDataset(test_root_dir, gt_downsample=4)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

# Print stats.
print("Number of images in train_dataset:", len(train_dataset))
print("Number of images in val_dataset:", len(val_dataset))
print("Number of images in test_dataset:", len(test_dataset))
print("Number of batches in train_loader:", len(train_loader))
print("Number of batches in val_loader:", len(val_loader))
print("Number of batches in test_loader:", len(test_loader))


In [None]:
def plot_corresponding_pairs(batch1, batch2, plot_map='jet'):
    """Show each image next to its density map in a 4-column grid.

    Pairs are laid out left to right: image, map, image, map, ... so every
    grid row holds two (image, map) pairs.

    Args:
        batch1: Image batch indexed as ``batch1[i]`` with channels first
            (each item is permuted to rows, cols, channels for display).
        batch2: Density-map batch; ``batch2[i]`` is squeezed to 2-D.
        plot_map: Matplotlib colormap name used for the density maps.

    An empty batch is a no-op (previously it crashed when creating a grid
    with zero rows).
    """
    num_images = batch1.shape[0]
    if num_images == 0:
        return

    num_cols = 4
    # Twice as many rows as an image-only grid would need, because every
    # sample occupies two cells (image + map).
    num_rows = int(np.ceil(num_images / num_cols)) * 2

    fig, axes = plt.subplots(num_rows, num_cols, figsize=(16, num_rows * 2))
    axes = axes.flatten()  # flatten for easier indexing

    for i in range(num_images):
        img_ax, dmap_ax = axes[i * 2], axes[i * 2 + 1]

        # Image: channels last, as a NumPy array on the CPU.
        img_ax.imshow(batch1[i].detach().permute(1, 2, 0).cpu().numpy())
        img_ax.axis('off')

        # Density map, titled with its sum (the predicted/annotated count).
        dmap = batch2[i].squeeze().detach().cpu().numpy()
        dmap_ax.imshow(dmap, cmap=plot_map)
        dmap_ax.set_title(f"DM: {np.sum(dmap):.2f}")
        dmap_ax.axis('off')

    # Hide the cells that were not used (computed from the batch size rather
    # than from the loop variable left over after the loop).
    for unused_ax in axes[num_images * 2:]:
        unused_ax.axis('off')

    plt.tight_layout()
    plt.show()


In [None]:
# Get one batch of samples from the training loader.
data_iter = iter(train_loader)
sample_images, sample_dmaps, sample_n_people = next(data_iter)

# Visualize images and their corresponding density maps.
plot_corresponding_pairs(sample_images, sample_dmaps)

# Print the ground-truth head count of every sample actually in the batch
# (a batch can hold fewer than `batch_size` items, e.g. the last one).
print("Ground truth counts:")
print(" ".join(f"{count.item():5.1f}" for count in sample_n_people))


In [None]:
class CombinedLoss(nn.Module):
    """Weighted sum of a pixel-wise density-map MSE and a count-level MAE.

    Args:
        weight_dmap: Weight of the pixel-level MSE between predicted and
            ground-truth density maps.
        weight_sum_gt: Weight of the MAE between the summed predicted map and
            the ground-truth head count.
    """

    def __init__(self, weight_dmap=0.8, weight_sum_gt=0.2):
        super().__init__()
        self.weight_dmap = weight_dmap
        self.weight_sum_gt = weight_sum_gt

        self.img_loss = nn.MSELoss()
        self.gt_loss_mae = nn.L1Loss()
        # Kept so existing code that accesses this attribute keeps working;
        # it does not contribute to the returned loss.
        self.gt_loss_mse = nn.MSELoss()

    def forward(self, logits, batch_dmap, batch_gts):
        """Compute the combined loss.

        Args:
            logits: Predicted density maps, shape (batch, 1, rows, cols).
            batch_dmap: Ground-truth density maps, same shape as ``logits``.
            batch_gts: Ground-truth head counts, one per sample.

        Returns:
            Tuple ``(combined_loss, gt_loss_mae)``; both are batch means.
        """
        batch_gts = batch_gts.float().reshape(-1)

        # Density map loss (pixel-level MSE).
        img_loss = self.img_loss(logits, batch_dmap)

        # Predicted count per sample. Summing over every non-batch dimension
        # always yields shape (batch,), whereas squeeze() would collapse a
        # batch of one to a 0-d tensor that no longer matches the target.
        pred_counts = logits.sum(dim=(1, 2, 3))
        gt_loss_mae = self.gt_loss_mae(pred_counts, batch_gts)

        # Weighted combined loss.
        combined_loss = (
            self.weight_dmap * img_loss +
            self.weight_sum_gt * gt_loss_mae
        )

        return combined_loss, gt_loss_mae


In [None]:
num_epochs = 50

# Per-epoch history (dataset-level averages).
train_losses, val_losses = [], []
train_mae_losses, val_mae_losses = [], []

# Initialize model, criterion, optimizer.
model = MC_CNN().to(device)
criterion = CombinedLoss(weight_dmap=0.8, weight_sum_gt=0.2)
optimizer = optim.Adam(model.parameters(), lr=1e-4)

best_val_loss = float('inf')
best_epoch = 0

for epoch in range(num_epochs):
    print(f"\nEpoch {epoch + 1}/{num_epochs}")

    # ---- Training phase ----
    model.train()
    tr_loss_acc, tr_mae_acc = 0.0, 0.0

    for batch_img, batch_dmap, batch_gts in train_loader:
        batch_img, batch_dmap, batch_gts = (
            batch_img.to(device),
            batch_dmap.to(device),
            batch_gts.to(device)
        )

        # Forward pass
        logits = model(batch_img)
        loss, mae_loss = criterion(logits, batch_dmap, batch_gts)

        # Backward pass and optimizer step
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # The criterion returns batch MEANS. Weight them by the batch size so
        # that dividing by the dataset size below gives a true per-sample
        # average (summing raw means made the reported values roughly
        # `batch_size` times too small).
        n_in_batch = batch_img.size(0)
        tr_loss_acc += loss.item() * n_in_batch
        tr_mae_acc += mae_loss.item() * n_in_batch

    # Average over the dataset.
    tr_loss = tr_loss_acc / len(train_loader.dataset)
    tr_mae = tr_mae_acc / len(train_loader.dataset)
    print(f">> TRAIN | Loss: {tr_loss:.6f} | MAE: {tr_mae:.6f}")

    # ---- Validation phase ----
    model.eval()
    val_loss_acc, val_mae_acc = 0.0, 0.0

    with torch.inference_mode():
        for batch_img_val, batch_dmap_val, batch_gts_val in val_loader:
            batch_img_val, batch_dmap_val, batch_gts_val = (
                batch_img_val.to(device),
                batch_dmap_val.to(device),
                batch_gts_val.to(device)
            )

            logits = model(batch_img_val)
            loss, mae_loss = criterion(logits, batch_dmap_val, batch_gts_val)

            # Same batch-size weighting as in the training phase.
            n_in_batch = batch_img_val.size(0)
            val_loss_acc += loss.item() * n_in_batch
            val_mae_acc += mae_loss.item() * n_in_batch

    val_loss = val_loss_acc / len(val_loader.dataset)
    val_mae = val_mae_acc / len(val_loader.dataset)
    print(f">> VAL   | Loss: {val_loss:.6f} | MAE: {val_mae:.6f}")

    # Keep the checkpoint with the lowest validation loss so far.
    if val_loss < best_val_loss:
        best_val_loss = val_loss
        best_epoch = epoch
        torch.save(model.state_dict(), './crowd_counting_best.pth')

    # Track losses
    train_losses.append(tr_loss)
    train_mae_losses.append(tr_mae)
    val_losses.append(val_loss)
    val_mae_losses.append(val_mae)

print(f"\nBest Epoch: {best_epoch + 1}")
print(f"Best TRAIN MAE: {train_mae_losses[best_epoch]:.6f}")
print(f"Best VAL   MAE: {val_mae_losses[best_epoch]:.6f}")


In [None]:
# Side-by-side training curves: weighted loss (left) and count MAE (right).
fig, (ax_loss, ax_mae) = plt.subplots(1, 2, figsize=(12, 5))

# Each entry: (axes, train series, val series, train label, val label, title, y label).
panels = [
    (ax_loss, train_losses, val_losses,
     'Training Weighted Loss', 'Validation Weighted Loss',
     'Training vs Validation LOSS', 'Weighted Loss'),
    (ax_mae, train_mae_losses, val_mae_losses,
     'Training MAE', 'Validation MAE',
     'Training vs Validation MAE', 'Mean Absolute Error (MAE)'),
]

for ax, train_series, val_series, train_label, val_label, title, y_label in panels:
    ax.plot(train_series, label=train_label, marker='o')
    ax.plot(val_series, label=val_label, marker='o')
    ax.set_title(title)
    ax.set_xlabel('Epochs')
    ax.set_ylabel(y_label)
    ax.legend()
    ax.grid(True, linestyle='--', alpha=0.6)

plt.tight_layout()
plt.show()


In [None]:
# Load the best saved model.
best_model = MC_CNN().to(device)
best_model.load_state_dict(torch.load('./crowd_counting_best.pth', map_location=device))
best_model.eval()

# Take the first validation batch (val_loader is not shuffled, so this is the
# same batch on every run).
data_iter = iter(val_loader)
sample_images, _, sample_gts = next(data_iter)

# Predict density maps.
with torch.inference_mode():
    pred_dms = best_model(sample_images.to(device))

# Visualize original images and their predicted density maps.
plot_corresponding_pairs(sample_images.cpu(), pred_dms.cpu(), plot_map='twilight')

# Print the ground-truth count of every sample actually in the batch
# (a batch can hold fewer than `batch_size` items).
print("Ground Truth Counts:")
print(" ".join(f"{count.item():5.1f}" for count in sample_gts))


In [None]:
# Mean Absolute Error (MAE) of the predicted head counts on the test set.
# reduction='sum' accumulates absolute errors per sample, so dividing by the
# number of test images below gives the true MAE (summing per-batch means and
# dividing by the sample count under-reports it by about `batch_size`).
criterion = nn.L1Loss(reduction='sum')

best_model.eval()
test_loss_acc = 0.0

# Disable gradient tracking for evaluation.
with torch.inference_mode():
    for batch_img, batch_dmap, batch_gts in test_loader:
        batch_img = batch_img.to(device)
        batch_dmap = batch_dmap.to(device)
        # Counts arrive as integers; the loss expects matching float dtypes.
        batch_gts = batch_gts.to(device).float().reshape(-1)

        # Predicted count per image; summing over all non-batch dimensions
        # keeps shape (batch,) even for a batch of one.
        logits = best_model(batch_img)
        pred_counts = logits.sum(dim=(1, 2, 3))
        loss = criterion(pred_counts, batch_gts)

        test_loss_acc += loss.item()

# Print test results.
test_mae = test_loss_acc / len(test_loader.dataset)
print(f"TEST:  MAE = {test_mae:.3f}")


In [None]:
# Test-set MAE and RMSE of the predicted head counts.
from math import sqrt

best_model.eval()
mae_total = 0.0
mse_total = 0.0

with torch.inference_mode():
    for batch_img, _, batch_gts in test_loader:
        batch_img = batch_img.to(device)
        batch_gts = batch_gts.to(device)

        # Predicted count = sum of the predicted density map.
        logits = best_model(batch_img)
        pred_counts = logits.sum(dim=(2, 3)).squeeze()

        # Accumulate absolute and squared errors over all samples.
        diff = pred_counts - batch_gts
        mae_total += diff.abs().sum().item()
        mse_total += diff.pow(2).sum().item()

num_samples = len(test_loader.dataset)
mae = mae_total / num_samples
rmse = sqrt(mse_total / num_samples)

print("TEST RESULTS:")
print(f"  MAE  = {mae:.3f}")
print(f"  RMSE = {rmse:.3f}")


In [None]:
# Take the first test batch (test_loader is not shuffled).
data_iter = iter(test_loader)
sample_images, _, sample_gts = next(data_iter)
sample_images = sample_images.to(device)

# Predict density maps; the count per image is the sum of its map.
with torch.inference_mode():
    pred_dmaps = best_model(sample_images)
    pred_counts = pred_dmaps.sum(dim=(2, 3)).cpu().numpy().flatten()

plot_corresponding_pairs(sample_images.cpu(), pred_dmaps.cpu(), plot_map='viridis')

print("Ground Truth Counts: ", sample_gts.tolist())
# Convert to plain Python floats before rounding so the list prints as bare
# numbers rather than NumPy scalar reprs.
print("Predicted Counts:    ", [round(float(x), 1) for x in pred_counts])
