# Almost Visual Inertial Odometry

# Imports

In [66]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchvision import datasets, transforms
from torch.optim.lr_scheduler import StepLR
from torch.utils.data import DataLoader
from tqdm import tqdm
from  matplotlib import pyplot as plt
from matplotlib.image import imread
import pandas as pd
import numpy as np

# Constants

In [2]:
SEED = 20197
BATCH_SIZE = 16
TEST_BATCH_SIZE = 32
GAMMA = 0.1   # torch default
LR=0.001 
EPOCHS = 50

# Device

In [14]:
available_cuda = torch.cuda.is_available()
print(f"Available cuda: {available_cuda}")

device = torch.device("cuda" if available_cuda else "cpu")

Available cuda: False


# Dataset

In [None]:
class AdvioDataset(torch.utils.data.Dataset):
      def __init__(self, path_frames, path_frames_folder, path_inertials, path_labels):
            self.path_frames_folder = path_frames_folder
            self.path_labels = path_labels
            self.path_inertials = path_inertials
            self.df_frames = pd.read_csv(path_frames)

      def __len__(self):
            return len(self.df_frames) - 1

      def __getitem__(self, index):

            # load sample of frames
            image_name = self.path_frames_folder + self.df_frames.iloc[index, 1]
            frame = imread(image_name)
            frame = torch.Tensor(frame)

            image_name_next = self.path_frames_folder + self.df_frames.iloc[index+1, 1]
            frame_next = imread(image_name_next)
            frame_next = torch.Tensor(frame_next)

            sample_frame = frame_next - frame

            # TODO build buffer sample of inertials
            sample_inertials = ""

            # load labels
            label_odometry = np.load(self.path_labels + f"{self.df_frames.iloc[index, 0]}.npy")
            label_odometry = torch.Tensor(label_odometry)

            # load inertials
            label_inertial = np.load(self.path_inertials + f"{self.df_frames.iloc[index, 0]}.npy")
            label_inertial = torch.Tensor(label_inertial)

            return sample_frame, label_odometry, label_inertial


path_frames = "./data/advio-01/iphone/frames_synced.csv"
path_frames_folder = "./data/advio-01/iphone/frames/"
path_inertials = "./data/advio-01/iphone/inertials/"
path_labels = "./data/advio-01/iphone/labels/"

train_data = AdvioDataset(path_frames, path_frames_folder, path_inertials, path_labels)

params = {'batch_size': 32,
          'shuffle': True,
          'num_workers': 3}
train_gen = torch.utils.data.DataLoader(train_data, **params)

# Model

In [None]:
class Net(nn.Module):
    def __init__(self):
        pass

    def forward(self, x):
        pass

# Training

In [None]:
def train(model, device, train_loader, optimizer, epoch):
    model.train()
    
    for batch_idx, (data, target) in enumerate(dataset):
        # data preparation
        data, target = data.to(device), target.to(device)

        #forward
        optimizer.zero_grad()
        output = model(data)

        # loss
        loss = torch.sqrt(F.mse_loss(output, data))

        # backward
        loss.backward()
        optimizer.step()

        #stats
        history_loss += loss.item()

    return history_loss/(len(train_loader.dataset)/BATCH_SIZE) # average epoch loss


In [None]:
def test(model, device, train_loader, optimizer, epoch):
    model.train()
    
    for batch_idx, (data, target) in enumerate(dataset):
        # data preparation
        data, target = data.to(device), target.to(device)

        # forward
        optimizer.zero_grad()
        output = model(data)

        # loss
        loss = torch.sqrt(F.mse_loss(output, data))

        #stats
        history_loss += loss.item()

    return history_loss/(len(train_loader.dataset)/BATCH_SIZE) # average epoch loss


In [None]:
model = Net().to(device)
optimizer = optim.Adam(model.parameters(), lr=LR)

h_loss_train = []
h_loss_test = []

for epoch in range(1, EPOCHS + 1):
    print(f"Epoch {epoch}/{EPOCHS}")

    loss_train = train(model, device, train_loader, optimizer, epoch)
    loss_test, data, output, embedded_code = test(model, device, test_loader)

    #populating the history of the loss
    h_loss_train.append(loss_train)
    h_loss_test.append(loss_test)

# Plotting