In [None]:
pip install facenet_pytorch

### Importing Neccessary Libraries

In [None]:
import os
import json
import copy
import torch
import time
import random
import numpy as np
import pandas as pd
import torch.nn as nn
import torch.optim as optim
from PIL import Image, ImageFile
from facenet_pytorch import MTCNN
import matplotlib.pyplot as plt
import matplotlib.patches as patches
from torchvision import transforms, models
from torch.utils.data import Dataset, DataLoader,random_split
from scipy.spatial.distance import pdist, squareform
from torch.utils.data import Subset, DataLoader
from torchvision import transforms

In [None]:
# Allow loading of truncated images to avoid errors with corrupted files
ImageFile.LOAD_TRUNCATED_IMAGES = True

In [None]:
def generate_pseudo_labels(image_dir, output_json):
    """
    Generate pseudo-labels for head counting using MTCNN for face detection.
    Detected bounding boxes are saved as pseudo-labels in a JSON file.
    
    Args:
        image_dir (str): Directory containing the images.
        output_json (str): Output JSON file path to save pseudo-labels.
    """
    # Initialize the MTCNN model for detecting multiple faces per image.
    mtcnn = MTCNN(keep_all=True)
    pseudo_labels = {}

    # Process each image in the specified directory.
    for img_file in os.listdir(image_dir):
        if img_file.lower().endswith(('.jpg', '.jpeg', '.png')):
            img_path = os.path.join(image_dir, img_file)
            try:
                image = Image.open(img_path).convert('RGB')
            except OSError as e:
                print(f"Error loading image {img_file}: {e}. Skipping this file.")
                continue

            # Detect faces (as proxy for heads)
            boxes, probs = mtcnn.detect(image)

            # Convert detected boxes to integer coordinates if detections are present.
            if boxes is not None:
                boxes = boxes.tolist()
                boxes = [[int(x) for x in box] for box in boxes]
            else:
                boxes = []

            pseudo_labels[img_file] = boxes
            print(f"Processed {img_file} - Detected {len(boxes)} heads.")
    
    # Save the pseudo-labels to a JSON file.
    with open(output_json, 'w') as f:
        json.dump(pseudo_labels, f, indent=4)
    print(f"Pseudo-labels saved to {output_json}")

In [None]:
def visualize_detection(image_path, boxes):
    """
    Visualize the detections on a single image.
    
    Args:
        image_path (str): Path to the image.
        boxes (list): List of bounding boxes [x1, y1, x2, y2].
    """
    image = Image.open(image_path).convert('RGB')
    fig, ax = plt.subplots(1)
    ax.imshow(image)
    for box in boxes:
        x1, y1, x2, y2 = box
        rect = patches.Rectangle((x1, y1), x2-x1, y2-y1, linewidth=2, edgecolor='r', facecolor='none')
        ax.add_patch(rect)
    plt.title("Detected Heads")
    plt.show()


In [None]:
if __name__ == "__main__":
    # Dataset path for the training data.
    image_dir = "/kaggle/input/crowd-data/Train Data"
    output_json = "/kaggle/working/pseudo_labels.json"
    
    # Generate pseudo-labels using weak supervision (pseudo labeling).
    generate_pseudo_labels(image_dir, output_json)
    
    # visualize detection for one sample image:
    with open(output_json, 'r') as f:
        pseudo_labels = json.load(f)
        for image_file, boxes in pseudo_labels.items():
            image_path = os.path.join(image_dir, image_file)
            visualize_detection(image_path, boxes)
            break  # Visualize only the first image.

In [None]:
class HeadCountingDataset(Dataset):
    """
    Custom dataset that maps each image to its head count.
    The head count is derived from the number of pseudo-label bounding boxes.
    """
    def __init__(self, image_dir, pseudo_labels_path, transform=None):
        self.image_dir = image_dir
        self.transform = transform
        with open(pseudo_labels_path, 'r') as f:
            self.pseudo_labels = json.load(f)
        self.image_files = list(self.pseudo_labels.keys())
    
    def __len__(self):
        return len(self.image_files)
    
    def __getitem__(self, idx):
        image_file = self.image_files[idx]
        image_path = os.path.join(self.image_dir, image_file)
        image = Image.open(image_path).convert('RGB')
        if self.transform:
            image = self.transform(image)
        # The head count is the number of bounding boxes detected.
        count = len(self.pseudo_labels[image_file])
        return image, torch.tensor([count], dtype=torch.float)

Data Transformation

In [None]:
# image transformations (resize and normalize for pre-trained models)
data_transforms = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
])

### Data Loading

In [None]:
# Dataset directory and pseudo labels file (using Kaggle path)
image_dir = "/kaggle/input/crowd-data/Train Data"
pseudo_labels_path = "/kaggle/working/pseudo_labels.json"

# Dataset instance
dataset = HeadCountingDataset(image_dir, pseudo_labels_path, transform=data_transforms)

# Split the dataset: 70% train, 10% validation, 20% test 
dataset_length = len(dataset)
train_size = int(0.7 * dataset_length)
val_size = int(0.1 * dataset_length)
test_size = dataset_length - train_size - val_size

print(f"Total images: {dataset_length}")
print(f"Train size: {train_size}, Validation size: {val_size}, Test size: {test_size}")

# For training comparative models, we use train and validation sets.
train_dataset, val_dataset,test_dataset  = random_split(dataset, [train_size, val_size, test_size])

train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=16, shuffle=False)

dataloaders = {'train': train_loader, 'val': val_loader}

# Create DataLoaders for each subset
train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=16, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=16, shuffle=False)

### Model Transformation

In [None]:
def get_model(model_name='vgg16'):
    """
    Loads a pre-trained CNN model and adapts it for regression (head count prediction).
    
    Args:
        model_name (str): Choose 'vgg16' or 'resnet50'.
    
    Returns:
        model: Modified pre-trained model.
    """
    if model_name == 'vgg16':
        model = models.vgg16(pretrained=True)
        # Replace the last classifier layer for regression (output 1 value)
        model.classifier[6] = nn.Linear(in_features=4096, out_features=1)
    elif model_name == 'resnet50':
        model = models.resnet50(pretrained=True)
        # Replace the final fully connected layer for regression
        model.fc = nn.Linear(in_features=model.fc.in_features, out_features=1)
    return model

In [None]:
def train_model(model, dataloaders, criterion, optimizer, scheduler, num_epochs, device):
    """
    Train the model and record training and validation losses.
    
    Returns:
        model: Best model based on validation loss.
        train_losses: List of average training loss per epoch.
        val_losses: List of average validation loss per epoch.
    """
    since = time.time()
    best_model_wts = copy.deepcopy(model.state_dict())
    best_loss = float('inf')
    
    train_losses = []
    val_losses = []
    
    for epoch in range(num_epochs):
        print(f"Epoch [{epoch+1}/{num_epochs}]")
        print("-" * 20)
        
        for phase in ['train', 'val']:
            if phase == 'train':
                model.train()
            else:
                model.eval()
                
            running_loss = 0.0
            for inputs, labels in dataloaders[phase]:
                inputs = inputs.to(device)
                labels = labels.to(device)
                
                optimizer.zero_grad()
                
                with torch.set_grad_enabled(phase == 'train'):
                    outputs = model(inputs)
                    loss = criterion(outputs, labels)
                    
                    if phase == 'train':
                        loss.backward()
                        optimizer.step()
                
                running_loss += loss.item() * inputs.size(0)
            
            epoch_loss = running_loss / len(dataloaders[phase].dataset)
            print(f"{phase} Loss: {epoch_loss:.4f}")
            
            if phase == 'train':
                train_losses.append(epoch_loss)
            else:
                val_losses.append(epoch_loss)
                scheduler.step(epoch_loss)
            
            if phase == 'val' and epoch_loss < best_loss:
                best_loss = epoch_loss
                best_model_wts = copy.deepcopy(model.state_dict())
                
        print()
    
    time_elapsed = time.time() - since
    print(f"Training complete in {time_elapsed//60:.0f}m {time_elapsed%60:.0f}s")
    print(f"Best Validation Loss: {best_loss:.4f}")
    
    model.load_state_dict(best_model_wts)
    return model, train_losses, val_losses

In [None]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
num_epochs = 10
criterion = nn.MSELoss()

### VGG16 Training

In [None]:
# ---- VGG16 Training ----
print("Training VGG16 Model")
vgg_model = get_model("vgg16").to(device)
optimizer_vgg = optim.Adam(vgg_model.parameters(), lr=1e-4)
scheduler_vgg = optim.lr_scheduler.ReduceLROnPlateau(optimizer_vgg, mode='min', factor=0.1, patience=2)
vgg_model, vgg_train_losses, vgg_val_losses = train_model(vgg_model, dataloaders, criterion, optimizer_vgg, scheduler_vgg, num_epochs, device)

### ResNet-50 Training

In [None]:
# ---- ResNet50 Training ----
print("\nTraining ResNet50 Model")
resnet_model = get_model("resnet50").to(device)
optimizer_resnet = optim.SGD(resnet_model.parameters(), lr=5e-4)
scheduler_resnet = optim.lr_scheduler.ReduceLROnPlateau(optimizer_resnet, mode='min', factor=0.1, patience=2)
resnet_model, resnet_train_losses, resnet_val_losses = train_model(resnet_model, dataloaders, criterion, optimizer_resnet, scheduler_resnet, num_epochs, device)

### Visualization of Loss Curves

#### VGG16

In [None]:
epochs = range(1, num_epochs + 1)

plt.figure(figsize=(12, 5))

# VGG16 Loss Curves
plt.subplot(1, 2, 1)
plt.plot(epochs, vgg_train_losses, 'b-o', label='VGG16 Train Loss')
plt.plot(epochs, vgg_val_losses, 'r-o', label='VGG16 Val Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.title('VGG16 Loss Curves')
plt.legend()



#### Resnet50

In [None]:
# ResNet50 Loss Curves
plt.subplot(1, 2, 2)
plt.plot(epochs, resnet_train_losses, 'b-o', label='ResNet50 Train Loss')
plt.plot(epochs, resnet_val_losses, 'r-o', label='ResNet50 Val Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.title('ResNet50 Loss Curves')
plt.legend()

plt.tight_layout()
plt.show()

### Descriptive Statistics about our Image Data

In [None]:
# Load pseudo-labels from JSON
with open(pseudo_labels_path, 'r') as f:
    pseudo_labels = json.load(f)

In [None]:
# Create a list to store extracted features for each image
features_list = []

# Loop over each image and its corresponding pseudo-labels
for image_file, boxes in pseudo_labels.items():
    image_path = os.path.join(image_dir, image_file)
    try:
        # Open the image and convert to RGB
        img = Image.open(image_path).convert('RGB')
    except Exception as e:
        print(f"Error processing {image_file}: {e}")
        continue
    
    # Feature Extraction 
    # 1. Head Count: number of pseudo-label bounding boxes
    head_count = len(boxes)
    
    # 2. Image Dimensions: width and height (in pixels)
    width, height = img.size
    
    # 3. Area: total number of pixels (width x height)
    area = width * height
    
    # 4. Aspect Ratio: width divided by height
    aspect_ratio = width / height if height != 0 else np.nan
    
    # 5. Average Brightness: mean pixel value from grayscale conversion
    img_gray = img.convert('L')
    avg_brightness = np.mean(np.array(img_gray))
    
    # 6. File Size: size of the image file in kilobytes
    try:
        file_size = os.path.getsize(image_path) / 1024  # in KB
    except Exception as e:
        file_size = np.nan

    # Append all features into the list as a dictionary
    features_list.append({
        'image_file': image_file,
        'head_count': head_count,
        'width': width,
        'height': height,
        'area': area,
        'aspect_ratio': aspect_ratio,
        'avg_brightness': avg_brightness,
        'file_size_kb': file_size
    })


In [None]:
# Create a DataFrame from the extracted features
df = pd.DataFrame(features_list)

In [None]:
df.head()

### Descriptive Statistical Analysis

In [None]:
# Compute overall descriptive statistics for numerical features
df.describe()

In [None]:
# Compute and print the mode for each numeric feature
numeric_features = ['head_count', 'width', 'height', 'area', 'aspect_ratio', 'avg_brightness', 'file_size_kb']
print("\nMode for each numeric feature:")
for feature in numeric_features:
    mode_val = df[feature].mode()
    print(f"{feature} mode: {mode_val.values}")

### Graphical Representation

In [None]:
# 3.1 Histograms for Each Feature
plt.figure(figsize=(16, 12))
for i, feature in enumerate(numeric_features, 1):
    plt.subplot(3, 3, i)
    plt.hist(df[feature].dropna(), bins=10, color='skyblue', edgecolor='black')
    plt.title(f"Histogram of {feature}")
    plt.xlabel(feature)
    plt.ylabel("Frequency")
plt.tight_layout()
plt.show()

In [None]:
# 3.2 Line Plot: Head Count Sorted by Image Index
df_sorted = df.sort_values(by='head_count').reset_index(drop=True)
plt.figure(figsize=(10, 5))
plt.plot(df_sorted.index, df_sorted['head_count'], marker='o', linestyle='-')
plt.title("Head Count Sorted by Image Index")
plt.xlabel("Sorted Image Index")
plt.ylabel("Head Count")
plt.show()

In [None]:
# 3.3 Bar Plot: Head Count for Each Image
plt.figure(figsize=(12, 6))
plt.bar(df['image_file'], df['head_count'], color='lightgreen')
plt.xticks(rotation=90)
plt.title("Head Count for Each Image")
plt.xlabel("Image File")
plt.ylabel("Head Count")
plt.tight_layout()
plt.show()

In [None]:
# 3.4 Correlation Analysis: Correlation Matrix and Heatmap
corr_matrix = df[numeric_features].corr()
print("\nCorrelation Matrix:")
corr_matrix

In [None]:
plt.figure(figsize=(8, 6))
plt.imshow(corr_matrix, cmap='viridis', interpolation='none')
plt.colorbar()
plt.xticks(range(len(numeric_features)), numeric_features, rotation=90)
plt.yticks(range(len(numeric_features)), numeric_features)
plt.title("Correlation Matrix Heatmap")
plt.show()

# 3.5 Scatter Plots: Head Count vs. Each Other Feature
for feature in numeric_features:
    if feature != 'head_count':
        plt.figure(figsize=(6, 4))
        plt.scatter(df[feature], df['head_count'], alpha=0.7, color='coral')
        plt.xlabel(feature)
        plt.ylabel("Head Count")
        plt.title(f"Head Count vs. {feature}")
        plt.show()

In [None]:
# 3.6 Distance Matrix: Compute and Visualize the Euclidean Distance Matrix
distance_matrix = squareform(pdist(df[numeric_features], metric='euclidean'))
print("\nDistance Matrix (first 5 rows and columns):")
print(distance_matrix[:5, :5])

In [None]:
# Sorting the data by head count (descending order)
df_sorted_by_head = df.sort_values(by='head_count', ascending=False)
print("\nData sorted by head_count (top 5):")
df_sorted_by_head.head()

### Testing on Augmented Dataset

In [None]:
# 1. Create a random subset (30% of the images)
dataset_length = len(dataset)
subset_size = int(0.3 * dataset_length)
subset_indices = random.sample(range(dataset_length), subset_size)

In [None]:
# 2. Define augmentation transformations to simulate varying conditions
class EnsureTensor(object):
    def __call__(self, img):
        if not isinstance(img, torch.Tensor):
            return transforms.ToTensor()(img)
        return img

### Transformations

In [None]:
augmented_transforms = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ColorJitter(brightness=0.3, contrast=0.3, saturation=0.3, hue=0.1),
    transforms.RandomRotation(degrees=15),
    EnsureTensor(),  # This ensures the image is a tensor
    transforms.RandomErasing(p=0.5, scale=(0.02, 0.15)),
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
])

### Augmented Image Result

In [None]:
random_index = random.randint(0, len(augmented_dataset) - 1)
img_tensor, label = augmented_dataset[random_index]

# Function to denormalize the image tensor
def denormalize(tensor, mean, std):
    # Cloning the tensor to avoid modifying the original image
    tensor = tensor.clone()
    for t, m, s in zip(tensor, mean, std):
        t.mul_(s).add_(m)
    return tensor

# Mean and std used for normalization in the transform pipeline
mean = [0.485, 0.456, 0.406]
std = [0.229, 0.224, 0.225]

# Denormalize the image tensor
img_denorm = denormalize(img_tensor, mean, std)

# Convert the tensor to a NumPy array and change the shape from (C, H, W) to (H, W, C)
img_np = img_denorm.numpy().transpose((1, 2, 0))
img_np = np.clip(img_np, 0, 1)  # Ensure the pixel values are in [0, 1]

# Display the image
plt.imshow(img_np)
plt.title("Augmented Image")
plt.axis("off")
plt.show()

In [None]:
# Creating the augmented dataset instance and selecting the subset
augmented_dataset = HeadCountingDataset(image_dir, pseudo_labels_path, transform=augmented_transforms)
augmented_subset = Subset(augmented_dataset, subset_indices)
augmented_loader = DataLoader(augmented_subset, batch_size=16, shuffle=False)

### Evaluation function to compute MSE and MAE

In [None]:
def evaluate_model(model, dataloader, device):
    model.eval()
    total_loss = 0.0
    total_mae = 0.0
    total_samples = 0
    criterion = nn.MSELoss()
    with torch.no_grad():
        for images, labels in dataloader:
            images = images.to(device)
            labels = labels.to(device)
            outputs = model(images)
            loss = criterion(outputs, labels)
            total_loss += loss.item() * images.size(0)
            mae = torch.abs(outputs - labels).mean().item() * images.size(0)
            total_mae += mae
            total_samples += images.size(0)
    avg_loss = total_loss / total_samples
    avg_mae = total_mae / total_samples
    return avg_loss, avg_mae

#### Evaluate both models on the augmented subset

In [None]:
resnet_mse, resnet_mae = evaluate_model(resnet_model, augmented_loader, device)
vgg_mse, vgg_mae = evaluate_model(vgg_model, augmented_loader, device)

print(f"ResNet50 on Augmented Data - MSE: {resnet_mse:.4f}, MAE: {resnet_mae:.4f}")
print(f"VGG16 on Augmented Data - MSE: {vgg_mse:.4f}, MAE: {vgg_mae:.4f}")