In [1]:
import kagglehub

# Download the "tthien/shanghaitech" dataset through kagglehub and keep the
# returned value in `path`; the cell output shows it is the local directory
# that holds the downloaded files.
path = kagglehub.dataset_download("tthien/shanghaitech")

# Print the download location for reference.
print("Path to dataset files:", path)

Downloading from https://www.kaggle.com/api/v1/datasets/download/tthien/shanghaitech?dataset_version_number=1...


100%|██████████| 333M/333M [00:17<00:00, 19.5MB/s]

Extracting files...





Path to dataset files: /root/.cache/kagglehub/datasets/tthien/shanghaitech/versions/1


In [2]:
import os
from scipy.io import loadmat
import numpy as np
import cv2
from torchvision import transforms
from PIL import Image
import torch
from torch.utils.data import Dataset, DataLoader
import scipy.ndimage

class ShanghaiTechDataset(Dataset):
    """Training split of one ShanghaiTech part as (image, density map) pairs.

    Images are read from ``<root_dir>/part_<part>/train_data/images`` and the
    annotations from the sibling ``ground-truth`` directory, where the image
    ``X.jpg`` is paired with ``GT_X.mat``.

    Args:
        root_dir: Directory containing the ``part_<part>`` folders.
        part: Part identifier interpolated into ``part_<part>`` (e.g. ``'A'``).
        transform: Optional callable applied to the loaded PIL image.
        gt_downsample: Integer factor by which the density map is shrunk
            relative to the image (width and height are floor-divided by it).
        sigma: Standard deviation of the Gaussian placed on every annotated
            point; it also determines the blur kernel size.
    """

    def __init__(self, root_dir, part, transform=None, gt_downsample=8, sigma=5):
        self.root_dir = os.path.join(root_dir, f'part_{part}', 'train_data')
        self.image_dir = os.path.join(self.root_dir, 'images')
        self.density_dir = os.path.join(self.root_dir, 'ground-truth')
        self.transform = transform
        self.gt_downsample = gt_downsample
        self.sigma = sigma
        # os.listdir() returns entries in arbitrary order; sorting keeps the
        # index -> file mapping identical across runs and machines.
        self.image_files = sorted(
            f for f in os.listdir(self.image_dir) if f.endswith('.jpg')
        )

    def __len__(self):
        """Return the number of ``.jpg`` images found in the image directory."""
        return len(self.image_files)

    def __getitem__(self, idx):
        """Load the sample at ``idx``.

        Returns:
            A tuple ``(image, density_map)``. ``image`` is the RGB image after
            ``transform`` (or the raw PIL image when no transform is set);
            ``density_map`` is a float tensor of shape
            ``(1, height // gt_downsample, width // gt_downsample)``.
        """
        file_name = self.image_files[idx]
        image = Image.open(os.path.join(self.image_dir, file_name)).convert('RGB')

        # The annotation shares the image stem: X.jpg -> GT_X.mat. Using the
        # stem avoids rewriting a ".jpg" that appears elsewhere in the name.
        stem = os.path.splitext(file_name)[0]
        mat = loadmat(os.path.join(self.density_dir, f'GT_{stem}.mat'))
        # Presumably an (N, 2) array of (x, y) head positions; only columns
        # 0 and 1 of each row are read below.
        points = mat['image_info'][0][0][0][0][0]

        # Full-resolution density map (PIL reports size as (width, height)).
        width, height = image.size
        density_map = self.generate_density_map((width, height), points)

        # Shrink to the model's output resolution. INTER_AREA averages every
        # source pixel that falls into a target pixel, so no blob is skipped
        # (the default bilinear mode only samples a 2x2 neighbourhood per
        # output pixel). Multiplying the averaged values by the real area
        # ratio then keeps the map's sum, i.e. the head count, unchanged even
        # when the image size is not a multiple of gt_downsample.
        out_w = width // self.gt_downsample
        out_h = height // self.gt_downsample
        density_map = cv2.resize(density_map, (out_w, out_h), interpolation=cv2.INTER_AREA)
        area_ratio = (width * height) / (out_w * out_h)
        density_map = density_map[np.newaxis, :, :] * area_ratio

        if self.transform:
            image = self.transform(image)

        density_map = torch.from_numpy(density_map).float()

        return image, density_map

    def generate_density_map(self, image_shape, points):
        """Build a full-resolution density map from point annotations.

        Args:
            image_shape: ``(width, height)`` of the image.
            points: Iterable of points whose first two entries are ``x`` and
                ``y`` in pixel coordinates.

        Returns:
            A ``float32`` array of shape ``(height, width)`` in which every
            point contributes one unit impulse that is then Gaussian-blurred.
        """
        width, height = image_shape
        density_map = np.zeros((height, width), dtype=np.float32)

        for point in points:
            # Clamp on both sides: a negative coordinate would otherwise wrap
            # around to the opposite edge through negative indexing.
            x = min(max(int(point[0]), 0), width - 1)
            y = min(max(int(point[1]), 0), height - 1)
            density_map[y, x] += 1

        # cv2.GaussianBlur only accepts odd kernel sizes; an even sigma is
        # bumped to the next odd value (the default sigma=5 stays at 5).
        ksize = int(self.sigma)
        if ksize % 2 == 0:
            ksize += 1
        density_map = cv2.GaussianBlur(density_map, (ksize, ksize), self.sigma)
        return density_map


# Image preprocessing (no augmentation is applied): convert the PIL image to a
# tensor and normalise each channel with the standard ImageNet statistics.
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

# Derive the dataset root from the directory returned by the kagglehub
# download cell (`path`) instead of repeating a machine-specific cache path.
shanghaitech_root = os.path.join(path, 'ShanghaiTech')

# Loading train dataset
dataset_A = ShanghaiTechDataset(
    root_dir=shanghaitech_root,
    part='A',
    transform=transform
)
dataset_B = ShanghaiTechDataset(
    root_dir=shanghaitech_root,
    part='B',
    transform=transform
)

# One image per batch, visited in random order each epoch.
train_dataloader_A = DataLoader(dataset_A, batch_size=1, shuffle=True)
train_dataloader_B = DataLoader(dataset_B, batch_size=1, shuffle=True)


In [3]:
import torch.nn as nn
import torchvision.models as models

# Model Architecture
class CSRNet(nn.Module):
    """CSRNet-style density estimator: VGG-16 frontend plus dilated backend.

    The frontend is the first 23 modules of VGG-16's ``features`` (three
    max-pool stages, so the spatial size is reduced by a factor of 8). The
    backend is six 3x3 convolutions with dilation 2, each followed by ReLU,
    and a final 1x1 convolution produces the single-channel density map.

    Args:
        pretrained: When True (the default, matching the previous behaviour)
            the frontend starts from the ImageNet VGG-16 weights. Pass False
            to skip the download, e.g. when a checkpoint is loaded afterwards.
    """

    # Output channels of the six dilated backend convolutions (input is 512).
    _BACKEND_CHANNELS = (512, 512, 512, 256, 128, 64)

    def __init__(self, pretrained=True):
        super().__init__()

        # Frontend: VGG-16 layers (up to conv4_3 layer)
        self.frontend = self._build_frontend(pretrained)

        # Backend: dilated convolutions; padding=2 with dilation=2 keeps the
        # spatial size unchanged for a 3x3 kernel.
        layers = []
        in_channels = 512
        for out_channels in self._BACKEND_CHANNELS:
            layers.append(
                nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=2, dilation=2)
            )
            layers.append(nn.ReLU())
            in_channels = out_channels
        self.backend = nn.Sequential(*layers)

        # Output layer for density map generation
        self.output_layer = nn.Conv2d(64, 1, kernel_size=1)

    @staticmethod
    def _build_frontend(pretrained):
        """Return the first 23 VGG-16 feature modules, optionally pretrained."""
        weights_enum = getattr(models, 'VGG16_Weights', None)
        if weights_enum is not None:
            # Newer torchvision: `pretrained=` is deprecated in favour of
            # `weights=`; IMAGENET1K_V1 is what `pretrained=True` maps to.
            weights = weights_enum.IMAGENET1K_V1 if pretrained else None
            vgg = models.vgg16(weights=weights)
        else:
            # Older torchvision without the weights enum.
            vgg = models.vgg16(pretrained=pretrained)
        return vgg.features[:23]

    def forward(self, x):
        """Map an image batch to a one-channel density map at 1/8 resolution."""
        x = self.frontend(x)
        x = self.backend(x)
        return self.output_layer(x)

In [4]:
# Build the network and move it to the GPU only when one is present, so this
# cell also runs on CPU-only machines.
model = CSRNet()
if torch.cuda.is_available():
    model = model.cuda()

Downloading: "https://download.pytorch.org/models/vgg16-397923af.pth" to /root/.cache/torch/hub/checkpoints/vgg16-397923af.pth
100%|██████████| 528M/528M [00:06<00:00, 81.9MB/s]


In [5]:
# Evaluate `model` as the cell's last expression so the notebook displays its
# layer structure.
model

CSRNet(
  (frontend): Sequential(
    (0): Conv2d(3, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (1): ReLU(inplace=True)
    (2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (3): ReLU(inplace=True)
    (4): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (5): Conv2d(64, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (6): ReLU(inplace=True)
    (7): Conv2d(128, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (8): ReLU(inplace=True)
    (9): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (10): Conv2d(128, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (11): ReLU(inplace=True)
    (12): Conv2d(256, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (13): ReLU(inplace=True)
    (14): Conv2d(256, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (15): ReLU(inplace=True)
    (16): MaxPool2d(kernel_size=2, stride=2, padding=0, dilatio

In [None]:
import torch.optim as optim
import torch


# Defining Model, Loss Function, and Optimizer
# The device is resolved once here instead of re-checking CUDA availability
# for every tensor inside the loop.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = CSRNet().to(device)
criterion = torch.nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=1e-5)

# Mean training loss of every completed epoch, in order.
train_loss_history = []

# Training loop (part B training split)
num_epochs = 100
num_steps = len(train_dataloader_B)
for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0

    for step, (images, density_maps) in enumerate(train_dataloader_B, start=1):
        images = images.to(device)
        density_maps = density_maps.to(device)

        optimizer.zero_grad()
        outputs = model(images)
        loss = criterion(outputs, density_maps)
        loss.backward()
        optimizer.step()

        # Read the scalar once and reuse it for both accumulation and logging.
        step_loss = loss.item()
        running_loss += step_loss

        if step % 10 == 0:
            print(f'Epoch [{epoch + 1}/{num_epochs}], Step [{step}/{num_steps}], Loss: {step_loss:.4f}')

    epoch_loss = running_loss / num_steps
    train_loss_history.append(epoch_loss)

    print(f'Epoch [{epoch + 1}/{num_epochs}], Loss: {epoch_loss:.4f}')

In [None]:
import matplotlib.pyplot as plt

# Plotting Training Loss over Epochs
# The x-axis is derived from the recorded history rather than from
# `num_epochs`, so the plot also works when training stopped early and fewer
# epochs were logged (mismatched lengths would make plt.plot raise).
epochs_completed = range(1, len(train_loss_history) + 1)

plt.figure(figsize=(10, 5))
plt.plot(epochs_completed, train_loss_history, label='Training Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.title('Training Loss Over Epochs')
plt.legend()
plt.show()

In [2]:
import torch
import numpy as np

def mae(pred, gt):
    """Return the summed absolute difference between ``pred`` and ``gt``.

    No averaging happens here: for array inputs the element-wise absolute
    errors are added up, and for a single pair of values the result is simply
    ``|pred - gt|``. Any averaging is left to the caller.
    """
    residual = pred - gt
    return np.absolute(residual).sum()

def mse(pred, gt):
    """Return the summed squared difference between ``pred`` and ``gt``.

    No averaging happens here: for array inputs the element-wise squared
    errors are added up, and for a single pair of values the result is simply
    ``(pred - gt) ** 2``. Any averaging is left to the caller.
    """
    squared = (pred - gt) ** 2
    # Plain Python numbers have no ``.sum()`` method; a lone scalar is already
    # its own sum, so return it as-is instead of raising AttributeError.
    return squared.sum() if hasattr(squared, 'sum') else squared

In [3]:
class ShanghaiTechTestDataset(Dataset):
    """Test split of one ShanghaiTech part as (image, density map) pairs.

    Images are read from ``<root_dir>/part_<part>/test_data/images`` and the
    annotations from the sibling ``ground-truth`` directory, where the image
    ``X.jpg`` is paired with ``GT_X.mat``.

    Args:
        root_dir: Directory containing the ``part_<part>`` folders.
        part: Part identifier interpolated into ``part_<part>`` (e.g. ``'A'``).
        transform: Optional callable applied to the loaded PIL image.
        gt_downsample: Integer factor by which the density map is shrunk
            relative to the image (width and height are floor-divided by it).
        sigma: Standard deviation of the Gaussian placed on every annotated
            point; it also determines the blur kernel size.
    """

    def __init__(self, root_dir, part, transform=None, gt_downsample=8, sigma=5):
        self.root_dir = os.path.join(root_dir, f'part_{part}', 'test_data')
        self.image_dir = os.path.join(self.root_dir, 'images')
        self.density_dir = os.path.join(self.root_dir, 'ground-truth')
        self.transform = transform
        self.gt_downsample = gt_downsample
        self.sigma = sigma
        # os.listdir() returns entries in arbitrary order; sorting keeps the
        # evaluation order identical across runs and machines.
        self.image_files = sorted(
            f for f in os.listdir(self.image_dir) if f.endswith('.jpg')
        )

    def __len__(self):
        """Return the number of ``.jpg`` images found in the image directory."""
        return len(self.image_files)

    def __getitem__(self, idx):
        """Load the sample at ``idx``.

        Returns:
            A tuple ``(image, density_map)``. ``image`` is the RGB image after
            ``transform`` (or the raw PIL image when no transform is set);
            ``density_map`` is a float tensor of shape
            ``(1, height // gt_downsample, width // gt_downsample)``.
        """
        file_name = self.image_files[idx]
        image = Image.open(os.path.join(self.image_dir, file_name)).convert('RGB')

        # The annotation shares the image stem: X.jpg -> GT_X.mat. Using the
        # stem avoids rewriting a ".jpg" that appears elsewhere in the name.
        stem = os.path.splitext(file_name)[0]
        mat = loadmat(os.path.join(self.density_dir, f'GT_{stem}.mat'))
        # Presumably an (N, 2) array of (x, y) head positions; only columns
        # 0 and 1 of each row are read below.
        points = mat['image_info'][0][0][0][0][0]  # Coordinates of people

        # Full-resolution density map (PIL reports size as (width, height)).
        width, height = image.size
        density_map = self.generate_density_map((width, height), points)

        # Shrink to the model's output resolution. INTER_AREA averages every
        # source pixel that falls into a target pixel, so no blob is skipped
        # (the default bilinear mode only samples a 2x2 neighbourhood per
        # output pixel). Multiplying the averaged values by the real area
        # ratio then keeps the map's sum, i.e. the head count, unchanged even
        # when the image size is not a multiple of gt_downsample.
        out_w = width // self.gt_downsample
        out_h = height // self.gt_downsample
        density_map = cv2.resize(density_map, (out_w, out_h), interpolation=cv2.INTER_AREA)
        area_ratio = (width * height) / (out_w * out_h)
        density_map = density_map[np.newaxis, :, :] * area_ratio

        if self.transform:
            image = self.transform(image)

        density_map = torch.from_numpy(density_map).float()

        return image, density_map

    def generate_density_map(self, image_shape, points):
        """Build a full-resolution density map from point annotations.

        Args:
            image_shape: ``(width, height)`` of the image.
            points: Iterable of points whose first two entries are ``x`` and
                ``y`` in pixel coordinates.

        Returns:
            A ``float32`` array of shape ``(height, width)`` in which every
            point contributes one unit impulse that is then Gaussian-blurred.
        """
        width, height = image_shape
        density_map = np.zeros((height, width), dtype=np.float32)

        for point in points:
            # Clamp on both sides: a negative coordinate would otherwise wrap
            # around to the opposite edge through negative indexing.
            x = min(max(int(point[0]), 0), width - 1)
            y = min(max(int(point[1]), 0), height - 1)
            density_map[y, x] += 1

        # cv2.GaussianBlur only accepts odd kernel sizes; an even sigma is
        # bumped to the next odd value (the default sigma=5 stays at 5).
        ksize = int(self.sigma)
        if ksize % 2 == 0:
            ksize += 1
        density_map = cv2.GaussianBlur(density_map, (ksize, ksize), self.sigma)
        return density_map


# Image preprocessing (no augmentation is applied): convert the PIL image to a
# tensor and normalise each channel with the standard ImageNet statistics.
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

# Derive the dataset root from the directory returned by the kagglehub
# download cell (`path`) instead of repeating a machine-specific cache path.
# NOTE: this relies on the download cell having run in the current kernel.
shanghaitech_root = os.path.join(path, 'ShanghaiTech')

# Loading test dataset
test_dataset_A = ShanghaiTechTestDataset(
    root_dir=shanghaitech_root,
    part='A',
    transform=transform
)
test_dataset_B = ShanghaiTechTestDataset(
    root_dir=shanghaitech_root,
    part='B',
    transform=transform
)

# One image per batch, kept in dataset order so evaluation is repeatable.
test_dataloader_A = DataLoader(test_dataset_A, batch_size=1, shuffle=False)
test_dataloader_B = DataLoader(test_dataset_B, batch_size=1, shuffle=False)

NameError: name 'Dataset' is not defined

In [None]:
# Evaluate counting accuracy on the part B test loader.
model.eval()

# Send inputs to wherever the model's weights live, so evaluation works for
# both a GPU and a CPU model.
device = next(model.parameters()).device

total_mae, total_mse = 0.0, 0.0
num_images = 0

with torch.no_grad():
    for images, density_maps in test_dataloader_B:
        # Prediction
        outputs = model(images.to(device))

        # One count per image: sum every density map over all of its pixels.
        # The ground-truth map never needs to leave the CPU.
        batch_size = outputs.shape[0]
        pred_counts = outputs.cpu().numpy().reshape(batch_size, -1).sum(axis=1)
        gt_counts = density_maps.cpu().numpy().reshape(batch_size, -1).sum(axis=1)

        # mae()/mse() add up the per-image absolute / squared count errors.
        total_mae += mae(pred_counts, gt_counts)
        total_mse += mse(pred_counts, gt_counts)
        num_images += batch_size

# Average over the number of evaluated images. `avg_mse` keeps its original
# name but holds the square root of the mean squared error, i.e. the RMSE.
avg_mae = total_mae / num_images
avg_mse = np.sqrt(total_mse / num_images)

print(f"Mean Absolute Error (MAE): {avg_mae:.2f}")
print(f"Root Mean Squared Error (RMSE): {avg_mse:.2f}")


In [None]:
# Persist only the learned parameters (the state dict).
torch.save(obj=model.state_dict(), f='csrnet_model.pth')

# Additionally serialise the complete module object.
# NOTE(review): loading this file presumably requires the CSRNet class to be
# importable under the same name - confirm before relying on it.
torch.save(obj=model, f='csrnet_full_model.pth')
