# Load data

In [5]:
import numpy as np
from PIL import Image
from tqdm import tqdm


# Helper function to parse the tripod sequence file
def parse_tripod_seq_file(file_path):
    """Read the integer metadata rows of the tripod sequence description file.

    Only rows 0, 1, 4, 5 and 6 (0-based) are used; rows 2 and 3 are not read
    by this function. Every used row is split on whitespace and each token is
    converted with ``int``.

    Args:
        file_path: Path of the text file to parse.

    Returns:
        A 5-tuple of integer lists ``(image_dims, num_frames, frames_360,
        frontal_frames, rotation_sense)`` taken from rows 0, 1, 4, 5 and 6.
    """
    def as_ints(row):
        return [int(token) for token in row.split()]

    with open(file_path, 'r') as handle:
        rows = handle.read().splitlines()

    # Row indices, in the order the values are returned.
    wanted_rows = (0, 1, 4, 5, 6)
    image_dims, num_frames, frames_360, frontal_frames, rotation_sense = (
        as_ints(rows[index]) for index in wanted_rows
    )
    return image_dims, num_frames, frames_360, frontal_frames, rotation_sense


# Function to load and resize an image using PIL
def load_and_resize_image(filename, img_height, img_width):
    """Load an image from disk, resize it, and return it as a NumPy array.

    Args:
        filename: Path of the image file to open.
        img_height: Target height in pixels.
        img_width: Target width in pixels.

    Returns:
        The resized image converted with ``np.array``. No colour-mode
        conversion is performed, so the array layout follows the file's mode.
    """
    # Image.open is lazy and keeps the underlying file open; the context
    # manager releases the handle even if decoding or resizing fails.
    with Image.open(filename) as img:
        # PIL takes the target size as (width, height); resize() returns a
        # new, fully loaded image that stays valid after the file is closed.
        resized = img.resize((img_width, img_height))
    return np.array(resized)


# Function to load and preprocess image and bbox data
def load_and_preprocess_data(base_path, sequence_ids, img_width, img_height,
                             frames_per_seq, frames_360, frontal_frames, rotation_sense):
    """Load every frame of the given sequences with its angle label and bbox.

    Args:
        base_path: Directory holding ``tripod_seq_XX_YYY.jpg`` images and
            ``bbox_XX.txt`` files.
        sequence_ids: Sequence numbers to load; used to build the file names.
        img_width: Width each image is resized to.
        img_height: Height each image is resized to.
        frames_per_seq: Number of frames to load per sequence, aligned
            position-by-position with ``sequence_ids``.
        frames_360: Number of frames making up a full rotation, per sequence.
        frontal_frames: 1-based frame number used as the zero-angle reference,
            per sequence.
        rotation_sense: Direction multiplier per sequence (presumably +1 or
            -1 -- confirm against the dataset description).

    Returns:
        A tuple ``(images, angles, bboxes)`` of NumPy arrays with one entry
        per loaded frame. Angles are in degrees within ``[0, 360)``.
    """
    data = []
    labels = []
    bboxes = []

    for i, seq_id in enumerate(tqdm(sequence_ids, desc='Loading sequences')):
        num_frames = frames_per_seq[i]
        num_frames_360 = frames_360[i]
        frontal_frame = frontal_frames[i]
        sense = rotation_sense[i]
        bbox_path = f"{base_path}/bbox_{seq_id:02d}.txt"
        # ndmin=2 keeps one row per frame even when the file has a single
        # line; otherwise loadtxt returns a 1-D array and row indexing below
        # would yield a scalar instead of a bbox.
        bbox_data = np.loadtxt(bbox_path, delimiter=' ', ndmin=2)
        degrees_per_frame = 360 / num_frames_360

        for frame_id in range(1, num_frames + 1):
            filename = f"{base_path}/tripod_seq_{seq_id:02d}_{frame_id:03d}.jpg"

            # Pixel values are stored as loaded; no scaling is applied here.
            img = load_and_resize_image(filename, img_height, img_width)

            relative_position = (frame_id - frontal_frame) % num_frames_360
            # Apply the rotation direction *before* wrapping so that both
            # directions share the same [0, 360) label space. Multiplying
            # afterwards would give negative-sense sequences labels in
            # (-360, 0], i.e. the same orientation labelled 360 degrees apart.
            wrapped_steps = (relative_position * sense) % num_frames_360
            rotation_angle = wrapped_steps * degrees_per_frame

            data.append(img)
            labels.append(rotation_angle)
            bboxes.append(bbox_data[frame_id - 1])  # frame ids are 1-based

    return np.array(data), np.array(labels), np.array(bboxes)


def load_data():
    """Load the training and test splits of the tripod sequence dataset.

    The metadata file is parsed once; sequences 1-10 form the training split
    and sequences 11-20 the test split. Per-sequence metadata lists are
    sliced at index 10 to match.

    Returns:
        ``(train_images, train_labels, train_bboxes, test_images,
        test_labels, test_bboxes)``.
    """
    base_path = r'./data/epfl-gims08/tripod-seq'
    file_path = r'./data/epfl-gims08/tripod-seq/tripod-seq.txt'

    parsed = parse_tripod_seq_file(file_path)
    image_dims, num_frames, frames_360, frontal_frames, rotation_sense = parsed
    # Entries 1 and 2 of the first metadata row are used as width and height.
    img_width = image_dims[1]
    img_height = image_dims[2]

    per_sequence = (num_frames, frames_360, frontal_frames, rotation_sense)

    # Sequences 1-10 for training.
    train_sequence_ids = list(range(1, 11))
    train_split = load_and_preprocess_data(
        base_path, train_sequence_ids, img_width, img_height,
        *(column[:10] for column in per_sequence))

    # Sequences 11-20 for testing.
    test_sequence_ids = list(range(11, 21))
    test_split = load_and_preprocess_data(
        base_path, test_sequence_ids, img_width, img_height,
        *(column[10:] for column in per_sequence))

    train_images, train_labels, train_bboxes = train_split
    test_images, test_labels, test_bboxes = test_split
    return train_images, train_labels, train_bboxes, test_images, test_labels, test_bboxes


# Train

In [6]:
import argparse
import torch
from torch import nn
from torch.utils.data import Dataset, DataLoader
from PIL import Image
from tqdm import tqdm
import torchvision.models as models
from torchvision import transforms

import load_data


class VehicleDataset(Dataset):
    """Dataset yielding bbox-cropped, preprocessed images with their labels.

    Each item is the image at ``idx`` cropped to its bounding box, resized to
    224x224, converted to a tensor and normalized with the fixed mean/std
    below, paired with the label wrapped in a one-element tensor.
    """

    def __init__(self, images, labels, bboxes):
        self.images = images
        self.labels = labels
        self.bboxes = bboxes
        preprocessing_steps = [
            transforms.Resize((224, 224)),
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
        ]
        self.transforms = transforms.Compose(preprocessing_steps)

    def __len__(self):
        return len(self.images)

    def __getitem__(self, idx):
        picture = Image.fromarray(self.images[idx])
        target = self.labels[idx]
        box = self.bboxes[idx]

        # The bbox is read as (left, top, width, height); crop() needs the
        # corner form (left, top, right, bottom).
        left, top = box[0], box[1]
        right = box[0] + box[2]
        bottom = box[1] + box[3]

        region = picture.crop((left, top, right, bottom))
        return self.transforms(region), torch.tensor([target])


class AnglePredictor(nn.Module):
    """ResNet-50 feature extractor followed by a single-output linear layer.

    The backbone is a pretrained ``resnet50`` with its last child module
    removed (presumably the classification layer); the remaining features are
    flattened per sample and mapped to one value by ``regression_head``.
    """

    def __init__(self):
        super().__init__()
        backbone = models.resnet50(pretrained=True)
        # Keep every child module except the final one.
        feature_layers = list(backbone.children())[:-1]
        self.resnet_model = nn.Sequential(*feature_layers)
        self.regression_head = nn.Linear(2048, 1)

    def forward(self, pixel_values):
        pooled = self.resnet_model(pixel_values)
        # Collapse everything after the batch dimension into one vector.
        batch_size = pooled.size(0)
        flat = pooled.view(batch_size, -1)
        return self.regression_head(flat)


def get_data_loaders(train_images, train_labels, train_bboxes, test_images, test_labels, test_bboxes, batch_size=4):
    """Wrap the train and test arrays in ``VehicleDataset`` data loaders.

    The training loader shuffles its samples; the test loader keeps them in
    order. Both use the same ``batch_size``.

    Returns:
        ``(train_loader, test_loader)``.
    """
    train_loader = DataLoader(
        VehicleDataset(train_images, train_labels, train_bboxes),
        batch_size=batch_size,
        shuffle=True,
    )
    test_loader = DataLoader(
        VehicleDataset(test_images, test_labels, test_bboxes),
        batch_size=batch_size,
        shuffle=False,
    )
    return train_loader, test_loader


def evaluate(model, test_loader, device):
    """Compute and print the mean absolute error of ``model`` over a loader.

    The model is switched to evaluation mode and run without gradient
    tracking. The error is averaged over all predicted elements rather than
    over batches, so a smaller final batch does not get extra weight.

    Args:
        model: Module mapping a batch of inputs to predictions.
        test_loader: Iterable of ``(inputs, labels)`` tensor pairs.
        device: Device the batches are moved to before the forward pass.

    Returns:
        The mean absolute error as a float, or NaN when the loader yields no
        samples.
    """
    model.eval()
    # Sum (not mean) per batch so the final division is per element.
    criterion = nn.L1Loss(reduction='sum')
    total_abs_error = 0.0
    total_elements = 0
    with torch.no_grad():
        for inputs, labels in test_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            total_abs_error += criterion(outputs, labels.float()).item()
            total_elements += labels.numel()

    # An empty loader has no defined average; report NaN instead of raising
    # ZeroDivisionError in the middle of a training run.
    if total_elements == 0:
        average_loss = float('nan')
    else:
        average_loss = total_abs_error / total_elements
    print(f"Test Loss: {average_loss}")
    return average_loss


def train_model(model, train_loader, test_loader, num_epochs=10):
    """Train ``model`` with Adam and MSE loss, evaluating after every epoch.

    The model is moved to CUDA when available, otherwise it stays on the CPU.
    After each epoch the mean of the per-batch training losses is printed and
    ``evaluate`` is run on ``test_loader``.

    Args:
        model: Module to optimize in place.
        train_loader: Iterable of ``(inputs, labels)`` training batches.
        test_loader: Loader passed to ``evaluate`` after each epoch.
        num_epochs: Number of passes over ``train_loader``.
    """
    use_cuda = torch.cuda.is_available()
    device = torch.device('cuda' if use_cuda else 'cpu')
    model.to(device)
    criterion = nn.MSELoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

    for epoch in tqdm(range(num_epochs)):
        # evaluate() leaves the model in eval mode, so re-enable training
        # mode at the start of every epoch.
        model.train()
        batch_losses = []
        for inputs, labels in train_loader:
            inputs = inputs.to(device)
            labels = labels.to(device)

            optimizer.zero_grad()
            loss = criterion(model(inputs), labels.float())
            loss.backward()
            optimizer.step()
            batch_losses.append(loss.item())

        running_loss = sum(batch_losses)
        print(f"Epoch {epoch + 1}, Loss: {running_loss / len(train_loader)}")
        evaluate(model, test_loader, device)


def main():
    """Load the dataset, build the data loaders and train the angle model.

    Batch size and epoch count are fixed constants here; they are not read
    from the command line.
    """
    batch_size, num_epochs = 4, 100

    # load_data() returns the six arrays in the order get_data_loaders expects.
    splits = load_data.load_data()
    train_loader, test_loader = get_data_loaders(*splits, batch_size)

    train_model(AnglePredictor(), train_loader, test_loader, num_epochs=num_epochs)

# Main

In [7]:
# Run the full pipeline: load the data, build the loaders, train the model.
main()

Loading sequences: 100%|██████████| 10/10 [00:01<00:00,  5.12it/s]
Loading sequences: 100%|██████████| 10/10 [00:03<00:00,  2.70it/s]
  0%|          | 0/100 [00:01<?, ?it/s]


KeyboardInterrupt: 