# This notebook shows how to train a robust inverse of the kamera operator

The trained network:

- should accept a vector of size 10 holding 2D points in the camera view (in meters)
- should output a vector of size 6 describing the camera pose (3D position and 3D rotation)
- should be robust to noise in the input
    - it is first trained as a pure inverse of the kamera operator (a pretrained model is available in `hot_start.h5`)
    - then it is trained on a dataset of 800k samples obtained as follows:
        - generate a position and rotation of the camera in the feasible domain
        - use the kamera operator to generate the 2D points
        - add noise to the 2D points
        - use least-squares minimization (Newton's method) to find the camera pose with the least error

In [None]:
import numpy as np
import matplotlib.pyplot as plt
import torch
from torch import nn
from torch.utils.data import DataLoader, TensorDataset
from sklearn.model_selection import train_test_split
from kamera import kamera  # kamera operator written in Python, equivalent to the one in Matlab

In [None]:
# Feasible bounds of the camera pose: three position components followed by
# three orientation components. Each entry is the largest absolute value
# allowed for that component (poses are later sampled in [-bounds, bounds)).
MAX_POSITION = 50 * 1e-3         # presumably 50 mm expressed in meters -- confirm against kamera
MAX_ROTATION = 20 * np.pi / 180  # 20 degrees converted to radians
bounds = np.array([MAX_POSITION] * 3 + [MAX_ROTATION] * 3)

In [None]:
# Prefer the GPU when PyTorch can see one; otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')
device  # shown as the cell output

# Neural network architecture

- Perceptron with 4 hidden layers (128, 512, 512 and 512 units)
- activation function: Tanh

In [None]:
class Net(nn.Module):
    """Fully connected regressor from `input_size` features to `output_size` values.

    The network has four hidden layers (128, 512, 512 and 512 units), each
    followed by a Tanh activation, and a linear output layer with no
    activation.

    The layer attribute names (`fc1` .. `fc4`, `fc8`) determine the parameter
    names, which this notebook uses as dataset names when reading and writing
    HDF5 weight files, so they must stay as they are.
    """

    def __init__(self, input_size, output_size):
        super().__init__()
        hidden_width = 512
        # Layers are created in the same order as they are applied.
        self.fc1 = nn.Linear(input_size, 128)
        self.fc2 = nn.Linear(128, hidden_width)
        self.fc3 = nn.Linear(hidden_width, hidden_width)
        self.fc4 = nn.Linear(hidden_width, hidden_width)
        self.fc8 = nn.Linear(hidden_width, output_size)
        # Historical attribute name: the activation is Tanh, not ReLU.
        self.relu = nn.Tanh()

    def forward(self, x):
        """Apply the four Tanh-activated hidden layers, then the linear output layer."""
        for hidden_layer in (self.fc1, self.fc2, self.fc3, self.fc4):
            x = self.relu(hidden_layer(x))
        return self.fc8(x)


# 10 input values -> 6 output values
model = Net(10, 6)

### Optional: load pretrained model
- a pretrained model is available in the `hot_start.h5` file
- it was trained for around 24 hours on an RTX 4070 GPU
- it was trained on synthetic data: camera poses generated from the feasible domain and the corresponding 2D points obtained from the kamera operator

In [None]:
import h5py

# Load the pretrained weights from the HDF5 file into the existing model.
# Every parameter is looked up under its `named_parameters()` name
# (e.g. "fc1.weight"); a missing dataset raises KeyError from h5py.
with h5py.File('hot_start.h5', 'r') as h5file:
    # Writing into parameters must not be tracked by autograd.
    with torch.no_grad():
        for name, param in model.named_parameters():
            stored = torch.from_numpy(h5file[name][...])
            # Tensor.copy_ broadcasts its source, so a wrongly shaped dataset
            # would otherwise be accepted silently and corrupt the weights.
            if stored.shape != param.shape:
                raise ValueError(
                    f"Shape mismatch for '{name}': file has {tuple(stored.shape)}, "
                    f"model expects {tuple(param.shape)}"
                )
            param.copy_(stored)

## Move model to GPU (if available)


In [None]:
# Move the model to `device` (chosen in the cell that defines `device`).
# The call is the last expression of the cell, so its return value is displayed.
model.to(device)

# Training on synthetic data as precise inverse of kamera operator

Training proceeds in sets of `n` samples:
- generate random feasible camera poses and the corresponding 2D points using the kamera operator
- split them into a training set and a test set
- optimize with Adam (you can adjust the learning rate)
- run a fixed maximum number of epochs, with checks for early stopping (overfitting and patience)

In [None]:
# Pre-training on clean synthetic data.
# Each trial set draws fresh poses, maps them through the kamera operator and
# fits the model full-batch on the resulting (points -> pose) pairs.
n = int(1e4)        # number of samples generated per trial set
num_epochs = 10000  # upper bound on full-batch epochs per trial set
patience = 2000     # epochs without a new best training loss before stopping
criterion = nn.MSELoss()

for trial_sets in range(100000):
    print(f"Trial set {trial_sets}:")

    # Random camera positions and orientations, uniform in [-bounds, bounds)
    output_data = bounds * (np.random.rand(n, 6) - 0.5) * 2
    # Corresponding point positions in the image
    input_data = kamera(output_data)

    # Divide the data into training and testing sets
    X_train, X_test, y_train, y_test = train_test_split(input_data, output_data, test_size=0.1, random_state=42)

    # Convert data to PyTorch tensors and send to the device (GPU if available)
    X_train_tensor = torch.tensor(X_train, dtype=torch.float32).to(device)
    y_train_tensor = torch.tensor(y_train, dtype=torch.float32).to(device)
    X_test_tensor = torch.tensor(X_test, dtype=torch.float32).to(device)
    y_test_tensor = torch.tensor(y_test, dtype=torch.float32).to(device)

    # A fresh optimizer and scheduler per trial set: the Adam state and the
    # learning rate are reset for every newly generated dataset.
    optimizer = torch.optim.Adam(model.parameters(), lr=0.00001)
    scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, 'min', patience=100, factor=0.5)

    best_loss = np.inf  # best training loss seen in this trial set
    trials = 0          # consecutive epochs without improving best_loss

    # Full-batch training (the whole training split is one batch)
    for epoch in range(num_epochs):
        # Forward pass
        outputs = model(X_train_tensor)
        loss = criterion(outputs, y_train_tensor)
        # Read the scalar once; every .item() call forces a device sync.
        train_loss = loss.item()

        # Backward and optimize
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # Adjust the learning rate based on the training loss
        scheduler.step(train_loss)

        # Every 100 epochs: report progress and check the held-out loss
        if epoch % 100 == 0:
            # Evaluation needs no gradients; skip building the autograd graph.
            with torch.no_grad():
                loss2 = criterion(model(X_test_tensor), y_test_tensor)
            test_loss = loss2.item()
            print('Epoch [{}/{}], Loss: {:.10f}, test Loss reached {:.10f}, min Loss reached {:.10f}, stagnating for {} it.'.format(
                epoch + 1, num_epochs, train_loss, test_loss, best_loss, trials))
            # Held-out loss far above the best training loss is treated as overfitting
            if test_loss > 10*best_loss:
                print(f"Overfitting detected at epoch {epoch+1}")
                break

        # Early stopping on stagnating training loss
        if train_loss < best_loss:
            best_loss = train_loss
            trials = 0
        else:
            trials += 1
            if trials >= patience:
                print(f'Stopping early at epoch {epoch+1}')
                break

In [None]:
# Make sure the model is on `device` before training on the noisy data.
model.to(device)

# Training on noisy data

## Load noisy data (and their optimal camera poses as least-squares solutions)

In [None]:
import scipy.io

# Read the MATLAB file that holds the noisy dataset.
mat = scipy.io.loadmat('noisy_data.mat')


def _mat_to_float_tensor(key):
    """Return the MATLAB variable stored under `key` as a float32 tensor."""
    return torch.tensor(mat[key], dtype=torch.float32)


# `synth_y_test` is the network input, `synth_x_best` the target output.
input_data_noisy = _mat_to_float_tensor('synth_y_test')
output_data_noisy = _mat_to_float_tensor('synth_x_best')

# Serve the (input, target) pairs in shuffled chunks of 10000 samples.
noisy_dataset = TensorDataset(input_data_noisy, output_data_noisy)
dataloader = DataLoader(noisy_dataset, batch_size=10000, shuffle=True)

## Training on batches of `n` samples of noisy data

- it is recommended to use a smaller number of epochs and to repeat the whole process multiple times with an adjusted learning rate

In [None]:
# Fine-tuning on noisy data.
# Every chunk delivered by `dataloader` is treated as its own small dataset:
# it is split into train/test parts and the model is fitted on it full-batch.
num_epochs = 100000  # upper bound on full-batch epochs per chunk
patience = 5000      # epochs without a new best training loss before stopping
criterion = nn.MSELoss()

for batch_input, batch_output in dataloader:

    X_train, X_test, y_train, y_test = train_test_split(batch_input, batch_output, test_size=0.1, random_state=42)

    # The split is fed torch tensors here, so use as_tensor: it accepts both
    # tensors and arrays and avoids the copy-construct warning that
    # torch.tensor(<tensor>) raises. Then send to the device (GPU if available).
    X_train_tensor = torch.as_tensor(X_train, dtype=torch.float32).to(device)
    y_train_tensor = torch.as_tensor(y_train, dtype=torch.float32).to(device)
    X_test_tensor = torch.as_tensor(X_test, dtype=torch.float32).to(device)
    y_test_tensor = torch.as_tensor(y_test, dtype=torch.float32).to(device)

    # A fresh optimizer and scheduler per chunk: the Adam state and the
    # learning rate are reset for every chunk.
    optimizer = torch.optim.Adam(model.parameters(), lr=0.00001)
    scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, 'min', patience=100, factor=0.5)

    best_loss = np.inf  # best training loss seen for this chunk
    trials = 0          # consecutive epochs without improving best_loss

    # Full-batch training (the whole training split is one batch)
    for epoch in range(num_epochs):
        # Forward pass
        outputs = model(X_train_tensor)
        loss = criterion(outputs, y_train_tensor)
        # Read the scalar once; every .item() call forces a device sync.
        train_loss = loss.item()

        # Backward and optimize
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # Adjust the learning rate based on the training loss
        scheduler.step(train_loss)

        # Every 100 epochs: report progress and check the held-out loss
        if epoch % 100 == 0:
            # Evaluation needs no gradients; skip building the autograd graph.
            with torch.no_grad():
                loss2 = criterion(model(X_test_tensor), y_test_tensor)
            test_loss = loss2.item()
            print('Epoch [{}/{}], Loss: {:.10f}, test Loss reached {:.10f}, min Loss reached {:.10f}, stagnating for {} it.'.format(
                epoch + 1, num_epochs, train_loss, test_loss, best_loss, trials))
            # Held-out loss far above the best training loss is treated as overfitting
            if test_loss > 10*best_loss:
                print(f"Overfitting detected at epoch {epoch+1}")
                break

        # Early stopping on stagnating training loss
        if train_loss < best_loss:
            best_loss = train_loss
            trials = 0
        else:
            trials += 1
            if trials >= patience:
                print(f'Stopping early at epoch {epoch+1}')
                break

# Save the trained model to h5 file

In [None]:
import h5py

# NumPy conversion needs host memory, so bring the model back to the CPU first.
# Note that the model stays on the CPU after this cell.
model.to('cpu')

# Write every state_dict entry (weights and biases) as its own HDF5 dataset,
# named after the entry's key (e.g. "fc1.weight").
with h5py.File('trained_model_weights.h5', 'w') as h5file:
    for key, tensor in model.state_dict().items():
        weights_array = tensor.numpy()
        h5file.create_dataset(key, data=weights_array)