# Testing the Individual Models

## Data Loading

In [None]:
import torch
from torch.utils.data import Dataset
from torchvision import datasets
from torchvision import transforms
from PIL import Image
import os
import pandas as pd

Load data into a torch dataset

In [None]:
# Create an association between the timestamps and the corresponding images
def prep_combined_csv(input_dir, output_filepath):
    # Read the CSV files
    cam0_path = os.path.join(input_dir, 'cam0')
    cam1_path = os.path.join(input_dir, 'cam1')
    cam0_csv_mappings = pd.read_csv(os.path.join(cam0_path, 'data.csv'))
    cam1_csv_mappings = pd.read_csv(os.path.join(cam1_path, 'data.csv'))
    imu0_path = os.path.join(input_dir, 'imu0')
    imu_readings = pd.read_csv(os.path.join(imu0_path, 'data.csv')
    
    # Merge the IMU readings with the camera data
    combined_df = pd.merge(cam0_csv_mappings, cam1_csv_mappings, on='#timestamp [ns]', how='outer', suffixes=('_cam0', '_cam1'))
    combined_df = pd.merge(combined_df, imu_readings, on='#timestamp [ns]', how='outer')
    combined_df['filename_cam0'] = combined_df['filename_cam0'].fillna(method='ffill')
    combined_df['filename_cam1'] = combined_df['filename_cam1'].fillna(method='ffill')
    combined_df.columns = ['timestamp', 'cam0_filename', 'cam1_filename', 'w_x', 'w_y', 'w_z', 'a_x', 'a_y', 'a_z']

    # Save the combined dataframe to a new CSV file
    combined_df.to_csv(output_filepath, index=False)

class IMUImageDataset(Dataset):
    def __init__(self, csv_path, cam0_image_root, cam1_image_root, transform=None):
        self.data = pd.read_csv(csv_path)
        self.cam0_image_root = cam0_image_root
        self.cam1_image_root = cam1_image_root
        self.transform = transform or transforms.Compose([
            transforms.Resize((224, 224)),
            transforms.ToTensor()
        ])

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        row = self.data.iloc[idx]

        # Load image
        cam0_path = os.path.join(self.cam0_image_root, row['cam0_filename'])
        cam0_image = Image.open(cam0_path).convert("RGB")
        cam0_image = self.transform(cam0_image)

        cam1_path = os.path.join(self.cam1_image_root, row['cam1_filename'])
        cam1_image = Image.open(cam1_path).convert("RGB")
        cam1_image = self.transform(cam1_image)

        # Load IMU features
        imu = row[["w_x", "w_y", "w_z", "a_x", "a_y", "a_z"]].values.astype("float32")
        imu_tensor = torch.tensor(imu)

        return imu_tensor, cam0_image, cam1_image

In [None]:
from torch.utils.data import random_split

csv_path = None
cam0_image_root = None
cam1_image_root = None
transform = None

# imu_cam_dataset = IMUImageDataset(csv_path, cam0_image_root, cam1_image_root, transform=transform)
# dataloader = DataLoader(imu_cam_dataset, batch_size=64, shuffle=False)

# train_set, test_set = random_split()

# Using CIFAR10 as an example
# Load the training dataset
trainset = torchvision.datasets.CIFAR10(root='./data', train=True, download=True, transform=transform)
trainloader = torch.utils.data.DataLoader(trainset, batch_size=4, shuffle=True, num_workers=2)

# Load the testing dataset
testset = torchvision.datasets.CIFAR10(root='./data', train=False, download=True, transform=transform)
testloader = torch.utils.data.DataLoader(testset, batch_size=4, shuffle=False, num_workers=2)

## ImageEncoder

In [2]:
from models.ImageEncoder import ImageEncoder

In [None]:
# Parameters
out_dim = 128
ch_in = 3

In [3]:
image_encoder = ImageEncoder(ch_in=ch_in, out_dim=out_dim)


num_epochs = 10

for epoch in range(num_epochs):
    # Set the model to training mode
    image_encoder.train()
    running_loss = 0.0

    # Iterate over batches in the data loader
    for batch_idx, (inputs, targets) in enumerate(dataloader):
        # Zero the gradients
        optimizer.zero_grad()

        # Forward pass
        outputs = image_encoder(inputs)

        # Calculate the loss
        loss = loss_function(outputs, targets)

        # Backward pass
        loss.backward()

        # Update the weights
        optimizer.step()

        running_loss += loss.item()

    # Print average loss for the epoch
    print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {running_loss/len(dataloader)}')

print('Training finished.')

models.ImageEncoder.ImageEncoder