<a href="https://colab.research.google.com/github/albertomariapepe/CGA-PointNet/blob/main/CGA_PoseNet_(7_Scenes).ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Defining CGA-PoseNet

In [None]:
from keras.models import Model
from keras.layers import Input, Conv2D, Reshape, MaxPooling2D, AveragePooling2D, Flatten, GlobalAveragePooling2D, Dense, Dropout
import tensorflow as tf
import keras
import matplotlib.pyplot as plt

#Loading GoogLeNet with ImageNet weights

from keras.applications.inception_v3 import InceptionV3

In [None]:
!pip install tfga
from tfga import GeometricAlgebra
from tfga.layers import TensorToGeometric, GeometricProductConv1D, GeometricToTensor, GeometricSandwichProductDense, GeometricProductDense
ga = GeometricAlgebra(metric=[1, 1, 1, 1])

In [None]:
#modifying the last layer
idx = ga.get_kind_blade_indices("even")
model = InceptionV3(classifier_activation = None, weights = "imagenet", input_tensor=Input(shape=(224, 224, 3)))

x2 = Dropout(0.3)(model.layers[-2].output)
x2 = Reshape((-1, 8))(x2)
x2 = TensorToGeometric(ga, blade_indices=idx)(x2)
x2 = GeometricSandwichProductDense(
        ga, units=128, activation = "relu",
        blade_indices_kernel=idx,
        blade_indices_bias=idx)(x2)
x2 = GeometricSandwichProductDense(
        ga, units=64, activation = "relu",
        blade_indices_kernel=idx,
        blade_indices_bias=idx)(x2)
x2 = GeometricSandwichProductDense(
        ga, units=1, activation = "tanh",
        blade_indices_kernel=idx,
        blade_indices_bias=idx)(x2)
x2 = GeometricToTensor(ga, blade_indices=idx)(x2)
outputs2 = Flatten()(x2)


Model1 = tf.keras.Model(inputs=model.input, outputs=outputs2)
Model1.summary()
CGAPoseNet = Model1

# Converting Camera Poses into Motors



In [None]:
FOLDER = "chess" #Change the name to change the dataset

In [None]:
from google.colab import drive
drive.mount('/content/drive')

In [None]:
!pip install clifford

In [None]:
from clifford.g3c import *
from math import sqrt

lambda_coeff = 10

#From Euclidean to 1D Up CGA.
#function implementing Eq. 6 (convert a vector in Euclidean space into a rotor
#in spherical space)
def translation_rotor(a, L = lambda_coeff):
    Ta = (L + a*e4)/(sqrt(L**2 + a**2))
    return Ta

#From Euclidean to 1D Up CGA. function implementing the Eq. 10 (X = f(x))
def up1D(x, L = lambda_coeff):
    Y = (2*L / (L**2 + x**2))*x + ((L**2-x**2)/(L**2 + x**2))*e4
    return Y

#From 1D Up CGA to Euclidean. function implementing the inverse of Eq. 10 (x = f^{-1}(X))
def down1D(Y, x, L = lambda_coeff):
    alpha = (2*L/(L**2 + x**2))
    beta  =  (L**2 - x**2)/(L**2 + x**2)
    x = (Y - beta*e4)/alpha

    return x

In [None]:
#reads the dataset labels and converts them into motors (Train set)

train = open("/content/drive/MyDrive/"+ FOLDER + "/TrainSplit.txt").readlines()

import os.path
import cv2
from google.colab.patches import cv2_imshow
import numpy as np
import csv

x_train = []
y_train = []

position_train = []

dir = "/content/drive/MyDrive/"+ FOLDER + "/"

with open(dir + 'TRAIN.csv', "w") as csv_file:
    fieldnames = ['filename', 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h']
    writer = csv.DictWriter(csv_file, fieldnames=fieldnames)
    writer.writeheader()

    #iterates over the number of video sequences
    for i in range(len(train)):


        print(i, "/", str(len(train)))

        num = train[i][8]
        files = os.listdir("/content/drive/MyDrive/"+ FOLDER + "/seq-0" + str(num) +"/")
        files.sort()


        #iterates over every frame
        for j in range(len(files)):
            if j % 100 == 0:
                print("  ", j, "/", str(len(files)))

            #print(files[j])
            if files[j].endswith('.pose.txt') and files[j].endswith('(1).pose.txt') == False:
            #print(files[j])


                with open("/content/drive/MyDrive/"+ FOLDER + "/seq-0" + str(num) + "/" + files[j] , 'r') as f:
                    l = [[float(num) for num in line.split('\t ')] for line in f]

      #converting the rotation matrix M into a rotor R
                M = np.array(l)
                B = [M[0,0]*e1 + M[1,0]*e2 + M[2,0]*e3,
                    M[0,1]*e1 + M[1,1]*e2 + M[2,1]*e3,
                    M[0,2]*e1 + M[1,2]*e2 + M[2,2]*e3]

                A = [e1,e2,e3]
                R = 1+sum([A[k]*B[k] for k in range(3)])
                R = R/abs(R)

                #mapping the position from Euclidean into spherical space
                Ta = translation_rotor(float(M[0,3])*e1 + float(M[1,3])*e2 + float(M[2,3])*e3)
                N = Ta*R


                d = {'filename': "seq-0" + str(num) + "/" + files[j].split(".")[0] + ".color.png",
                     'a':N[0],
                     'b':N[6],
                     'c':N[7],
                     'd':N[8],
                     'e':N[10],
                     'f':N[11],
                     'g':N[13],
                     'h':N[26]}
                writer.writerow(d)


#position_train = np.reshape(position_train, (-1, 3))

In [None]:
test = open("/content/drive/MyDrive/"+ FOLDER + "/TestSplit.txt").readlines()

y_test = []
position_test = []

#iterates over the number of video sequences
with open(dir + 'TEST.csv', "w") as csv_file:
    fieldnames = ['filename', 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h']
    writer = csv.DictWriter(csv_file, fieldnames=fieldnames)
    writer.writeheader()

    #iterates over the number of video sequences
    for i in range(len(test)):


        print(i, "/", str(len(test)))

        num = test[i][8]
        files = os.listdir("/content/drive/MyDrive/"+ FOLDER + "/seq-0" + str(num) +"/")
        files.sort()


        #iterates over every frame
        for j in range(len(files)):
            if j % 100 == 0:
                print("  ", j, "/", str(len(files)))

            #print(files[j])
            if files[j].endswith('.pose.txt') and files[j].endswith('(1).pose.txt') == False:
            #print(files[j])


                with open("/content/drive/MyDrive/"+ FOLDER + "/seq-0" + str(num) + "/" + files[j] , 'r') as f:
                    l = [[float(num) for num in line.split('\t ')] for line in f]

      #converting the rotation matrix M into a rotor R
                M = np.array(l)
                B = [M[0,0]*e1 + M[1,0]*e2 + M[2,0]*e3,
                    M[0,1]*e1 + M[1,1]*e2 + M[2,1]*e3,
                    M[0,2]*e1 + M[1,2]*e2 + M[2,2]*e3]

                A = [e1,e2,e3]
                R = 1+sum([A[k]*B[k] for k in range(3)])
                R = R/abs(R)

                #mapping the position from Euclidean into spherical space
                Ta = translation_rotor(float(M[0,3])*e1 + float(M[1,3])*e2 + float(M[2,3])*e3)
                N = Ta*R

                y_test = np.append(y_test, [N[0], N[6], N[7], N[8], N[10], N[11], N[13], N[26]])
                position_test = np.append(position_test, [M[0,3], M[1,3], M[2,3]])


                d = {'filename': "seq-0" + str(num) + "/" + files[j].split(".")[0] + ".color.png",
                     'a':N[0],
                     'b':N[6],
                     'c':N[7],
                     'd':N[8],
                     'e':N[10],
                     'f':N[11],
                     'g':N[13],
                     'h':N[26]}
                writer.writerow(d)


In [None]:
import pandas as pd

df_train =pd.read_csv(dir + '/TRAIN.csv')
train_datagen = tf.keras.preprocessing.image.ImageDataGenerator(
        rescale=1./255,
        validation_split=0.2)

test_datagen = tf.keras.preprocessing.image.ImageDataGenerator(
        rescale=1./255)

'''
train_generator = train_datagen.flow_from_directory(
        'drive/MyDrive/300W-LP/',
        target_size=(img_shape, img_shape),
        batch_size=batch_size,
        shuffle = True,
        class_mode=None)
'''

#df = pd.read_csv(dir + 'train_df.csv', delimiter=' ', header=None, names=['filename', 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h'])
columns = ['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h']
train_generator = train_datagen.flow_from_dataframe(dataframe=df_train, directory=dir,
                                              x_col="filename", y_col=columns, has_ext=True,
                                              class_mode="raw", target_size=(224, 224),
                                              shuffle = True,
                                              subset="training",
                                              sort = False,
                                              batch_size=64)

val_generator = train_datagen.flow_from_dataframe(dataframe=df_train, directory=dir,
                                              x_col="filename", y_col=columns, has_ext=True,
                                              class_mode="raw", target_size=(224, 224),
                                              shuffle = True,
                                              subset="validation",
                                              sort = False,
                                              batch_size=64)

df_test =pd.read_csv(dir + '/TEST.csv')
test_generator = test_datagen.flow_from_dataframe(dataframe=df_test, directory=dir,
                                              x_col="filename", y_col=columns, has_ext=True,
                                              class_mode="raw", target_size=(224, 224),
                                              shuffle = False,
                                              sort = False,
                                              batch_size=64)

y_test = np.reshape(y_test, (-1, 8))
position_test = np.reshape(position_test, (-1, 3))

## Training

In [None]:
#defining hyperparameters

nb_epoch = 100
batch_size = 64
initial_learning_rate = 1e-4
lr_schedule = tf.keras.optimizers.schedules.ExponentialDecay(
    initial_learning_rate,
    decay_steps=10000,
    decay_rate=0.98,
    staircase=True)

#compiling the model
CGAPoseNet.compile(optimizer = tf.keras.optimizers.Adam(learning_rate=lr_schedule),
                 loss="mse", run_eagerly=True)

es_callback = tf.keras.callbacks.EarlyStopping(monitor='val_loss', patience=12, restore_best_weights = True)

#training
model_train = CGAPoseNet.fit(train_generator,
                            validation_data=val_generator,
                            epochs=nb_epoch,
                            verbose=1,
                            shuffle=True,
                            callbacks = es_callback,
                            batch_size=batch_size)

#plotting losses
loss = model_train.history['loss']
val_loss = model_train.history['val_loss']
epochs = range(0,np.size(loss))

plt.figure()
plt.plot(epochs, loss, 'b-', label='Training loss')
plt.plot(epochs, val_loss, 'r-', label='Validation loss')
plt.title('Training loss')
plt.legend()
plt.show()

In [None]:
#storing losses
np.save("NB_train_loss.npy", loss)
np.save("NB_val_loss.npy", val_loss)

In [None]:
#saving weights
CGAPoseNet.save('weights.h5')
from tensorflow.python.client import device_lib
device_lib.list_local_devices()

# Testing

In [None]:
GAPoseNet = keras.models.load_model('weights.h5')

In [None]:
#prediction step
y_pred = CGAPoseNet.predict(test_generator)
#np.save("y_pred.npy", y_pred)

In [None]:
import matplotlib.pyplot as plt
#Storing the MSE between \hat{M}, M over the test set

MSE = []

tot = 0
cnt = 0
for i in range(len(y_test)):
    mse = (np.square(y_test[i] - y_pred[i])).mean()

    MSE = np.append(MSE, mse)

    #printing the first 20 motors M, \hat{M} if the MSE between them is close
    if cnt < 20 and mse < 0.0008:
        print("original:" , y_test)

        X = y_test[i]
        Y = y_pred[i]

        M_real = X[0] + X[1]*e12 + X[2]*e13 + X[3]*e14 + X[4]*e23 + X[5]*e24 + X[6]*e34 + X[7]*e1234
        M_pred = Y[0] + Y[1]*e12 + Y[2]*e13 + Y[3]*e14 + Y[4]*e23 + Y[5]*e24 + Y[6]*e34 + Y[7]*e1234

        print("prediction:", y_pred[i])
        print("****")
        cnt += 1

    tot += mse


print(tot)
np.save("MSE.npy", MSE)

In [None]:
import matplotlib.pyplot as plt

origin = e4
fig = plt.figure(figsize=(12, 12))
ax = fig.add_subplot(projection='3d')
#list_of_lines = open("/content/drive/MyDrive/"+ FOLDER + "/dataset_test.txt").readlines() # no need for closing, python will do it for you

positional_error = []
rotational_error = []

translation = []
translation_pred = []

rotation = []
rotation_pred = []

position = position_test

for i in range(len(y_test)):
    #a = []
    #a = list_of_lines[i+3].split()

    #print(a)

    #x = float(a[1])*e1 + float(a[2])*e2 + float(a[3])*e3
    x = position[i,0]*e1 + position[i,1]*e2 + position[i,2]*e3

    X = y_test[i]
    Y = y_pred[i]

    #construct M and \hat{M}
    M_real = X[0] + X[1]*e12 + X[2]*e13 + X[3]*e14 + X[4]*e23 + X[5]*e24 + X[6]*e34 + X[7]*e1234
    M_pred = Y[0] + Y[1]*e12 + Y[2]*e13 + Y[3]*e14 + Y[4]*e23 + Y[5]*e24 + Y[6]*e34 + Y[7]*e1234

    #normalizing
    M_pred = M_pred / sqrt((M_pred* ~M_pred)[0])

    #predicted and real displacement vector \hat{D}, D in spherical space
    S = M_pred * origin * ~M_pred
    T = M_real * origin * ~M_real

    #predicted and real displacement vector \hat{d}, d in Euclidean space
    s = down1D(S, x)
    t = down1D(T, x)

    #POSITIONAL ERROR
    mae = np.mean(np.abs(np.array([t[1], t[2], t[3]]) - np.array([s[1], s[2], s[3]])))

    positional_error  = np.append(positional_error , mae)

    translation = np.append(translation, np.array([t[1], t[2], t[3]]))
    translation_pred = np.append(translation_pred, np.array([s[1], s[2], s[3]]))


    #plotting the camera trace
    ax.scatter(t[1], t[2], t[3], s = 20, c = "r")
    ax.scatter(s[1], s[2], s[3], s = 20, c = "b", alpha = 0.5)

    Tup = translation_rotor(t[1]*e1 + t[2]*e2 + t[3]*e3)
    Sup = translation_rotor(s[1]*e1 + s[2]*e2 + s[3]*e3)

    #predicted and real rotors \hat{R}, R
    R_pred = ~Sup* M_pred
    R_real = ~Tup* M_real

    if (R_real * ~R_pred)[0] > 1:
        error = (np.arccos(1))*360/(2*np.pi)
    elif (R_real * ~R_pred)[0] < -1:
        error = (np.arccos(-1))*360/(2*np.pi)
    else:
        #ROTATIONAL ERROR
        error = (np.arccos((R_real * ~R_pred)[0]))*360/(2*np.pi)

    rotational_error = np.append(rotational_error, error)

    rotation = np.append(rotation, np.array([R_real[0], R_real[6], R_real[7], R_real[10]]))
    rotation_pred = np.append(rotation_pred, np.array([R_pred[0], R_pred[6], R_pred[7], R_pred[10]]))


plt.show()


#storing rotational and translational errors
np.save("translation_error.npy", positional_error)
np.save("rotational_error.npy", rotational_error)

#storing original and predicted translations
np.save("T.npy", translation)
np.save("S.npy", translation_pred)

#storing original and predicted rotations
np.save("R.npy", rotation)
np.save("Q.npy", rotation_pred)

In [None]:
import matplotlib.pyplot as plt

origin = e4
fig = plt.figure(figsize=(12, 12))
ax = fig.add_subplot(projection='3d')
#list_of_lines = open("/content/drive/MyDrive/"+ FOLDER + "/dataset_test.txt").readlines() # no need for closing, python will do it for you

positional_error = []
rotational_error = []

translation = []
translation_pred = []

rotation = []
rotation_pred = []

position = position_test

for i in range(len(y_test)):
    #a = []
    #a = list_of_lines[i+3].split()

    #print(a)

    #x = float(a[1])*e1 + float(a[2])*e2 + float(a[3])*e3
    x = position[i,0]*e1 + position[i,1]*e2 + position[i,2]*e3

    X = y_test[i]
    Y = y_pred[i]

    #construct M and \hat{M}
    M_real = X[0] + X[1]*e12 + X[2]*e13 + X[3]*e14 + X[4]*e23 + X[5]*e24 + X[6]*e34 + X[7]*e1234
    M_pred = Y[0] + Y[1]*e12 + Y[2]*e13 + Y[3]*e14 + Y[4]*e23 + Y[5]*e24 + Y[6]*e34 + Y[7]*e1234

    #normalizing
    M_pred = M_pred / sqrt((M_pred* ~M_pred)[0])

    #predicted and real displacement vector \hat{D}, D in spherical space
    S = M_pred * origin * ~M_pred
    T = M_real * origin * ~M_real

    #predicted and real displacement vector \hat{d}, d in Euclidean space
    s = down1D(S, x)
    t = down1D(T, x)

    #POSITIONAL ERROR
    mae = np.mean(np.abs(np.array([t[1], t[2], t[3]]) - np.array([s[1], s[2], s[3]])))

    positional_error  = np.append(positional_error , mae)

    translation = np.append(translation, np.array([t[1], t[2], t[3]]))
    translation_pred = np.append(translation_pred, np.array([s[1], s[2], s[3]]))


    #plotting the camera trace
    ax.scatter(t[1], t[2], t[3], s = 20, c = "r")
    ax.scatter(s[1], s[2], s[3], s = 20, c = "b", alpha = 0.5)

    Tup = translation_rotor(t[1]*e1 + t[2]*e2 + t[3]*e3)
    Sup = translation_rotor(s[1]*e1 + s[2]*e2 + s[3]*e3)

    #predicted and real rotors \hat{R}, R
    R_pred = ~Sup* M_pred
    R_real = ~Tup* M_real

    if (R_real * ~R_pred)[0] > 1:
        error = (np.arccos(1))*360/(2*np.pi)
    elif (R_real * ~R_pred)[0] < -1:
        error = (np.arccos(-1))*360/(2*np.pi)
    else:
        #ROTATIONAL ERROR
        error = (np.arccos((R_real * ~R_pred)[0]))*360/(2*np.pi)

    rotational_error = np.append(rotational_error, error)

    rotation = np.append(rotation, np.array([R_real[0], R_real[6], R_real[7], R_real[10]]))
    rotation_pred = np.append(rotation_pred, np.array([R_pred[0], R_pred[6], R_pred[7], R_pred[10]]))


plt.show()


#storing rotational and translational errors
np.save("translation_error.npy", positional_error)
np.save("rotational_error.npy", rotational_error)

#storing original and predicted translations
np.save("T.npy", translation)
np.save("S.npy", translation_pred)

#storing original and predicted rotations
np.save("R.npy", rotation)
np.save("Q.npy", rotation_pred)

In [None]:
#plotting the camera orientation (coefficients e_{12}, e_{13}, e_{23} of rotor R)
fig = plt.figure(figsize=(12, 12))
ax = fig.add_subplot(projection='3d')

N=200
stride=1

u = np.linspace(0, 2 * np.pi, N)
v = np.linspace(0, np.pi, N)
x = np.outer(np.cos(u), np.sin(v))
y = np.outer(np.sin(u), np.sin(v))
z = np.outer(np.ones(np.size(u)), np.cos(v))
ax.plot_surface(x, y, z, linewidth=0.0, alpha = 0.1, cstride=stride, rstride=stride)

ax.scatter(0, 0, 0, c = "k", marker = "s", label = "O")

rotation = np.reshape(rotation, (-1, 4))
rotation_pred = np.reshape(rotation_pred, (-1, 4))
ax.scatter(rotation[:,1], rotation[:, 2], rotation[:,3], s = 15, c = "r")
ax.scatter(rotation_pred[:,1], rotation_pred[:, 2], rotation_pred[:,3], s = 15, c = "b")
plt.show()

In [None]:
print(np.median(positional_error))
print(np.mean(  positional_error))

In [None]:
print(np.median(rotational_error))
print(np.mean(  rotational_error))