In [9]:
import os
import cv2
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import accuracy_score, mean_squared_error, r2_score

import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
import torch.nn.functional as F

# ===============================================================
# Data preparation
# ===============================================================

def load_data(file_path):
    images_folder = os.path.join(file_path, 'image_data')
    labels_path = os.path.join(file_path, 'train.csv')

    if not os.path.exists(images_folder):
        raise FileNotFoundError(f"Image folder not found: {images_folder}")
    if not os.path.exists(labels_path):
        raise FileNotFoundError(f"train.csv not found: {labels_path}")

    labels_df = pd.read_csv(labels_path)

    required_cols = {'Name', 'HeadCount'}
    if not required_cols.issubset(labels_df.columns):
        raise ValueError(f"train.csv must contain columns: {required_cols}. Found: {list(labels_df.columns)}")

    labels_dict = dict(zip(labels_df['Name'].astype(str), labels_df['HeadCount']))

    available_files = set(os.listdir(images_folder))
    valid_dict = {name: label for name, label in labels_dict.items() if name in available_files}

    missing = len(labels_dict) - len(valid_dict)
    print(f"[LOADING] Found {len(valid_dict)} labeled images ({missing} missing from folder).")

    return images_folder, valid_dict


def preprocess_image(path, size=(64,64)):
    img = cv2.imread(path)
    if img is None:
        raise ValueError(f"Could not read image: {path}")
    gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    gray_resized = cv2.resize(gray, size)
    return gray_resized.astype(np.float32) / 255.0

class ImageDataset(Dataset):
    def __init__(self, image_folder, labels_dict, size=(64,64)):
        self.samples = []
        self.labels = []
        self.size = size
        for img_name, label in labels_dict.items():
            img_path = os.path.join(image_folder, img_name)
            if os.path.exists(img_path):
                self.samples.append(img_path)
                self.labels.append(label)

    def __len__(self):
        return len(self.samples)

    def __getitem__(self, idx):
        img = preprocess_image(self.samples[idx], size=self.size)
        img_tensor = torch.tensor(img).unsqueeze(0)  # (1, H, W)
        label = torch.tensor(self.labels[idx], dtype=torch.float32)
        return img_tensor, label

# ===============================================================
# Model definitions
# ===============================================================

# --- Baseline: Predict mean value ---
class Baseline(nn.Module):
    def __init__(self, mean_value):
        super().__init__()
        self.mean_value = nn.Parameter(torch.tensor(mean_value, dtype=torch.float32), requires_grad=False)

    def forward(self, x):
        batch_size = x.size(0)
        return self.mean_value.repeat(batch_size, 1) if self.mean_value.ndim == 1 else self.mean_value.repeat(batch_size)

# --- Simple fully connected NN ---
class SimpleNN(nn.Module):
    def __init__(self, input_size, output_size, activation='relu'):
        super().__init__()
        act = {'relu': nn.ReLU(), 'tanh': nn.Tanh(), 'sigmoid': nn.Sigmoid()}.get(activation, nn.ReLU())
        self.layers = nn.Sequential(
            nn.Flatten(),
            nn.Linear(input_size, 256),
            act,
            nn.Linear(256, 128),
            act,
            nn.Linear(128, output_size)
        )

    def forward(self, x):
        return self.layers(x)

# --- CNN-based NN ---
class SimpleCNN(nn.Module):
    def __init__(self, output_size):
        super().__init__()
        self.conv = nn.Sequential(
            nn.Conv2d(1, 16, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2),
            nn.Conv2d(16, 32, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2)
        )
        self.fc = nn.Sequential(
            nn.Linear(32 * 16 * 16, 128),
            nn.ReLU(),
            nn.Linear(128, output_size)
        )

    def forward(self, x):
        x = self.conv(x)
        x = x.view(x.size(0), -1)
        return self.fc(x)

# --- RNN-based NN ---
class SimpleRNN(nn.Module):
    def __init__(self, input_size, hidden_size, output_size):
        super().__init__()
        self.rnn = nn.RNN(input_size, hidden_size, batch_first=True)
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        # Treat each row as a timestep
        B, C, H, W = x.shape
        x = x.view(B, H, W)  # remove channel dim
        out, _ = self.rnn(x)
        out = out[:, -1, :]  # take last timestep
        return self.fc(out)

# --- LSTM-based NN ---
class SimpleLSTM(nn.Module):
    def __init__(self, input_size, hidden_size, output_size):
        super().__init__()
        self.lstm = nn.LSTM(input_size, hidden_size, batch_first=True)
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        B, C, H, W = x.shape
        x = x.view(B, H, W)
        out, _ = self.lstm(x)
        out = out[:, -1, :]
        return self.fc(out)

# ===============================================================
# Training utilities
# ===============================================================

def train_model(model, train_loader, val_loader, epochs, task="regression", lr=1e-3):
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.to(device)

    criterion = nn.MSELoss() if task == "regression" else nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=lr)

    for epoch in range(epochs):
        model.train()
        train_loss = 0
        for X, y in train_loader:
            X, y = X.to(device), y.to(device)
            optimizer.zero_grad()
            outputs = model(X)

            if task == "classification":
                y = y.long()
            loss = criterion(outputs.squeeze(), y)
            loss.backward()
            optimizer.step()
            train_loss += loss.item()

        val_loss = 0
        model.eval()
        with torch.no_grad():
            for X, y in val_loader:
                X, y = X.to(device), y.to(device)
                outputs = model(X)
                if task == "classification":
                    y = y.long()
                loss = criterion(outputs.squeeze(), y)
                val_loss += loss.item()

        print(f"Epoch [{epoch+1}/{epochs}] Train Loss: {train_loss/len(train_loader):.4f} | Val Loss: {val_loss/len(val_loader):.4f}")
        
    model.eval()
    all_preds, all_labels = [], []

    with torch.no_grad():
        for X, y in val_loader:
            X, y = X.to(device), y.to(device)
            outputs = model(X)

            if task == "classification":
                preds = torch.argmax(outputs, dim=1).cpu().numpy()
                labels = y.cpu().numpy()
            else:
                preds = outputs.squeeze().cpu().numpy()
                labels = y.cpu().numpy()

            all_preds.extend(preds)
            all_labels.extend(labels)

    if task == "classification":
        acc = accuracy_score(all_labels, all_preds)
        print(f"\n[RESULTS] Final Accuracy: {acc*100:.2f}%")
    else:
        mse = mean_squared_error(all_labels, all_preds)
        r2 = r2_score(all_labels, all_preds)
        print(f"\n[RESULTS] Final MSE: {mse:.4f} | R²: {r2:.4f}")

    return model

def evaluate_baseline(model, val_loader, task="regression"):
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.to(device)
    model.eval()

    all_preds, all_labels = [], []

    with torch.no_grad():
        for X, y in val_loader:
            X, y = X.to(device), y.to(device)
            outputs = model(X)

            if task == "classification":
                preds = torch.argmax(outputs, dim=1).cpu().numpy()
                labels = y.cpu().numpy()
            else:
                preds = outputs.squeeze().cpu().numpy()
                labels = y.cpu().numpy()

            all_preds.extend(preds)
            all_labels.extend(labels)

    if task == "classification":
        acc = accuracy_score(all_labels, all_preds)
        print(f"[BASELINE RESULTS] Accuracy: {acc*100:.2f}%")
    else:
        mse = mean_squared_error(all_labels, all_preds)
        r2 = r2_score(all_labels, all_preds)
        print(f"[BASELINE RESULTS] MSE: {mse:.4f} | R²: {r2:.4f}")


In [10]:
# --------------------------------------------------------------------------------------------------
# Params
# --------------------------------------------------------------------------------------------------

EPOCH_NUM = 5
TASK_TYPE = "regression"
OUTPUT_SIZE = 1  # regression (HeadCount)

# --------------------------------------------------------------------------------------------------

data_folder = '../project_data/train/'
images_folder, labels_dict = load_data(data_folder)

train_dict, val_dict = train_test_split(list(labels_dict.items()), test_size=0.2, random_state=42)
train_dict = dict(train_dict)
val_dict = dict(val_dict)

train_dataset = ImageDataset(images_folder, train_dict)
val_dataset = ImageDataset(images_folder, val_dict)

train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=32)

example_img, _ = train_dataset[0]
input_size = example_img.numel()

models = {
    "Baseline": Baseline(np.mean(list(labels_dict.values()))),
    "LinearNN": SimpleNN(input_size, OUTPUT_SIZE, activation='relu'),
    "CNN": SimpleCNN(OUTPUT_SIZE),
    "RNN": SimpleRNN(input_size=example_img.shape[-1], hidden_size=64, output_size=OUTPUT_SIZE),
    "LSTM": SimpleLSTM(input_size=example_img.shape[-1], hidden_size=64, output_size=OUTPUT_SIZE)
}

for name, model in models.items():
    print(f"\n=== Training {name} ===")
    if isinstance(model, Baseline):
        print("Skipping training for Baseline (no learnable parameters).")
        evaluate_baseline(model, val_loader, task=TASK_TYPE)
        continue
    trained = train_model(model, train_loader, val_loader, epochs=EPOCH_NUM, task=TASK_TYPE)


[LOADING] Found 5733 labeled images (0 missing from folder).

=== Training Baseline ===
Skipping training for Baseline (no learnable parameters).
[BASELINE RESULTS] MSE: 11.0285 | R²: -0.0001

=== Training LinearNN ===
Epoch [1/5] Train Loss: 8.4724 | Val Loss: 12.8941
Epoch [2/5] Train Loss: 7.6904 | Val Loss: 13.3057
Epoch [3/5] Train Loss: 7.3879 | Val Loss: 12.5435
Epoch [4/5] Train Loss: 7.0230 | Val Loss: 12.5276
Epoch [5/5] Train Loss: 6.8894 | Val Loss: 12.1179

[RESULTS] Final MSE: 12.1209 | R²: -0.0992

=== Training CNN ===
Epoch [1/5] Train Loss: 5.6839 | Val Loss: 9.8302
Epoch [2/5] Train Loss: 5.2667 | Val Loss: 10.0289
Epoch [3/5] Train Loss: 5.1188 | Val Loss: 10.1389
Epoch [4/5] Train Loss: 5.0691 | Val Loss: 10.0264
Epoch [5/5] Train Loss: 4.5262 | Val Loss: 9.9302

[RESULTS] Final MSE: 9.9442 | R²: 0.0982

=== Training RNN ===
Epoch [1/5] Train Loss: 6.9622 | Val Loss: 11.0502
Epoch [2/5] Train Loss: 5.9289 | Val Loss: 10.9477
Epoch [3/5] Train Loss: 5.8815 | Val Loss

In [None]:
#cnn seems best (~10% improvement instantly), still not good (+/-3 heads detected), let's tweak it a bit
# heavy todo here for me
class ImprovedCNN(nn.Module):
    def __init__(self, num_outputs=1, task="regression", input_size=(128, 128)):
        super().__init__()
        self.task = task
        
        self.conv_layers = nn.Sequential(
            nn.Conv2d(1, 16, 3, padding=1),
            nn.BatchNorm2d(16),
            nn.ReLU(),
            nn.MaxPool2d(2),

            nn.Conv2d(16, 32, 3, padding=1),
            nn.BatchNorm2d(32),
            nn.ReLU(),
            nn.MaxPool2d(2),

            nn.Conv2d(32, 64, 3, padding=1),
            nn.BatchNorm2d(64),
            nn.ReLU(),
            nn.MaxPool2d(2),
        )

        with torch.no_grad():
            dummy = torch.zeros(1, 1, *input_size)
            dummy_out = self.conv_layers(dummy)
            flattened_dim = dummy_out.view(1, -1).shape[1]

        self.fc_layers = nn.Sequential(
            nn.Linear(flattened_dim, 256),
            nn.ReLU(),
            nn.Dropout(0.4),
            nn.Linear(256, 64),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(64, num_outputs)
        )

    def forward(self, x):
        x = self.conv_layers(x)
        x = x.view(x.size(0), -1)
        x = self.fc_layers(x)
        if self.task == "classification":
            return F.log_softmax(x, dim=1)
        return x

cnn_model = ImprovedCNN(num_outputs=1, task=TASK_TYPE, input_size=(128, 128))


[INFO] Flattened feature size: 16384


RuntimeError: mat1 and mat2 shapes cannot be multiplied (32x4096 and 16384x256)

In [13]:
cnn_model = ImprovedCNN(num_outputs=1, task="regression", input_size=(128, 128))


[INFO] Flattened feature size: 16384
