# Start code

In [29]:
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from torchvision import models, transforms
import torchvision.models as models
from torchvision.models import ResNet50_Weights
from sklearn.model_selection import train_test_split
import torch.nn.functional as F
from torch.optim.lr_scheduler import ReduceLROnPlateau
from PIL import Image
import json
import cv2
import numpy as np
import os
import torch.optim as optim

In [22]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(torch.cuda.is_available())

False


# Create model from scratch (Hourglass)

In [66]:
class HourglassBlock(nn.Module):
    def __init__(self, in_channels, out_channels):
        super(HourglassBlock, self).__init__()
        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=1, padding=1)
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3, stride=1, padding=1)
        self.bn2 = nn.BatchNorm2d(out_channels)
        self.maxpool = nn.MaxPool2d(kernel_size=2, stride=2)

    def forward(self, x):
        x1 = F.relu(self.bn1(self.conv1(x)))
        x1 = F.relu(self.bn2(self.conv2(x1)))
        x2 = self.maxpool(x1)
        return x1, x2

class Hourglass(nn.Module):
    def __init__(self, depth, in_channels, out_channels):
        super(Hourglass, self).__init__()
        self.depth = depth
        self.hg_blocks = self._make_hourglass_blocks(depth, in_channels, out_channels)
        self.upsample = nn.Upsample(scale_factor=2, mode='nearest')

    def _make_hourglass_blocks(self, depth, in_channels, out_channels):
        hg_blocks = []
        for i in range(depth):
            if i == 0:
                hg_blocks.append(HourglassBlock(in_channels, out_channels))
            else:
                hg_blocks.append(HourglassBlock(out_channels, out_channels))
        return nn.ModuleList(hg_blocks)

    def forward(self, x):
        saved_outputs = []
        for i in range(self.depth):
            x1, x = self.hg_blocks[i](x)
            saved_outputs.append(x1)
        for i in range(self.depth - 1, -1, -1):
            x = self.upsample(x)
            x = x + saved_outputs[i]
        return x

class HourglassNet(nn.Module):
    def __init__(self, num_keypoints, depth=4):
        super(HourglassNet, self).__init__()
        self.depth = depth
        self.hg = Hourglass(depth, 3, 256)
        self.conv1 = nn.Conv2d(256, 256, kernel_size=3, stride=1, padding=1)
        self.bn1 = nn.BatchNorm2d(256)
        self.conv2 = nn.Conv2d(256, num_keypoints * 2, kernel_size=1, stride=1, padding=0)  # Output channels = num_keypoints * 2

    def forward(self, x):
        x = self.hg(x)
        x = F.relu(self.bn1(self.conv1(x)))
        x = self.conv2(x)
        x = torch.flatten(x, start_dim=1)  # Flatten the output to [batch_size, num_keypoints * 2]
        return x


# Dataset preparation and preprosseing

In [67]:
# Function to split data into training and validation sets
def split_data(image_folder, annotations_path, train_ratio=0.8):
    with open(annotations_path, 'r') as f:
        annotations = json.load(f)

    # Split data into train and validation sets
    train_annotations, val_annotations = train_test_split(annotations, train_size=train_ratio, random_state=42)

    # Save the splits into separate JSON files in a writable directory
    # writable_dir = '/kaggle/working'
    writable_dir = 'training/own_dataset_6kp/'
    train_annotations_path = os.path.join(writable_dir, 'data_train.json')
    val_annotations_path = os.path.join(writable_dir, 'data_val.json')

    with open(train_annotations_path, 'w') as f:
        json.dump(train_annotations, f, indent=4)

    with open(val_annotations_path, 'w') as f:
        json.dump(val_annotations, f, indent=4)

    print(f"Train annotations saved to: {train_annotations_path}")
    print(f"Validation annotations saved to: {val_annotations_path}")

# Dataset class for loading images and annotations
class KeypointsDataset(Dataset):
    def __init__(self, image_folder, annotations_path, transform=None):
        self.image_folder = image_folder
        self.annotations = self.load_annotations(annotations_path)
        self.transform = transform

    def load_annotations(self, annotations_path):
        with open(annotations_path, 'r') as f:
            annotations = json.load(f)
        return annotations

    def __len__(self):
        return len(self.annotations)

    def __getitem__(self, idx):
        annotation = self.annotations[idx]
        img_path = os.path.join(self.image_folder, annotation['id'] + '.png')
        image = Image.open(img_path).convert("RGB")
        keypoints = annotation['kps']

        if self.transform:
            image = self.transform(image)

        sample = {'image': image, 'keypoints': torch.tensor(keypoints, dtype=torch.float32)}
        return sample

# Function to create DataLoader instances
def create_dataloaders(data_folder_path, batch_size=8, kaggle=False):
    transform = transforms.Compose([
        transforms.RandomHorizontalFlip(),
        transforms.RandomRotation(10),
        transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.2),
        transforms.RandomResizedCrop(224, scale=(0.8, 1.0)),
        transforms.ToTensor()
    ])

    if kaggle:
        train_dataset = KeypointsDataset(os.path.join(data_folder_path, 'images'), '/kaggle/working/data_train.json', transform=transform)
        val_dataset = KeypointsDataset(os.path.join(data_folder_path, 'images'), '/kaggle/working/data_val.json', transform=transforms.ToTensor())
    else:
        train_dataset = KeypointsDataset(data_folder_path, 'training/own_dataset_6kp/data_train.json', transform=transform)
        val_dataset = KeypointsDataset(data_folder_path, 'training/own_dataset_6kp/data_val.json', transform=transforms.ToTensor())


    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)

    return train_loader, val_loader

# Load data

In [68]:
image_folder = 'training/own_dataset_6kp'
annotations_path = os.path.join(image_folder, 'annotations.json')
split_data(image_folder, annotations_path)

data_folder_path = 'training/own_dataset_6kp'
batch_size = 4

train_loader, val_loader = create_dataloaders(data_folder_path, batch_size=batch_size, kaggle=False)


Train annotations saved to: training/own_dataset_6kp/data_train.json
Validation annotations saved to: training/own_dataset_6kp/data_val.json


In [69]:
# Load Hourglass Network model
num_keypoints = 6 * 2  # 6 keypoints with (x, y) coordinates
model = HourglassNet(num_keypoints=num_keypoints, depth=4)


In [70]:
# Set initial learning rate
learning_rate = 1e-4

# Define optimizer and learning rate scheduler
optimizer = optim.Adam(model.parameters(), lr=learning_rate)
scheduler = ReduceLROnPlateau(optimizer, 'min', factor=0.1, patience=5, verbose=True)

# Define loss function
criterion = torch.nn9

# Training loop with early stopping
num_epochs = 100
patience = 20
best_val_loss = float('inf')
patience_counter = 0

In [71]:

for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0
    for batch in train_loader:
        images = batch['image']
        keypoints = batch['keypoints'].view(batch['keypoints'].size(0), -1)  # Flatten the target keypoints

        optimizer.zero_grad()
        outputs = model(images)
        loss = criterion(outputs, keypoints)
        loss.backward()
        optimizer.step()

        running_loss += loss.item()

    # Validation loss
    model.eval()
    val_loss = 0.0
    with torch.no_grad():
        for batch in val_loader:
            images = batch['image']
            keypoints = batch['keypoints'].view(batch['keypoints'].size(0), -1)  # Flatten the target keypoints

            outputs = model(images)
            loss = criterion(outputs, keypoints)
            val_loss += loss.item()

    val_loss /= len(val_loader)
    running_loss /= len(train_loader)
    print(f"Epoch {epoch+1}, Train Loss: {running_loss:.4f}, Val Loss: {val_loss:.4f}")

    # Step the scheduler
    scheduler.step(val_loss)

    # Early stopping
    if val_loss < best_val_loss:
        best_val_loss = val_loss
        patience_counter = 0
        torch.save(model.state_dict(), '/kaggle/working/best_model.pth')
    else:
        patience_counter += 1
        if patience_counter >= patience:
            print("Early stopping triggered")
            break

RuntimeError: The size of tensor a (1204224) must match the size of tensor b (12) at non-singleton dimension 1