In [7]:
import os
import glob
import h5py
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torchvision import models, transforms
from PIL import Image
import matplotlib.pyplot as plt
import cv2
from scipy.io import loadmat
import torch.nn.functional as F

DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Device:", DEVICE)

TRAIN_IMG_DIR = r"D:\Task For Infosys Internship\archive\ShanghaiTech\part_A\train_data\images"
TRAIN_GT_DIR = r"D:\Task For Infosys Internship\archive\ShanghaiTech\part_A\train_data\ground-truth"
TEST_IMG_DIR = r"D:\Task For Infosys Internship\archive\ShanghaiTech\part_A\test_data\images"
TEST_GT_DIR = r"D:\Task For Infosys Internship\archive\ShanghaiTech\part_A\test_data\ground-truth"

IMG_HEIGHT = 256
IMG_WIDTH = 256
DOWNSAMPLE_FACTOR = 8
OUTPUT_SIZE = IMG_HEIGHT // DOWNSAMPLE_FACTOR

def gaussian_filter_density(gt):
    density = np.zeros(gt.shape, dtype=np.float32)
    pts = np.array(list(zip(np.nonzero(gt)[1], np.nonzero(gt)[0])))
    if len(pts) == 0:
        return density
    
    sigma = 15
    for i in range(len(pts)):
        pt2d = np.zeros(gt.shape, dtype=np.float32)
        y, x = pts[i][1], pts[i][0]
        if y < gt.shape[0] and x < gt.shape[1]:
             pt2d[y, x] = 1.
        density += cv2.GaussianBlur(pt2d, (0,0), sigma, borderType=cv2.BORDER_CONSTANT)
    return density

class CrowdDataset(Dataset):
    def __init__(self, img_dir, gt_dir, transform=None, img_size=(IMG_HEIGHT, IMG_WIDTH)):
        self.img_paths = glob.glob(os.path.join(img_dir, "*.jpg"))
        self.gt_dir = gt_dir
        self.transform = transform
        self.img_size = img_size

    def __len__(self):
        return len(self.img_paths)

    def __getitem__(self, idx):
        img_path = self.img_paths[idx]
        img = Image.open(img_path).convert('RGB')

        filename = os.path.basename(img_path).replace(".jpg",".mat")
        mat_path = os.path.join(self.gt_dir, "GT_" + filename) 
        
        if not os.path.exists(mat_path):
            raise FileNotFoundError(f"Ground truth file not found: {mat_path}")

        try:
            mat = h5py.File(mat_path, 'r')
            gt = np.array(mat['image_info'][0][0][0][0][0])
            mat.close()
        except OSError:
            mat = loadmat(mat_path)
            gt = np.array(mat['image_info'][0, 0]['location'])

        gt = gt.flatten()

        if gt.size == 0:
            gt = np.empty((0, 2), dtype=np.int32)
        elif gt.size % 2 != 0:
            gt = np.empty((0, 2), dtype=np.int32)
        else:
            gt = gt.reshape(-1, 2).astype(np.int32)

        h, w = img.size[1], img.size[0]

        k = np.zeros((h, w))
        for i in range(len(gt)):
            x, y = gt[i][0], gt[i][1]
            if y < h and x < w:
                k[y, x] = 1

        density = gaussian_filter_density(k)

        img = img.resize(self.img_size)
        
        scale_factor = (self.img_size[0] * self.img_size[1]) / (h * w)
        density = cv2.resize(density, (self.img_size[1], self.img_size[0]), interpolation=cv2.INTER_LINEAR)
        density = density * scale_factor

        if self.transform:
            img = self.transform(img)

        density = torch.from_numpy(density).unsqueeze(0).float()
        return img, density

class CSRNet(nn.Module):
    def __init__(self):
        super(CSRNet, self).__init__()
        vgg = models.vgg16_bn(weights=models.VGG16_BN_Weights.DEFAULT)
        self.frontend = nn.Sequential(*list(vgg.features.children())[:33]) 
        
        self.backend = nn.Sequential(
            nn.Conv2d(512, 512, 3, padding=2, dilation=2),
            nn.ReLU(inplace=True),
            nn.Conv2d(512, 512, 3, padding=2, dilation=2),
            nn.ReLU(inplace=True),
            nn.Conv2d(512, 512, 3, padding=2, dilation=2),
            nn.ReLU(inplace=True),
            nn.Conv2d(512, 256, 3, padding=2, dilation=2),
            nn.ReLU(inplace=True),
            nn.Conv2d(256, 128, 3, padding=2, dilation=2),
            nn.ReLU(inplace=True),
            nn.Conv2d(128, 64, 3, padding=2, dilation=2),
            nn.ReLU(inplace=True)
        )
        self.output_layer = nn.Conv2d(64, 1, 1)

    def forward(self, x):
        x = self.frontend(x)
        x = self.backend(x)
        x = self.output_layer(x)
        return x

def train_model():
    transform = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485,0.456,0.406], std=[0.229,0.224,0.225])
    ])

    train_dataset = CrowdDataset(TRAIN_IMG_DIR, TRAIN_GT_DIR, transform=transform)
    train_loader = DataLoader(train_dataset, batch_size=4, shuffle=True)

    model = CSRNet().to(DEVICE)
    criterion = nn.MSELoss()
    optimizer = optim.Adam(model.parameters(), lr=1e-5)

    NUM_EPOCHS = 2
    print(f"Starting Training for {NUM_EPOCHS} epochs...")
    for epoch in range(NUM_EPOCHS):
        model.train()
        epoch_loss = 0
        for imgs, densities in train_loader:
            imgs, densities = imgs.to(DEVICE), densities.to(DEVICE)
            
            target_densities_downsampled = F.interpolate(
                densities, 
                size=(OUTPUT_SIZE, OUTPUT_SIZE), 
                mode='bilinear', 
                align_corners=False
            )
            
            target_densities_downsampled = target_densities_downsampled * (DOWNSAMPLE_FACTOR ** 2)
            
            optimizer.zero_grad()
            outputs = model(imgs)
            
            loss = criterion(outputs, target_densities_downsampled) 
            
            loss.backward()
            optimizer.step()
            epoch_loss += loss.item()
        print(f"Epoch {epoch+1}/{NUM_EPOCHS}, Loss: {epoch_loss/len(train_loader):.4f}")

    torch.save(model.state_dict(), "csrnet_partA.pth")
    print("Model saved to csrnet_partA.pth")
    return model

def evaluate_model(model):
    transform = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485,0.456,0.406], std=[0.229,0.224,0.225])
    ])
    
    val_dataset = CrowdDataset(TEST_IMG_DIR, TEST_GT_DIR, transform=transform)
    val_loader = DataLoader(val_dataset, batch_size=1, shuffle=False)

    print("Starting Validation...")
    model.eval()
    mae, rmse, n = 0, 0, 0
    with torch.no_grad():
        for imgs, densities in val_loader:
            imgs, densities = imgs.to(DEVICE), densities.to(DEVICE)
            outputs = model(imgs)
            
            predicted_count = outputs.sum().item()
            actual_count = densities.sum().item()
            
            mae += abs(predicted_count - actual_count)
            rmse += (predicted_count - actual_count)**2
            n += 1
    mae /= n
    rmse = np.sqrt(rmse/n)
    print(f"Validation MAE: {mae:.2f}, RMSE: {rmse:.2f}")

def visualize_density(model, img_path):
    transform = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485,0.456,0.406], std=[0.229,0.224,0.225])
    ])
    
    img = Image.open(img_path).convert('RGB')
    img_orig = np.array(img)
    img = img.resize((IMG_HEIGHT, IMG_WIDTH))
    
    img_t = transform(img).unsqueeze(0).to(DEVICE)
    model.eval()
    with torch.no_grad():
        output = model(img_t)
        
    density_map = output.squeeze().cpu().numpy()
    
    plt.figure(figsize=(12, 5))
    
    plt.subplot(1, 2, 1)
    plt.imshow(img_orig)
    plt.title("Original Image")
    
    plt.subplot(1, 2, 2)
    plt.imshow(density_map, cmap='jet')
    plt.title("Predicted Density Map")
    
    plt.show()
    print("Predicted count:", output.sum().item())

def alert_system(model, img_path, crowd_limit=50):
    transform = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485,0.456,0.406], std=[0.229,0.224,0.225])
    ])
    
    img = Image.open(img_path).convert('RGB')
    img_resized = img.resize((IMG_WIDTH, IMG_HEIGHT))
    img_t = transform(img_resized).unsqueeze(0).to(DEVICE)
    
    model.eval()
    with torch.no_grad():
        output = model(img_t)
        
    count = int(output.sum().item())
    
    if count > crowd_limit:
        print(f"🚨 ALERT: Overcrowded! Count: {count} (Limit: {crowd_limit})")
        return True, count
    else:
        print(f"✅ Normal crowd level. Count: {count} (Limit: {crowd_limit})")
        return False, count

def process_images_with_alerts(model, img_dir, crowd_limit=50):
    image_files = glob.glob(os.path.join(img_dir, "*.jpg"))
    alert_count = 0
    total_images = 0
    
    print(f"Processing {len(image_files)} images with crowd limit: {crowd_limit}")
    print("-" * 60)
    
    for img_path in image_files:
        filename = os.path.basename(img_path)
        is_alert, count = alert_system(model, img_path, crowd_limit)
        
        if is_alert:
            alert_count += 1
        
        total_images += 1
        print(f"Image: {filename} | Count: {count} | {'ALERT' if is_alert else 'NORMAL'}")
    
    print("-" * 60)
    print(f"Summary: {alert_count}/{total_images} images triggered alerts")
    print(f"Alert percentage: {(alert_count/total_images)*100:.1f}%")

def load_pretrained_model():
    model = CSRNet().to(DEVICE)
    if os.path.exists("csrnet_partA.pth"):
        model.load_state_dict(torch.load("csrnet_partA.pth", map_location=DEVICE))
        print("Loaded pretrained model")
    else:
        print("No pretrained model found, training new model...")
        model = train_model()
    return model

if __name__ == "__main__":
    model = load_pretrained_model()
    evaluate_model(model)

Device: cpu
Loaded pretrained model
Starting Validation...
Validation MAE: 0.21, RMSE: 0.41


In [12]:
import os
import glob
import h5py
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torchvision import models, transforms
from PIL import Image
import matplotlib.pyplot as plt
import cv2
from scipy.io import loadmat
import torch.nn.functional as F

torch.manual_seed(42)
np.random.seed(42)

DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Device:", DEVICE)

TRAIN_IMG_DIR = r"D:\Task For Infosys Internship\archive\ShanghaiTech\part_A\train_data\images"
TRAIN_GT_DIR = r"D:\Task For Infosys Internship\archive\ShanghaiTech\part_A\train_data\ground-truth"
TEST_IMG_DIR = r"D:\Task For Infosys Internship\archive\ShanghaiTech\part_A\test_data\images"
TEST_GT_DIR = r"D:\Task For Infosys Internship\archive\ShanghaiTech\part_A\test_data\ground-truth"

IMG_HEIGHT = 256
IMG_WIDTH = 256
DOWNSAMPLE_FACTOR = 8
OUTPUT_SIZE = IMG_HEIGHT // DOWNSAMPLE_FACTOR

def gaussian_filter_density(gt):
    density = np.zeros(gt.shape, dtype=np.float32)
    pts = np.array(list(zip(np.nonzero(gt)[1], np.nonzero(gt)[0])))
    if len(pts) == 0:
        return density
    
    sigma = 15
    for i in range(len(pts)):
        pt2d = np.zeros(gt.shape, dtype=np.float32)
        y, x = pts[i][1], pts[i][0]
        if y < gt.shape[0] and x < gt.shape[1]:
             pt2d[y, x] = 1.
        density += cv2.GaussianBlur(pt2d, (0,0), sigma, borderType=cv2.BORDER_CONSTANT)
    return density

class CrowdDataset(Dataset):
    def __init__(self, img_dir, gt_dir, transform=None, img_size=(IMG_HEIGHT, IMG_WIDTH)):
        self.img_paths = glob.glob(os.path.join(img_dir, "*.jpg"))
        self.gt_dir = gt_dir
        self.transform = transform
        self.img_size = img_size

    def __len__(self):
        return len(self.img_paths)

    def __getitem__(self, idx):
        img_path = self.img_paths[idx]
        img = Image.open(img_path).convert('RGB')

        filename = os.path.basename(img_path).replace(".jpg",".mat")
        mat_path = os.path.join(self.gt_dir, "GT_" + filename) 
        
        if not os.path.exists(mat_path):
            raise FileNotFoundError(f"Ground truth file not found: {mat_path}")

        try:
            mat = h5py.File(mat_path, 'r')
            gt = np.array(mat['image_info'][0][0][0][0][0])
            mat.close()
        except OSError:
            mat = loadmat(mat_path)
            gt = np.array(mat['image_info'][0, 0]['location'])

        gt = gt.flatten()

        if gt.size == 0:
            gt = np.empty((0, 2), dtype=np.int32)
        elif gt.size % 2 != 0:
            gt = np.empty((0, 2), dtype=np.int32)
        else:
            gt = gt.reshape(-1, 2).astype(np.int32)

        h, w = img.size[1], img.size[0]

        k = np.zeros((h, w))
        for i in range(len(gt)):
            x, y = gt[i][0], gt[i][1]
            if y < h and x < w:
                k[y, x] = 1

        density = gaussian_filter_density(k)

        img = img.resize(self.img_size)
        
        scale_factor = (self.img_size[0] * self.img_size[1]) / (h * w)
        density = cv2.resize(density, (self.img_size[1], self.img_size[0]), interpolation=cv2.INTER_LINEAR)
        density = density * scale_factor

        if self.transform:
            img = self.transform(img)

        density = torch.from_numpy(density).unsqueeze(0).float()
        return img, density

class CSRNet(nn.Module):
    def __init__(self):
        super(CSRNet, self).__init__()
        vgg = models.vgg16_bn(weights=models.VGG16_BN_Weights.DEFAULT)
        self.frontend = nn.Sequential(*list(vgg.features.children())[:33]) 
        
        self.backend = nn.Sequential(
            nn.Conv2d(512, 512, 3, padding=2, dilation=2),
            nn.ReLU(inplace=True),
            nn.Conv2d(512, 512, 3, padding=2, dilation=2),
            nn.ReLU(inplace=True),
            nn.Conv2d(512, 512, 3, padding=2, dilation=2),
            nn.ReLU(inplace=True),
            nn.Conv2d(512, 256, 3, padding=2, dilation=2),
            nn.ReLU(inplace=True),
            nn.Conv2d(256, 128, 3, padding=2, dilation=2),
            nn.ReLU(inplace=True),
            nn.Conv2d(128, 64, 3, padding=2, dilation=2),
            nn.ReLU(inplace=True)
        )
        self.output_layer = nn.Conv2d(64, 1, 1)

    def forward(self, x):
        x = self.frontend(x)
        x = self.backend(x)
        x = self.output_layer(x)
        return x

def train_model(num_epochs=5, batch_size=4, learning_rate=1e-5, save_every=5):
    transform = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485,0.456,0.406], std=[0.229,0.224,0.225])
    ])

    train_dataset = CrowdDataset(TRAIN_IMG_DIR, TRAIN_GT_DIR, transform=transform)
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=0)

    model = CSRNet().to(DEVICE)
    criterion = nn.MSELoss()
    optimizer = optim.Adam(model.parameters(), lr=learning_rate, weight_decay=1e-4)
    scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=5, gamma=0.5)

    print(f"Training for {num_epochs} epochs...")
    best_loss = float('inf')
    
    for epoch in range(num_epochs):
        model.train()
        epoch_loss = 0
        batch_count = 0
        
        for imgs, densities in train_loader:
            imgs, densities = imgs.to(DEVICE), densities.to(DEVICE)
            
            target_densities_downsampled = F.interpolate(
                densities, 
                size=(OUTPUT_SIZE, OUTPUT_SIZE), 
                mode='bilinear', 
                align_corners=False
            )
            
            target_densities_downsampled = target_densities_downsampled * (DOWNSAMPLE_FACTOR ** 2)
            
            optimizer.zero_grad()
            outputs = model(imgs)
            
            loss = criterion(outputs, target_densities_downsampled) 
            
            loss.backward()
            torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
            optimizer.step()
            
            epoch_loss += loss.item()
            batch_count += 1
            
        avg_loss = epoch_loss / batch_count
        scheduler.step()
        
        print(f"Epoch {epoch+1}/{num_epochs}, Loss: {avg_loss:.6f}, LR: {scheduler.get_last_lr()[0]:.2e}")
        
        if avg_loss < best_loss:
            best_loss = avg_loss
            torch.save(model.state_dict(), "csrnet_partA_best.pth")
            print(f"Best model saved with loss: {best_loss:.6f}")
        
        if (epoch + 1) % save_every == 0:
            torch.save(model.state_dict(), f"csrnet_partA_epoch_{epoch+1}.pth")

    torch.save(model.state_dict(), "csrnet_partA_final.pth")
    print("Final model saved to csrnet_partA_final.pth")
    return model

def evaluate_model(model):
    transform = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485,0.456,0.406], std=[0.229,0.224,0.225])
    ])
    
    val_dataset = CrowdDataset(TEST_IMG_DIR, TEST_GT_DIR, transform=transform)
    val_loader = DataLoader(val_dataset, batch_size=1, shuffle=False, num_workers=0)

    print("Evaluating model...")
    mae, rmse, n = 0, 0, 0
    
    with torch.no_grad():
        for imgs, densities in val_loader:
            imgs, densities = imgs.to(DEVICE), densities.to(DEVICE)
            outputs = model(imgs)
            
            predicted_count = outputs.sum().item()
            actual_count = densities.sum().item()
            
            mae += abs(predicted_count - actual_count)
            rmse += (predicted_count - actual_count)**2
            n += 1
    
    mae /= n
    rmse = np.sqrt(rmse/n)
    
    print(f"MAE: {mae:.2f}, RMSE: {rmse:.2f}")
    return mae, rmse

def alert_system(model, img_path, crowd_limit=50):
    transform = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485,0.456,0.406], std=[0.229,0.224,0.225])
    ])
    
    img = Image.open(img_path).convert('RGB')
    img_resized = img.resize((IMG_WIDTH, IMG_HEIGHT))
    img_t = transform(img_resized).unsqueeze(0).to(DEVICE)
    
    model.eval()
    with torch.no_grad():
        output = model(img_t)
        
    count = int(output.sum().item())
    
    if count > crowd_limit:
        print(f"🚨 ALERT: Overcrowded! Count: {count} (Limit: {crowd_limit})")
        return True, count
    else:
        print(f"✅ Normal crowd level. Count: {count} (Limit: {crowd_limit})")
        return False, count

def process_images_with_alerts(model, img_dir, crowd_limit=50):
    image_files = glob.glob(os.path.join(img_dir, "*.jpg"))
    alert_count = 0
    total_images = len(image_files)
    
    if total_images == 0:
        print(f"No images found in {img_dir}")
        return
    
    for img_path in image_files:
        filename = os.path.basename(img_path)
        is_alert, count = alert_system(model, img_path, crowd_limit)
        status = "🚨 ALERT" if is_alert else "✅ NORMAL"
        print(f"{filename:<20} | Count: {count:>4} | {status}")
        if is_alert:
            alert_count += 1
    
    print(f"Summary: {alert_count}/{total_images} images triggered alerts")
    print(f"Alert percentage: {(alert_count/total_images)*100:.1f}%")

def load_model(model_path="csrnet_partA_best.pth"):
    model = CSRNet().to(DEVICE)
    if os.path.exists(model_path):
        model.load_state_dict(torch.load(model_path, map_location=DEVICE))
        print(f"Loaded model from {model_path}")
        return model, True
    return model, False

def main():
    model, model_loaded = load_model()
    
    if not model_loaded:
        print("Training new model...")
        model = train_model(num_epochs=5, batch_size=4, learning_rate=1e-5)
    
    evaluate_model(model)
    
    if os.path.exists(TEST_IMG_DIR):
        process_images_with_alerts(model, TEST_IMG_DIR, crowd_limit=30)
    
    return model

if __name__ == "__main__":
    model = main()


Device: cpu
Training new model...
Training for 5 epochs...
Epoch 1/5, Loss: 0.005751, LR: 1.00e-05
Best model saved with loss: 0.005751
Epoch 2/5, Loss: 0.000604, LR: 1.00e-05
Best model saved with loss: 0.000604
Epoch 3/5, Loss: 0.000232, LR: 1.00e-05
Best model saved with loss: 0.000232
Epoch 4/5, Loss: 0.000103, LR: 1.00e-05
Best model saved with loss: 0.000103
Epoch 5/5, Loss: 0.000075, LR: 5.00e-06
Best model saved with loss: 0.000075
Final model saved to csrnet_partA_final.pth
Evaluating model...
MAE: 1.78, RMSE: 2.21
✅ Normal crowd level. Count: -409 (Limit: 30)
IMG_1.jpg            | Count: -409 | ✅ NORMAL
✅ Normal crowd level. Count: -4 (Limit: 30)
IMG_10.jpg           | Count:   -4 | ✅ NORMAL
✅ Normal crowd level. Count: -1 (Limit: 30)
IMG_100.jpg          | Count:   -1 | ✅ NORMAL
✅ Normal crowd level. Count: -12 (Limit: 30)
IMG_101.jpg          | Count:  -12 | ✅ NORMAL
✅ Normal crowd level. Count: 0 (Limit: 30)
IMG_102.jpg          | Count:    0 | ✅ NORMAL
✅ Normal crowd lev

In [17]:
import os
import glob
import h5py
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torchvision import models, transforms
from PIL import Image
import matplotlib.pyplot as plt
import cv2
from scipy.io import loadmat
import torch.nn.functional as F

# -------------------------------
# Config
# -------------------------------
torch.manual_seed(42)
np.random.seed(42)

DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Device:", DEVICE)

TRAIN_IMG_DIR = r"D:\Task For Infosys Internship\archive\ShanghaiTech\part_A\train_data\images"
TRAIN_GT_DIR = r"D:\Task For Infosys Internship\archive\ShanghaiTech\part_A\train_data\ground-truth"
TEST_IMG_DIR = r"D:\Task For Infosys Internship\archive\ShanghaiTech\part_A\test_data\images"
TEST_GT_DIR = r"D:\Task For Infosys Internship\archive\ShanghaiTech\part_A\test_data\ground-truth"

IMG_HEIGHT = 256
IMG_WIDTH = 256
DOWNSAMPLE_FACTOR = 8
OUTPUT_SIZE = IMG_HEIGHT // DOWNSAMPLE_FACTOR

# -------------------------------
# Gaussian Density Map
# -------------------------------
def gaussian_filter_density(gt):
    density = np.zeros(gt.shape, dtype=np.float32)
    pts = np.array(list(zip(np.nonzero(gt)[1], np.nonzero(gt)[0])))
    if len(pts) == 0:
        return density
    
    sigma = 15
    for i in range(len(pts)):
        pt2d = np.zeros(gt.shape, dtype=np.float32)
        y, x = pts[i][1], pts[i][0]
        if y < gt.shape[0] and x < gt.shape[1]:
            pt2d[y, x] = 1.
        density += cv2.GaussianBlur(pt2d, (0,0), sigma, borderType=cv2.BORDER_CONSTANT)
    return density

# -------------------------------
# Dataset
# -------------------------------
class CrowdDataset(Dataset):
    def __init__(self, img_dir, gt_dir, transform=None, img_size=(IMG_HEIGHT, IMG_WIDTH)):
        self.img_paths = glob.glob(os.path.join(img_dir, "*.jpg"))
        self.gt_dir = gt_dir
        self.transform = transform
        self.img_size = img_size

    def __len__(self):
        return len(self.img_paths)

    def __getitem__(self, idx):
        img_path = self.img_paths[idx]
        img = Image.open(img_path).convert('RGB')

        filename = os.path.basename(img_path).replace(".jpg",".mat")
        mat_path = os.path.join(self.gt_dir, "GT_" + filename) 
        
        if not os.path.exists(mat_path):
            raise FileNotFoundError(f"Ground truth file not found: {mat_path}")

        try:
            mat = h5py.File(mat_path, 'r')
            gt = np.array(mat['image_info'][0][0][0][0][0])
            mat.close()
        except OSError:
            mat = loadmat(mat_path)
            gt = np.array(mat['image_info'][0, 0]['location'])

        gt = gt.flatten()
        if gt.size == 0:
            gt = np.empty((0, 2), dtype=np.int32)
        elif gt.size % 2 != 0:
            gt = np.empty((0, 2), dtype=np.int32)
        else:
            gt = gt.reshape(-1, 2).astype(np.int32)

        h, w = img.size[1], img.size[0]

        k = np.zeros((h, w))
        for i in range(len(gt)):
            x, y = gt[i][0], gt[i][1]
            if y < h and x < w:
                k[y, x] = 1

        density = gaussian_filter_density(k)
        img = img.resize(self.img_size)
        
        scale_factor = (self.img_size[0] * self.img_size[1]) / (h * w)
        density = cv2.resize(density, (self.img_size[1], self.img_size[0]), interpolation=cv2.INTER_LINEAR)
        density = density * scale_factor

        if self.transform:
            img = self.transform(img)

        density = torch.from_numpy(density).unsqueeze(0).float()
        return img, density

# -------------------------------
# CSRNet Model
# -------------------------------
class CSRNet(nn.Module):
    def __init__(self):
        super(CSRNet, self).__init__()
        vgg = models.vgg16_bn(weights=models.VGG16_BN_Weights.DEFAULT)
        self.frontend = nn.Sequential(*list(vgg.features.children())[:33]) 
        
        self.backend = nn.Sequential(
            nn.Conv2d(512, 512, 3, padding=2, dilation=2),
            nn.ReLU(inplace=True),
            nn.Conv2d(512, 512, 3, padding=2, dilation=2),
            nn.ReLU(inplace=True),
            nn.Conv2d(512, 512, 3, padding=2, dilation=2),
            nn.ReLU(inplace=True),
            nn.Conv2d(512, 256, 3, padding=2, dilation=2),
            nn.ReLU(inplace=True),
            nn.Conv2d(256, 128, 3, padding=2, dilation=2),
            nn.ReLU(inplace=True),
            nn.Conv2d(128, 64, 3, padding=2, dilation=2),
            nn.ReLU(inplace=True)
        )
        self.output_layer = nn.Conv2d(64, 1, 1)

    def forward(self, x):
        x = self.frontend(x)
        x = self.backend(x)
        x = self.output_layer(x)
        return x

# -------------------------------
# Training
# -------------------------------
def train_model(num_epochs=5, batch_size=4, learning_rate=1e-5, save_every=5):
    transform = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485,0.456,0.406], std=[0.229,0.224,0.225])
    ])

    train_dataset = CrowdDataset(TRAIN_IMG_DIR, TRAIN_GT_DIR, transform=transform)
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=0)

    model = CSRNet().to(DEVICE)
    criterion = nn.MSELoss()
    optimizer = optim.Adam(model.parameters(), lr=learning_rate, weight_decay=1e-4)
    scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=5, gamma=0.5)

    print(f"Training for {num_epochs} epochs...")
    best_loss = float('inf')
    
    for epoch in range(num_epochs):
        model.train()
        epoch_loss = 0
        batch_count = 0
        
        for imgs, densities in train_loader:
            imgs, densities = imgs.to(DEVICE), densities.to(DEVICE)
            
            target_densities_downsampled = F.interpolate(
                densities, 
                size=(OUTPUT_SIZE, OUTPUT_SIZE), 
                mode='bilinear', 
                align_corners=False
            )
            target_densities_downsampled = target_densities_downsampled * (DOWNSAMPLE_FACTOR ** 2)
            
            optimizer.zero_grad()
            outputs = model(imgs)
            loss = criterion(outputs, target_densities_downsampled) 
            loss.backward()
            torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
            optimizer.step()
            
            epoch_loss += loss.item()
            batch_count += 1
            
        avg_loss = epoch_loss / batch_count
        scheduler.step()
        
        print(f"Epoch {epoch+1}/{num_epochs}, Loss: {avg_loss:.6f}, LR: {scheduler.get_last_lr()[0]:.2e}")
        
        if avg_loss < best_loss:
            best_loss = avg_loss
            torch.save(model.state_dict(), "csrnet_partA_best.pth")
            print(f"Best model saved with loss: {best_loss:.6f}")
        
        if (epoch + 1) % save_every == 0:
            torch.save(model.state_dict(), f"csrnet_partA_epoch_{epoch+1}.pth")

    torch.save(model.state_dict(), "csrnet_partA_final.pth")
    print("Final model saved to csrnet_partA_final.pth")
    return model

# -------------------------------
# Evaluation
# -------------------------------
def evaluate_model(model):
    transform = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485,0.456,0.406], std=[0.229,0.224,0.225])
    ])
    
    val_dataset = CrowdDataset(TEST_IMG_DIR, TEST_GT_DIR, transform=transform)
    val_loader = DataLoader(val_dataset, batch_size=1, shuffle=False, num_workers=0)

    print("Evaluating model...")
    mae, rmse, n = 0, 0, 0
    
    with torch.no_grad():
        for imgs, densities in val_loader:
            imgs, densities = imgs.to(DEVICE), densities.to(DEVICE)
            outputs = model(imgs)
            
            predicted_count = max(outputs.sum().item(), 0)  # ensure non-negative
            actual_count = densities.sum().item()
            
            mae += abs(predicted_count - actual_count)
            rmse += (predicted_count - actual_count)**2
            n += 1
    
    mae /= n
    rmse = np.sqrt(rmse/n)
    
    print(f"MAE: {mae:.2f}, RMSE: {rmse:.2f}")
    return mae, rmse

# -------------------------------
# Alert System
# -------------------------------
def alert_system(model, img_path, crowd_limit=50):
    transform = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485,0.456,0.406], std=[0.229,0.224,0.225])
    ])
    
    img = Image.open(img_path).convert('RGB')
    img_resized = img.resize((IMG_WIDTH, IMG_HEIGHT))
    img_t = transform(img_resized).unsqueeze(0).to(DEVICE)
    
    model.eval()
    with torch.no_grad():
        output = model(img_t)
        
    count = max(int(output.sum().item()), 0)  # avoid negative counts
    
    if count > crowd_limit:
        print(f"🚨 ALERT: Overcrowded! Count: {count} (Limit: {crowd_limit})")
        return True, count
    else:
        print(f"✅ Normal crowd level. Count: {count} (Limit: {crowd_limit})")
        return False, count

def process_images_with_alerts(model, img_dir, crowd_limit=50):
    image_files = glob.glob(os.path.join(img_dir, "*.jpg"))
    alert_count = 0
    total_images = len(image_files)
    
    if total_images == 0:
        print(f"No images found in {img_dir}")
        return
    
    for img_path in image_files:
        filename = os.path.basename(img_path)
        is_alert, count = alert_system(model, img_path, crowd_limit)
        status = "🚨 ALERT" if is_alert else "✅ NORMAL"
        print(f"{filename:<20} | Count: {count:>4} | {status}")
        if is_alert:
            alert_count += 1
    
    print(f"Summary: {alert_count}/{total_images} images triggered alerts")
    print(f"Alert percentage: {(alert_count/total_images)*100:.1f}%")

# -------------------------------
# Load Model
# -------------------------------
def load_model(model_path="csrnet_partA_best.pth"):
    model = CSRNet().to(DEVICE)
    if os.path.exists(model_path):
        state_dict = torch.load(model_path, map_location=DEVICE)
        model.load_state_dict(state_dict, strict=False)  # <--- FIX HERE
        print(f"Loaded model from {model_path} (strict=False)")
        return model, True
    return model, False

# -------------------------------
# Main
# -------------------------------
def main():
    model, model_loaded = load_model()
    
    if not model_loaded:
        print("Training new model...")
        model = train_model(num_epochs=5, batch_size=4, learning_rate=1e-5)
    
    evaluate_model(model)
    
    if os.path.exists(TEST_IMG_DIR):
        process_images_with_alerts(model, TEST_IMG_DIR, crowd_limit=30)
    
    return model

if __name__ == "__main__":
    model = main()


Device: cpu
Loaded model from csrnet_partA_best.pth (strict=False)
Evaluating model...
MAE: 0.48, RMSE: 1.17
✅ Normal crowd level. Count: 0 (Limit: 30)
IMG_1.jpg            | Count:    0 | ✅ NORMAL
✅ Normal crowd level. Count: 0 (Limit: 30)
IMG_10.jpg           | Count:    0 | ✅ NORMAL
✅ Normal crowd level. Count: 0 (Limit: 30)
IMG_100.jpg          | Count:    0 | ✅ NORMAL
✅ Normal crowd level. Count: 0 (Limit: 30)
IMG_101.jpg          | Count:    0 | ✅ NORMAL
✅ Normal crowd level. Count: 0 (Limit: 30)
IMG_102.jpg          | Count:    0 | ✅ NORMAL
✅ Normal crowd level. Count: 0 (Limit: 30)
IMG_103.jpg          | Count:    0 | ✅ NORMAL
✅ Normal crowd level. Count: 0 (Limit: 30)
IMG_104.jpg          | Count:    0 | ✅ NORMAL
✅ Normal crowd level. Count: 0 (Limit: 30)
IMG_105.jpg          | Count:    0 | ✅ NORMAL
✅ Normal crowd level. Count: 0 (Limit: 30)
IMG_106.jpg          | Count:    0 | ✅ NORMAL
✅ Normal crowd level. Count: 0 (Limit: 30)
IMG_107.jpg          | Count:    0 | ✅ NORMAL
✅

In [2]:
import os
import glob
import h5py
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torchvision import models, transforms
from PIL import Image
import matplotlib.pyplot as plt
import cv2
from scipy.io import loadmat
import torch.nn.functional as F

# -------------------------------
# Config
# -------------------------------
torch.manual_seed(42)
np.random.seed(42)
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Device:", DEVICE)

TRAIN_IMG_DIR = r"D:\Task For Infosys Internship\archive\ShanghaiTech\part_A\train_data\images"
TRAIN_GT_DIR = r"D:\Task For Infosys Internship\archive\ShanghaiTech\part_A\train_data\ground-truth"
TEST_IMG_DIR  = r"D:\Task For Infosys Internship\archive\ShanghaiTech\part_A\test_data\images"
TEST_GT_DIR   = r"D:\Task For Infosys Internship\archive\ShanghaiTech\part_A\test_data\ground-truth"

IMG_HEIGHT = 256
IMG_WIDTH = 256
DOWNSAMPLE_FACTOR = 8
OUTPUT_SIZE = IMG_HEIGHT // DOWNSAMPLE_FACTOR

# -------------------------------
# Gaussian Density Map Generator
# -------------------------------
def gaussian_filter_density(gt):
    density = np.zeros(gt.shape, dtype=np.float32)
    pts = np.array(list(zip(np.nonzero(gt)[1], np.nonzero(gt)[0])))
    if len(pts) == 0:
        return density
    sigma = 15
    for i in range(len(pts)):
        pt2d = np.zeros(gt.shape, dtype=np.float32)
        y, x = pts[i][1], pts[i][0]
        if y < gt.shape[0] and x < gt.shape[1]:
            pt2d[y, x] = 1.
        density += cv2.GaussianBlur(pt2d, (0,0), sigma, borderType=cv2.BORDER_CONSTANT)
    return density

# -------------------------------
# Dataset
# -------------------------------
class CrowdDataset(Dataset):
    def __init__(self, img_dir, gt_dir, transform=None, img_size=(IMG_HEIGHT, IMG_WIDTH)):
        self.img_paths = glob.glob(os.path.join(img_dir, "*.jpg"))
        self.gt_dir = gt_dir
        self.transform = transform
        self.img_size = img_size
    def __len__(self):
        return len(self.img_paths)
    def __getitem__(self, idx):
        img_path = self.img_paths[idx]
        img = Image.open(img_path).convert('RGB')
        filename_mat = os.path.basename(img_path).replace(".jpg", ".mat")
        filename_h5  = os.path.basename(img_path).replace(".jpg", ".h5")
        mat_path     = os.path.join(self.gt_dir, "GT_" + filename_mat)
        h5_path      = os.path.join(self.gt_dir, "GT_" + filename_h5)
        
        # Try both .mat (original annotation) and .h5 (density map) file naming
        if os.path.exists(h5_path):
            with h5py.File(h5_path, 'r') as hf:
                # Typical structure: 'density'
                density = np.asarray(hf['density'])
                img = img.resize(self.img_size)
                if self.transform:
                    img = self.transform(img)
                density = cv2.resize(density, self.img_size, interpolation=cv2.INTER_LINEAR)
                density = torch.from_numpy(density).unsqueeze(0).float()
                return img, density
        elif os.path.exists(mat_path):
            try:
                mat = loadmat(mat_path)
                points = mat['image_info'][0,0]['location'][0,0]
            except NotImplementedError:
                with h5py.File(mat_path, 'r') as f:
                    points = np.array(f['image_info'][0,0][0,0][0])
            if points.size == 0:
                coords = np.empty((0, 2), dtype=np.int32)
            else:
                coords = np.array(points)
            img = img.resize(self.img_size)
            h, w = img.size[1], img.size[0]
            k = np.zeros((h, w))
            for i in range(coords.shape[0]):
                x = min(int(coords[i][0]), w-1)
                y = min(int(coords[i][1]), h-1)
                if y < h and x < w:
                    k[y, x] = 1
            density = gaussian_filter_density(k)
            scale_factor = (self.img_size[0] * self.img_size[1]) / (h * w)
            density = cv2.resize(density, self.img_size, interpolation=cv2.INTER_LINEAR)
            density = density * scale_factor
            if self.transform:
                img = self.transform(img)
            density = torch.from_numpy(density).unsqueeze(0).float()
            return img, density
        else:
            raise FileNotFoundError(f"Ground truth file not found: {mat_path} or {h5_path}")

# -------------------------------
# CSRNet Model
# -------------------------------
class CSRNet(nn.Module):
    def __init__(self):
        super(CSRNet, self).__init__()
        vgg = models.vgg16_bn(weights=models.VGG16_BN_Weights.DEFAULT)
        self.frontend = nn.Sequential(*list(vgg.features.children())[:33]) 
        self.backend = nn.Sequential(
            nn.Conv2d(512, 512, 3, padding=2, dilation=2),
            nn.ReLU(inplace=True),
            nn.Conv2d(512, 512, 3, padding=2, dilation=2),
            nn.ReLU(inplace=True),
            nn.Conv2d(512, 512, 3, padding=2, dilation=2),
            nn.ReLU(inplace=True),
            nn.Conv2d(512, 256, 3, padding=2, dilation=2),
            nn.ReLU(inplace=True),
            nn.Conv2d(256, 128, 3, padding=2, dilation=2),
            nn.ReLU(inplace=True),
            nn.Conv2d(128, 64, 3, padding=2, dilation=2),
            nn.ReLU(inplace=True)
        )
        self.output_layer = nn.Conv2d(64, 1, kernel_size=1)
    def forward(self, x):
        x = self.frontend(x)
        x = self.backend(x)
        x = self.output_layer(x)
        return x

# -------------------------------
# Training
# -------------------------------
def train_model(num_epochs=5, batch_size=4, learning_rate=1e-5, save_every=5):
    transform = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485,0.456,0.406], std=[0.229,0.224,0.225])
    ])
    train_dataset = CrowdDataset(TRAIN_IMG_DIR, TRAIN_GT_DIR, transform=transform)
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=0)
    model = CSRNet().to(DEVICE)
    criterion = nn.MSELoss()
    optimizer = optim.Adam(model.parameters(), lr=learning_rate, weight_decay=1e-4)
    scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=5, gamma=0.5)
    print(f"Training for {num_epochs} epochs...")
    best_loss = float('inf')
    for epoch in range(num_epochs):
        model.train()
        epoch_loss = 0
        batch_count = 0
        for imgs, densities in train_loader:
            imgs, densities = imgs.to(DEVICE), densities.to(DEVICE)
            target_densities_downsampled = F.interpolate(
                densities, 
                size=(OUTPUT_SIZE, OUTPUT_SIZE), 
                mode='bilinear', 
                align_corners=False
            )
            target_densities_downsampled = target_densities_downsampled * (DOWNSAMPLE_FACTOR ** 2)
            optimizer.zero_grad()
            outputs = model(imgs)
            loss = criterion(outputs, target_densities_downsampled) 
            loss.backward()
            torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
            optimizer.step()
            epoch_loss += loss.item()
            batch_count += 1
        avg_loss = epoch_loss / batch_count
        scheduler.step()
        print(f"Epoch {epoch+1}/{num_epochs}, Loss: {avg_loss:.6f}, LR: {scheduler.get_last_lr()[0]:.2e}")
        if avg_loss < best_loss:
            best_loss = avg_loss
            torch.save(model.state_dict(), "csrnet_partA_best.pth")
            print(f"Best model saved with loss: {best_loss:.6f}")
        if (epoch + 1) % save_every == 0:
            torch.save(model.state_dict(), f"csrnet_partA_epoch_{epoch+1}.pth")
    torch.save(model.state_dict(), "csrnet_partA_final.pth")
    print("Final model saved to csrnet_partA_final.pth")
    return model

# -------------------------------
# Evaluation
# -------------------------------
def evaluate_model(model):
    transform = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485,0.456,0.406], std=[0.229,0.224,0.225])
    ])
    val_dataset = CrowdDataset(TEST_IMG_DIR, TEST_GT_DIR, transform=transform)
    val_loader = DataLoader(val_dataset, batch_size=1, shuffle=False, num_workers=0)
    print("Evaluating model...")
    mae, rmse, n = 0, 0, 0
    with torch.no_grad():
        for imgs, densities in val_loader:
            imgs, densities = imgs.to(DEVICE), densities.to(DEVICE)
            outputs = model(imgs)
            predicted_count = max(outputs.sum().item(), 0)
            actual_count = densities.sum().item()
            mae += abs(predicted_count - actual_count)
            rmse += (predicted_count - actual_count) ** 2
            n += 1
    mae /= n
    rmse = np.sqrt(rmse / n)
    print(f"MAE: {mae:.2f}, RMSE: {rmse:.2f}")
    return mae, rmse

# -------------------------------
# Alert System
# -------------------------------
def alert_system(model, img_path, crowd_limit=50):
    transform = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485,0.456,0.406], std=[0.229,0.224,0.225])
    ])
    img = Image.open(img_path).convert('RGB')
    img_resized = img.resize((IMG_WIDTH, IMG_HEIGHT))
    img_t = transform(img_resized).unsqueeze(0).to(DEVICE)
    model.eval()
    with torch.no_grad():
        output = model(img_t)
    count = max(int(output.sum().item()), 0)
    if count > crowd_limit:
        print(f"🚨 ALERT: Overcrowded! Count: {count} (Limit: {crowd_limit})")
        return True, count
    else:
        print(f"✅ Normal crowd level. Count: {count} (Limit: {crowd_limit})")
        return False, count

def process_images_with_alerts(model, img_dir, crowd_limit=50):
    image_files = glob.glob(os.path.join(img_dir, "*.jpg"))
    alert_count = 0
    total_images = len(image_files)
    if total_images == 0:
        print(f"No images found in {img_dir}")
        return
    for img_path in image_files:
        filename = os.path.basename(img_path)
        is_alert, count = alert_system(model, img_path, crowd_limit)
        status = "🚨 ALERT" if is_alert else "✅ NORMAL"
        print(f"{filename:<20} | Count: {count:>4} | {status}")
        if is_alert:
            alert_count += 1
    print(f"Summary: {alert_count}/{total_images} images triggered alerts")
    print(f"Alert percentage: {(alert_count/total_images)*100:.1f}%")

# -------------------------------
# Load Model
# -------------------------------
def load_model(model_path="csrnet_partA_best.pth"):
    model = CSRNet().to(DEVICE)
    if os.path.exists(model_path):
        state_dict = torch.load(model_path, map_location=DEVICE)
        model.load_state_dict(state_dict, strict=False)
        print(f"Loaded model from {model_path} (strict=False)")
        return model, True
    return model, False

# -------------------------------
# Main
# -------------------------------
def main():
    model, model_loaded = load_model()
    if not model_loaded:
        print("Training new model...")
        model = train_model(num_epochs=5, batch_size=4, learning_rate=1e-5)
    evaluate_model(model)
    if os.path.exists(TEST_IMG_DIR):
        process_images_with_alerts(model, TEST_IMG_DIR, crowd_limit=30)
    return model

if __name__ == "__main__":
    model = main()


Device: cpu
Training new model...
Training for 5 epochs...
Epoch 1/5, Loss: 0.061510, LR: 1.00e-05
Best model saved with loss: 0.061510
Epoch 2/5, Loss: 0.053974, LR: 1.00e-05
Best model saved with loss: 0.053974
Epoch 3/5, Loss: 0.042215, LR: 1.00e-05
Best model saved with loss: 0.042215
Epoch 4/5, Loss: 0.040385, LR: 1.00e-05
Best model saved with loss: 0.040385
Epoch 5/5, Loss: 0.037369, LR: 5.00e-06
Best model saved with loss: 0.037369
Final model saved to csrnet_partA_final.pth
Evaluating model...
MAE: 118.39, RMSE: 133.38
🚨 ALERT: Overcrowded! Count: 224 (Limit: 30)
IMG_1.jpg            | Count:  224 | 🚨 ALERT
🚨 ALERT: Overcrowded! Count: 240 (Limit: 30)
IMG_10.jpg           | Count:  240 | 🚨 ALERT
🚨 ALERT: Overcrowded! Count: 302 (Limit: 30)
IMG_100.jpg          | Count:  302 | 🚨 ALERT
🚨 ALERT: Overcrowded! Count: 205 (Limit: 30)
IMG_101.jpg          | Count:  205 | 🚨 ALERT
🚨 ALERT: Overcrowded! Count: 271 (Limit: 30)
IMG_102.jpg          | Count:  271 | 🚨 ALERT
🚨 ALERT: Overcro