## Crowd Counter Model
Import needed libraries

In [1]:
import math
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import os
from tqdm import tqdm
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.optim import AdamW
from torch.utils.data import DataLoader
import torchvision
import torchvision.models as models
from torchvision import datasets, transforms
from torch.utils.data import DataLoader
from torchvision.transforms import Compose, Resize, Lambda, ToTensor
import warnings
# Suppress the specific UserWarning
warnings.filterwarnings("ignore", message="The default value of the antialias parameter.*", category=UserWarning)

device = torch.device("cuda:10") if torch.cuda.is_available() else torch.device("cpu")

In [13]:
# Define the transformation for the images (you can customize this based on your needs)
test_size = (512,512)
out_size = (64,64)

transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Resize(test_size),
])

# Define a custom dataset class
class CrowdDataset(torch.utils.data.Dataset):
    def __init__(self, root='jhu_crowd_v2.0/', split='train', transform=None):
        self.input_folder = os.path.join(root, split, 'images')
        self.output_folder = os.path.join(root, split, 'den')
        self.input_dataset = datasets.ImageFolder(self.input_folder, transform=transform)
        self.classes = self.input_dataset.classes
        self.indices = list(range(len(self.input_dataset)))

    def __getitem__(self, index):
        # Load input image
        input_data = self.input_dataset[self.indices[index]][0]  # [0] to get the data (image)

        # Load output data from CSV file
        image_name = os.path.basename(self.input_dataset.imgs[self.indices[index]][0])
        csv_path = os.path.join(self.output_folder, f"{image_name.replace('.jpg', '.csv')}")
        df = pd.read_csv(csv_path, header=None)
        original_data = torch.tensor(df.values).float()
        resized_tensor = F.interpolate(original_data.unsqueeze(0).unsqueeze(0), size=out_size, mode='bilinear', align_corners=False)
        resized_tensor = resized_tensor.squeeze(0).squeeze(0)
        output_data = resized_tensor * (original_data.sum() / resized_tensor.sum())
        return {'input': input_data, 'output': output_data}


    def __len__(self):
        return len(self.indices)


In [15]:
# make all the datasets
train_dataset = CrowdDataset(split='train', transform=transform)
val_dataset = CrowdDataset(split='val', transform=None)
test_dataset = CrowdDataset(split='test', transform=None)

In [16]:
#make a dataloader
train_dataloader = DataLoader(train_dataset, batch_size=8, shuffle=True)
val_dataloader = DataLoader(val_dataset, batch_size=8, shuffle=False)
test_dataloader = DataLoader(test_dataset, batch_size=8, shuffle=False)

In [53]:
#loop throough first 10 batches
for i, batch in enumerate(train_dataloader):
    if i == 10:
        break
    print(batch['input'].shape, batch['output'].shape)

torch.Size([8, 3, 512, 512]) torch.Size([8, 64, 64])
torch.Size([8, 3, 512, 512]) torch.Size([8, 64, 64])
torch.Size([8, 3, 512, 512]) torch.Size([8, 64, 64])
torch.Size([8, 3, 512, 512]) torch.Size([8, 64, 64])
torch.Size([8, 3, 512, 512]) torch.Size([8, 64, 64])
torch.Size([8, 3, 512, 512]) torch.Size([8, 64, 64])
torch.Size([8, 3, 512, 512]) torch.Size([8, 64, 64])
torch.Size([8, 3, 512, 512]) torch.Size([8, 64, 64])
torch.Size([8, 3, 512, 512]) torch.Size([8, 64, 64])
torch.Size([8, 3, 512, 512]) torch.Size([8, 64, 64])


In [17]:
class UrebBlock(nn.Module):
    def __init__(self, in_channels):
        super(UrebBlock, self).__init__()
        self.CB = nn.Sequential(
            nn.Conv2d(in_channels, 32, kernel_size=1),
            nn.ReLU(),
            nn.Conv2d(32, 32, kernel_size=3, padding=1),  # Add padding to keep the spatial dimensions
            nn.ReLU(),
            nn.Conv2d(32, 1, kernel_size=3, padding=1),   # Add padding to keep the spatial dimensions
        )
        self.DR = nn.Sequential(
            nn.Conv2d(in_channels, 32, kernel_size=1),
            nn.ReLU(),
            nn.Conv2d(32, 32, kernel_size=3, padding=1),  # Add padding to keep the spatial dimensions
        )
        self.CEB = nn.Sequential(
            nn.Conv2d(33, 32, kernel_size=1),
            nn.ReLU(),
            nn.Conv2d(32, 16, kernel_size=3, padding=1),  # Add padding to keep the spatial dimensions
            nn.ReLU(),
            nn.Conv2d(16, 16, kernel_size=3, padding=1),   # Add padding to keep the spatial dimensions
            nn.ReLU(),
            nn.Conv2d(16, 1, kernel_size=3, padding=1),   # Add padding to keep the spatial dimensions
        )
    def forward(self, x):
        r = self.CB(x)
        cm = self.CEB(torch.cat((r, self.DR(x)), dim=1))
        return cm * r
        

In [20]:
class VGGadjusted(nn.Module):
    def __init__(self):
        super(VGGadjusted, self).__init__()
        self.vgg = models.vgg16(pretrained=True).features
        self.C3 = self.vgg[:17]
        self.C4 = self.vgg[17:24]
        self.C5 = self.vgg[24:]
        self.C6 = nn.Sequential(
            nn.Conv2d(512, 32, kernel_size=1),
            nn.ReLU(),
            nn.Conv2d(32, 32, kernel_size=3, padding=1),  # Add padding to keep the spatial dimensions
            nn.ReLU(),
            nn.Conv2d(32, 1, kernel_size=3, padding=1),   # Add padding to keep the spatial dimensions
            nn.Upsample(size=(16, 16), mode='bilinear', align_corners=False)
        )
        self.ureb3 = UrebBlock(256)
        self.ureb4 = UrebBlock(512)
        self.ureb5 = UrebBlock(512)

        self.y5_upsample = nn.Upsample(size=(32, 32), mode='bilinear', align_corners=False)
        self.y4_upsample = nn.Upsample(size=(64, 64), mode='bilinear', align_corners=False)

    def forward(self, x):
        c3 = self.C3(x)
        r3 = self.ureb3(c3)
        c4 = self.C4(c3)
        r4 = self.ureb4(c4)
        c5 = self.C5(c4)
        r5 = self.ureb5(c5)
        y6 = self.C6(c5)
        y5 = y6 + r5
        y4 = self.y5_upsample(y5) + r4
        y3 = self.y4_upsample(y4) + r3
        out = {'y3': y3, 'y4': y4, 'y5': y5, 'y6': y6, 'cm3': r3, 'cm4': r4, 'cm5': r5, }
        return out



In [53]:
class LossFunction(nn.Module):
    def __init__(self):
        super(LossFunction, self).__init__()
        self.lambda_c = 0.1
        self.cm_keys = ['cm3', 'cm4', 'cm5']
        self.y_keys = ['y3', 'y4', 'y5']

    def _scale_regularize_tensor(self, tensor, out_size):
        # print(tensor.shape)
        # print(tuple(list(out_size)[1:]))
        out_size = tuple(list(out_size)[1:])
        # print(tensor.unsqueeze(0).shape)
        resized = F.interpolate(tensor.unsqueeze(0), size=out_size, mode='bilinear', align_corners=False)
        resized = resized.squeeze(0).squeeze(0)
        scaled_tensor = resized * (tensor.sum() / resized.sum())
        return scaled_tensor

    def forward(self, y_hat, y):
        loss_C = sum(torch.log(y_hat[cm_key]).sum() for cm_key in self.cm_keys)
        loss_d = sum(F.mse_loss(y_hat[cm_key] * y_hat[y_key], y_hat[cm_key] * self._scale_regularize_tensor(y, y_hat[cm_key].squeeze(1).shape)) for y_key, cm_key in list(zip(self.y_keys, self.cm_keys)))
        return loss_C + self.lambda_c * loss_d


In [54]:
model = VGGadjusted().to(device)
optimizer = AdamW(model.parameters(), lr=1e-4)
criterion = LossFunction()
epochs = 1
loop = tqdm(total=len(train_dataloader)*epochs, position=0, leave=False)
val_loss = []
train_loss = []

                                                                       

In [55]:
def training():
    model.train()
    checkpoint = {
        'epoch': 0,
        'model_state_dict': model.state_dict(),
        'optimizer_state_dict': optimizer.state_dict(),
        'loss': 0.0,
    }
    for epoch in range(epochs):
        for i, batch in enumerate(train_dataloader):
            optimizer.zero_grad()
            input_data = batch['input'].to(device)
            output_data = batch['output'].to(device)
            output_pred = model(input_data)
            loss = criterion(output_pred, output_data)
            loss.backward()
            if i % 100 == 0:
                train_loss.append(loss.item())
                # val_loss.append()
            optimizer.step()
            loop.set_description(f"Epoch [{epoch}/{epochs}]")
            loop.set_postfix(loss=loss.item())
            loop.update(1)
        checkpoint['epoch'] = epoch
        checkpoint['model_state_dict'] = model.state_dict()
        checkpoint['optimizer_state_dict'] = optimizer.state_dict()
        checkpoint['loss'] = loss.item()
        torch.save(checkpoint, f'checkpoint_{epoch}.pth')
    loop.close()
training()

  loss_d = sum(F.mse_loss(y_hat[cm_key] * y_hat[y_key], y_hat[cm_key] * self._scale_regularize_tensor(y, y_hat[cm_key].squeeze(1).shape)) for y_key, cm_key in list(zip(self.y_keys, self.cm_keys)))
  loss_d = sum(F.mse_loss(y_hat[cm_key] * y_hat[y_key], y_hat[cm_key] * self._scale_regularize_tensor(y, y_hat[cm_key].squeeze(1).shape)) for y_key, cm_key in list(zip(self.y_keys, self.cm_keys)))
  loss_d = sum(F.mse_loss(y_hat[cm_key] * y_hat[y_key], y_hat[cm_key] * self._scale_regularize_tensor(y, y_hat[cm_key].squeeze(1).shape)) for y_key, cm_key in list(zip(self.y_keys, self.cm_keys)))
  loss_d = sum(F.mse_loss(y_hat[cm_key] * y_hat[y_key], y_hat[cm_key] * self._scale_regularize_tensor(y, y_hat[cm_key].squeeze(1).shape)) for y_key, cm_key in list(zip(self.y_keys, self.cm_keys)))
  loss_d = sum(F.mse_loss(y_hat[cm_key] * y_hat[y_key], y_hat[cm_key] * self._scale_regularize_tensor(y, y_hat[cm_key].squeeze(1).shape)) for y_key, cm_key in list(zip(self.y_keys, self.cm_keys)))
  loss_d = sum(

KeyboardInterrupt: 

In [None]:
#plot val loss
plt.plot(val_loss)
#plot train loss
plt.plot(train_loss)
plt.show()