# I. Camera localization - PoseNet Training

## 1. Import dependencies

In [4]:
from google.colab import drive
#drive.mount('/content/drive',force_mount=True)
%cd '/content/drive/MyDrive/ANAIS2023-PoseNet-Lab'
from pose_resnet import *
import torch
import torch.nn as nn
import torchvision
import torchvision.transforms as transforms
from torch.utils.data import Dataset, DataLoader
from PIL import Image
import os
import numpy as np
import requests
!pip install tqdm
from tqdm import tqdm
from torchvision import models
import torch.nn.functional as F
import matplotlib.pyplot as plt

[Errno 2] No such file or directory: '/content/drive/MyDrive/ANAIS2023-PoseNet-Lab'
/content


ModuleNotFoundError: ignored

In [None]:
# Mount Google Drive at /content/drive so files under /content/drive/MyDrive
# are reachable from this Colab runtime.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


## 2. Download dataset

In [None]:
import zipfile  # stdlib; imported here so this cell is self-contained

# URL of the KingsCollege archive
url = "https://api.repository.cam.ac.uk/server/api/core/bitstreams/1cd2b04b-ada9-4841-8023-8207f1f3519b/content"

# Destination of the downloaded archive (relative to the current working directory)
file_path = 'KingsCollege.zip'

# The archive is unpacked next to the zip so that the relative
# "./KingsCollege/" path used for training resolves from the same directory.
# NOTE(review): assumes the archive contains a top-level "KingsCollege/"
# folder -- confirm against the actual zip layout.
extract_dir = os.path.dirname(os.path.abspath(file_path))
dataset_dir = os.path.join(extract_dir, 'KingsCollege')

# Download only when the archive is not already present
if not os.path.exists(file_path):
    chunk_size = 1024
    # Stream into a temporary file first: an interrupted download must not
    # leave a truncated 'KingsCollege.zip' that later runs would treat as valid.
    partial_path = file_path + '.part'
    with requests.get(url, stream=True, timeout=60) as response:
        # Fail loudly on HTTP errors instead of saving an error page as a zip.
        response.raise_for_status()
        file_size = int(response.headers.get('Content-Length', 0))
        # Ceiling division; None lets tqdm run without a total when the
        # server does not report a size.
        num_bars = -(-file_size // chunk_size) if file_size else None

        with open(partial_path, 'wb') as f:
            for chunk in tqdm(response.iter_content(chunk_size=chunk_size), total=num_bars, unit='KB', desc='Downloading File', ascii=True, ncols=75):
                f.write(chunk)
    os.replace(partial_path, file_path)
else:
    print(f"File '{file_path}' already exists. Skipping download.")

# Extract whenever the dataset folder is missing, not only right after a
# download, so an existing zip with no extracted data is still unpacked.
if not os.path.isdir(dataset_dir):
    with zipfile.ZipFile(file_path) as archive:
        archive.extractall(extract_dir)

File 'KingsCollege.zip' already exists. Skipping download.


## 3. DataLoader

In [None]:
class PoseDataset(Dataset):
    """In-memory dataset of (image, pose) pairs read from a text listing.

    The listing file is expected to start with three header lines, followed by
    one sample per line: an image path relative to ``root_dir`` and then the
    pose values as whitespace-separated floats.

    All images are loaded (and transformed, if a transform is given) eagerly in
    the constructor and kept in ``self.data``; ``__getitem__`` is therefore a
    plain list lookup. This trades memory for per-epoch speed.

    Args:
        root_dir: Directory that the image paths in the listing are relative to.
        file_path: Path of the listing file.
        transform: Optional callable applied to each PIL image once at load time.
    """

    def __init__(self, root_dir, file_path, transform=None):
        self.root_dir = root_dir
        self.transform = transform
        self.data = []
        with open(file_path, 'r') as f:
            lines = f.readlines()[3:]  # skip the header
        for line in lines:
            items = line.split()
            if not items:
                # Blank (e.g. trailing) lines carry no sample.
                continue
            image_path = os.path.join(root_dir, items[0])
            # Close the file handle as soon as the pixels are decoded; convert()
            # returns an independent in-memory copy. Forcing RGB keeps the
            # channel count consistent for 3-channel normalisation.
            with Image.open(image_path) as raw_image:
                image = raw_image.convert('RGB')
            if self.transform:
                image = self.transform(image)
            pose = list(map(float, items[1:]))
            self.data.append((image, torch.tensor(pose)))

    def __len__(self):
        """Return the number of loaded samples."""
        return len(self.data)

    def __getitem__(self, idx):
        """Return the ``(image, pose)`` pair stored at position ``idx``."""
        image, pose = self.data[idx]
        return image, pose

# Preprocessing applied to every image: resize the shorter side to 260 px,
# take a central 250x250 crop, convert to a tensor and normalise per channel.
# NOTE(review): the mean/std values look like the usual ImageNet statistics --
# confirm they match the backbone's pretraining.
transform = transforms.Compose([
    transforms.Resize(size=260),
    transforms.CenterCrop(size=250),
    transforms.ToTensor(),
    transforms.Normalize(
        mean=[0.485, 0.456, 0.406],
        std=[0.229, 0.224, 0.225],
    ),
])

## 4. Define loss function

In [None]:
# Define the loss function
class BalancedMSELoss(nn.Module):
    def __init__(self):
        super(BalancedMSELoss, self).__init__()
        self.mse = nn.MSELoss()

    def forward(self, pred, target):
        trans_pred, rot_pred = pred[:, :3], pred[:, 3:]
        trans_target, rot_target = target[:, :3], target[:, 3:]
        trans_loss = self.mse(trans_pred, trans_target)
        rot_loss = self.mse(rot_pred, rot_target)
        loss = trans_loss + 500*rot_loss
        return loss

## 5. Training

In [None]:
def train(model, train_loader, device, criterion, optimizer):
    """Run one optimisation pass over ``train_loader``.

    Args:
        model: Network to optimise; switched to training mode.
        train_loader: Iterable yielding ``(images, poses)`` batches.
        device: Device the batches are moved to before the forward pass.
        criterion: Callable computing the loss from ``(outputs, poses)``.
        optimizer: Optimiser stepping the model parameters.

    Returns:
        The per-sample average loss over the pass (each batch loss is weighted
        by its batch size). The value is also printed.
    """
    model.train()

    running_loss = 0
    samples_seen = 0
    for images, poses in train_loader:
        images, poses = images.to(device), poses.to(device)
        batch_size = images.size(0)

        # Forward pass
        loss = criterion(model(images), poses)

        # Clear stale gradients, back-propagate, update the weights
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        running_loss += loss.item() * batch_size
        samples_seen += batch_size

    avg_loss = running_loss / samples_seen
    print(f'Average Loss this epoch: {avg_loss}')
    return avg_loss

## 6. Testing

In [None]:
def test(model, test_loader, device, criterion):
    model.eval()
    losses = []
    with torch.no_grad():
        for images, poses in test_loader:
            images = images.to(device)
            poses = poses.to(device)
            outputs = model(images)
            loss = criterion(outputs, poses)
            losses.append(loss.item() * images.size(0))

    # Return the average loss and errors for this test run
    avg_loss = sum(losses) / len(losses)

    print(f'Average MSE on the test set: {avg_loss}')
    return avg_loss

## 7. Main

In [None]:
def main():
    """Train PoseResNet on the KingsCollege split and plot the loss curves.

    Builds the model, loss, optimiser, data loaders and a step-decay learning
    rate schedule, then trains for ``num_epochs`` epochs. Every
    ``test_interval`` epochs (and after the final epoch, but never after
    epoch 0) the model is evaluated on the test split and a figure with the
    train and test loss histories is shown.
    """
    # Hyperparameters
    num_epochs = 1000
    test_interval = 5  # Run test every 'test_interval' epochs
    learning_rate = 0.005

    # Device configuration
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    # Create the model
    model = PoseResNet().to(device)

    # Define the loss and the optimizer
    criterion = BalancedMSELoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

    # Load the data
    root_dir = "./KingsCollege/"
    train_dataset = PoseDataset(root_dir, root_dir + "dataset_train.txt", transform=transform)
    test_dataset = PoseDataset(root_dir, root_dir + "dataset_test.txt", transform=transform)
    train_loader = DataLoader(dataset=train_dataset, batch_size=128, shuffle=True, num_workers=8)
    test_loader = DataLoader(dataset=test_dataset, batch_size=128, shuffle=False, num_workers=8)

    # Step-decay schedule: multiply the learning rate by 0.1 every 60 epochs
    scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=60, gamma=0.1)

    # Loss histories. The epochs at which the test ran are recorded explicitly
    # so the x-values always match the number of test losses, including when
    # the final epoch does not fall on the test interval.
    train_losses = []
    test_losses = []
    test_epochs = []

    for epoch in range(num_epochs):
        train_loss = train(model, train_loader, device, criterion, optimizer)
        scheduler.step()
        train_losses.append(train_loss)

        # Test the model every 'test_interval' epochs
        if (epoch % test_interval == 0 or epoch == num_epochs - 1) and epoch != 0:
            test_loss = test(model, test_loader, device, criterion)
            test_losses.append(test_loss)
            test_epochs.append(epoch)

            # Plot the losses; train losses are indexed by epoch, so the test
            # points are placed at the epoch index they were measured at.
            fig, ax = plt.subplots()
            ax.plot(train_losses, label='Train loss')
            ax.plot(test_epochs, test_losses, label='Test loss')
            ax.set_title('Train and Test Loss')
            ax.set_xlabel('Epochs')
            ax.set_ylabel('Loss')
            ax.legend()
            plt.show()  # Show the loss plot
            plt.close(fig)  # Release the figure; one is created per test run


# Start the training run when this cell/script is executed directly.
if __name__ == "__main__":
    main()

Downloading: "https://download.pytorch.org/models/resnet18-f37072fd.pth" to /root/.cache/torch/hub/checkpoints/resnet18-f37072fd.pth
100%|██████████| 44.7M/44.7M [00:00<00:00, 81.2MB/s]
