In [213]:
import numpy as np
import pandas as pd
import math as m
from einops import rearrange
import os
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, TensorDataset
import torch.optim as optim
import math
from scipy.special import sph_harm


### Preperation (NOTE: Using meter as unit)

In [214]:
# convert pointcloud from cartisean coordinate to spherical coordinate
def cart2sph(xyz):
    x = xyz[:,0]
    y = xyz[:,1]
    z = xyz[:,2]
    XsqPlusYsq = x**2 + y**2
    r = np.sqrt(list(XsqPlusYsq + z**2))
    elev = np.arctan2(list(z), np.sqrt(list(XsqPlusYsq)))
    pan = np.arctan2(list(x), list(y))
    output = np.array([r, elev, pan])
    return rearrange(output, 'a b -> b a') 

In [215]:
# Specify the directory path
dataset_path = 'datasets/testing1'

# List all files in the specified path, ignoring directories
files = [f for f in os.listdir(dataset_path) if os.path.isfile(os.path.join(dataset_path, f))]
files.sort()

# read the files
points_xyz = []
for s in files:
    path = 'datasets/testing1/' + s
    df = pd.read_csv(path)
    a = df.to_numpy()
    b = a[:,8:11]
    points_xyz.append(b)

# Now we can find the fiew direction of each points:
# NOTE: points in spherical coordinate are arranged: [r, elev, pan]
points_sphere = []
for points in points_xyz:
    points_sphere.append(cart2sph(points))

In [216]:
### we now process the data
# Translation vectors for points in each view, we are using camera centre at first frame as origin of world coordinate
# NOTE: translation vectors below are found by assuming transformation between frames are translations, and obatined by manually finding corrspondance
# They are translation of the same corrspondance across different frames
# HARD CODED HERE
t0 = np.array([0,0,0])
t1 = np.array([-0.671,-0.016,0.215])
t2 = np.array([-1.825,-0.091,0.147])
t3 = np.array([-2.661,-0.263,0.166])
t4 = np.array([-3.607,-0.156,0.039])
translations = [t0, t1, t2, t3, t4]

# camera centre locations
centres = [-t for t in translations]
centres_data = []
for i,c in enumerate(centres):
    l = len(points_sphere[i])
    temp = np.tile(c, (l, 1))
    centres_data.append(temp)

In [217]:
# stack the points into one big matrix
stacked = []
for i in range(len(points_sphere)):
    temp = np.hstack((points_sphere[i], centres_data[i]))
    stacked.append(temp)

dataset = np.array([])
for i in range(len(stacked)):
    if i == 0:
        dataset = stacked[i]
    else:
        dataset = np.vstack((dataset, stacked[i]))
np.random.shuffle(dataset)

# Filter out points where the distance value is = 0
mask1 = dataset[:, 0] != 0
mask2 = dataset[:,0] > 50
mask = mask1 + mask2
dataset = dataset[mask]

In [218]:
ele = dataset[:,1]
pan = dataset[:,2]
ele = ele + np.pi / 2

In [219]:
# Maximum degree of harmonics
l_max = 8
num_features = sum(2 * l + 1 for l in range(l_max + 1))
features = np.zeros((dataset.shape[0], num_features))
feature_idx = 0
for l in range(l_max + 1):
    for m in range(-l, l + 1):
        Y_lm = sph_harm(m, l, pan, ele)
        features[:, feature_idx] = Y_lm.real  # Storing real part, or use absolute values, etc.
        feature_idx += 1

a = rearrange(dataset[:,0], 'a -> a 1')
encoded_data = np.hstack((a,features, dataset[:,3:]))

### Now prepare to train the model

In [220]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Using {device} device")

Using cuda device


In [221]:
# Convert dataset to pytorch tensor
X = np.array(encoded_data[:,1:])
y = np.array(encoded_data[:,0])

# Convert to tensor:
X_tensor = torch.from_numpy(X).double()
y_tensor = torch.from_numpy(y).double()

In [222]:
dataset = TensorDataset(X_tensor, y_tensor)
batch_size = 256
dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)

In [None]:
class SphericalEncoding(nn.Module):

    def __init__(self, d_model: int, dropout: float = 0.1, max_len: int = 5000):
        super().__init__()
        self.dropout = nn.Dropout(p=dropout)

        position = torch.arange(max_len).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2) * (-math.log(10000.0) / d_model))
        pe = torch.zeros(max_len, 1, d_model)
        pe[:, 0, 0::2] = torch.sin(position * div_term)
        pe[:, 0, 1::2] = torch.cos(position * div_term)
        self.register_buffer('pe', pe)

    def forward(self, x: Tensor) -> Tensor:
        """
        Arguments:
            x: Tensor, shape ``[seq_len, batch_size, embedding_dim]``
        """
        x = x + self.pe[:x.size(0)]
        return self.dropout(x)

In [223]:
# Now we prepare to train the model
features = X.shape[1]

class MLP(nn.Module):
    def __init__(self):
        super(MLP, self).__init__()
        self.layers = nn.Sequential(
            nn.Linear(features, 512),  # Input layer with 5 inputs and 10 outputs
            nn.ReLU(),                # Activation function
            nn.Linear(512, 512),        # Hidden layer with 512 neurons
            nn.ReLU(),                # Activation function
            nn.Linear(512, 512),        # Hidden layer with 512 neurons
            nn.ReLU(),                # Activation function
            nn.Linear(512, 512),        # Hidden layer with 512 neurons
            nn.ReLU(),                # Activation function
            nn.Linear(512, 512),        # Hidden layer with 512 neurons
            nn.ReLU(),                # Activation function
            nn.Linear(512, 1)          # Output layer with 1 output
        )
        
    def forward(self, x):
        return self.layers(x)

# Initialize the model
model = MLP().to(device)

# Loss and Optimizer
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=0.0001)

In [224]:
# Training Loop
num_epochs = 100
for epoch in range(num_epochs):
    for inputs, targets in dataloader:
        inputs, targets = inputs.to(device).float(), targets.to(device).float()
        
        # Forward pass
        outputs = model(inputs)
        loss = criterion(outputs, targets)

        # Backward and optimize
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
    
    print(loss)

  return F.mse_loss(input, target, reduction=self.reduction)
  return F.mse_loss(input, target, reduction=self.reduction)


tensor(502.6203, device='cuda:0', grad_fn=<MseLossBackward0>)
tensor(455.6175, device='cuda:0', grad_fn=<MseLossBackward0>)
tensor(639.0286, device='cuda:0', grad_fn=<MseLossBackward0>)
tensor(570.9342, device='cuda:0', grad_fn=<MseLossBackward0>)
tensor(652.2644, device='cuda:0', grad_fn=<MseLossBackward0>)
tensor(521.6774, device='cuda:0', grad_fn=<MseLossBackward0>)
tensor(1089.5529, device='cuda:0', grad_fn=<MseLossBackward0>)
tensor(487.2943, device='cuda:0', grad_fn=<MseLossBackward0>)
tensor(580.2345, device='cuda:0', grad_fn=<MseLossBackward0>)
tensor(473.9190, device='cuda:0', grad_fn=<MseLossBackward0>)
tensor(651.0030, device='cuda:0', grad_fn=<MseLossBackward0>)
tensor(578.7991, device='cuda:0', grad_fn=<MseLossBackward0>)
tensor(1669.9438, device='cuda:0', grad_fn=<MseLossBackward0>)
tensor(517.9233, device='cuda:0', grad_fn=<MseLossBackward0>)
tensor(340.0982, device='cuda:0', grad_fn=<MseLossBackward0>)
tensor(714.5746, device='cuda:0', grad_fn=<MseLossBackward0>)
tensor