In [None]:
import torch
from torch.distributions.multivariate_normal import MultivariateNormal
from torch.autograd import Variable
import tools.utils as utils
import matplotlib.pyplot as plt
from matplotlib.patches import Ellipse
from matplotlib import animation, rc
from torch.optim import LBFGS
import numpy as np
from IPython.display import display, Math, Latex, Markdown, HTML
import pandas as pd
import matplotlib
import matplotlib.image as mpimg
from torch.profiler import profile, record_function, ProfilerActivity

matplotlib.rcParams['animation.embed_limit'] = 2**128

In [None]:
device = "cuda"

In [None]:
target_pcd = torch.tensor(utils.pcl_from_pcd("../dataset/map.pcd"), device=device, dtype=torch.float32)

In [None]:
# removing the floor points
z_min = torch.min(target_pcd[:,2])
print(z_min)
threshold = -1
target_pcd = target_pcd[target_pcd[:,2] > threshold]
target_pcd = target_pcd[:, :2]

In [None]:
class NDT_map():
    def __init__(self, grid_map, means, covs, counts, resolution, x_min, y_min, distributions) -> None:
        self.grid_map = grid_map
        self.means = means
        self.covs = covs
        self.counts = counts
        self.nx = grid_map.shape[0]
        self.ny = grid_map.shape[1]
        self.x_min = x_min
        self.y_min = y_min
        self.resolution = resolution
        self.distributions = distributions
        

In [None]:
def voxelize(pcd, x_min, x_max, y_min, y_max, resolution):
    """
    Voxelize a point cloud.
    Args:
        pcd: point cloud, shape (N, 3)
        x_min, x_max, y_min, y_max: voxelization range
        resolution: voxel size
    Returns:
        voxelized pcd, shape (N, 3)
    """
    points = pcd.clone()
    map = torch.zeros(
        (nx := int(torch.ceil((x_max - x_min) / resolution)),
        ny := int(torch.ceil((y_max - y_min) / resolution)), 2), device=device, dtype=torch.float32)
    # write cell centers to map
    map[:,:,0], map[:,:,1] = torch.meshgrid(
        torch.arange(x_min, x_max, resolution, device=device),
        torch.arange(y_min, y_max, resolution, device=device)
        )
    offset = torch.zeros((1,2), device=device, dtype=torch.float32)
    offset[0,0] = x_min
    offset[0,1] = y_min
    pcd = pcd - offset
    pcd = pcd / resolution
    pcd = pcd.round()
    pcd[:,0] = torch.clamp(pcd[:,0], 0, nx-1)
    pcd[:,1] = torch.clamp(pcd[:,1], 0, ny-1)
    pcd = pcd.to(torch.int64)

    pcd_ = torch.unique(pcd, dim=0)
    counts_map = torch.zeros((nx,ny), device=device, dtype=torch.int64)

    means = torch.zeros((map.shape[0],map.shape[1],2), device=device, dtype=torch.float32)
    covs = torch.zeros((map.shape[0],map.shape[1],2,2), device=device, dtype=torch.float32)
    distributions = np.zeros((map.shape[0],map.shape[1]), dtype=object)

    sizes = []
    for idx in pcd_:
        # repeat icd to get the same shape as points
        idx = idx.repeat(points.shape[0], 1)
        point_group = points[torch.all(pcd == idx, dim=1),:]
        sizes.append(point_group.shape[0])
        if point_group.shape[0] <= 4:
            continue
        cov = torch.cov(point_group.T)
        covs[idx[0,0], idx[0,1], :] = torch.cov(point_group.T)
        means[idx[0,0], idx[0,1], :] = point_group.mean(dim=0)
        counts_map[idx[0,0], idx[0,1]] = point_group.shape[0]
        distributions[idx[0,0], idx[0,1]] = MultivariateNormal(means[idx[0,0], idx[0,1], :], covs[idx[0,0], idx[0,1], :])
    assert sum(sizes) == points.shape[0]

    ndt_map = NDT_map(map, means, covs, counts_map, resolution, x_min, y_min, distributions)

    return ndt_map

def calculate_score(reference_map: NDT_map, source_pcd: torch.Tensor, t: torch.Tensor):
    """
    Function to calculate the score of the alignment of the source_pcd to the reference_map

    Args:
        reference_map (NDT_map): reference map
        source_pcd (torch.Tensor): source point cloud
        t (torch.Tensor): transformation parameters
    """
    R = torch.zeros(2, 2, device=device, dtype=torch.float32)
    R[0, 0] = torch.cos(t[2])
    R[0, 1] = -torch.sin(t[2])
    R[1, 0] = torch.sin(t[2])
    R[1, 1] = torch.cos(t[2])
    source_pcd = source_pcd @ R + t[:2]

    points = source_pcd.clone()
    # mapping points from the source_pcd to the grid of the reference map
    source_pcd = source_pcd - torch.tensor([reference_map.x_min, reference_map.y_min], device=device, dtype=torch.float32)
    source_pcd = source_pcd / reference_map.resolution
    source_pcd = source_pcd.round()
    # only keep cells that are in the map
    #mask = torch.tensor((source_pcd[:,0] > 0) & (source_pcd[:,0] < reference_map.nx-1) &
    #    (source_pcd[:,1] > 0) & (source_pcd[:,1] < reference_map.ny-1), device=device, dtype=torch.bool)
    #source_pcd = source_pcd[mask]
    #points = points[mask]
    source_pcd[:,0] = torch.clamp(source_pcd[:,0], 0, reference_map.nx-1)
    source_pcd[:,1] = torch.clamp(source_pcd[:,1], 0, reference_map.ny-1)
    source_pcd = source_pcd.to(torch.int32)

    # get the unique points
    source_pcd_ = torch.unique(source_pcd, dim=0)
    
    score = torch.tensor(0, device=device, dtype=torch.float32, requires_grad=True)
    k = 0
    for idx in source_pcd_:
        # active points
        idx = idx.repeat(points.shape[0], 1)
        act_points = points[torch.all(source_pcd == idx, dim=1),:].requires_grad_(True)
        
        if reference_map.counts[idx[0,0],idx[0,1]] <= 4:
            score = score - 100*act_points.shape[0]
            continue
        distribution = reference_map.distributions[idx[0,0],idx[0,1]]
        k += act_points.shape[0]
        # evaluate distribution for each point
        score = score + distribution.log_prob(act_points).sum()
    score = score / k
    return -score

In [None]:
x_min, x_max, y_min, y_max = torch.min(target_pcd[:,0]), torch.max(target_pcd[:,0]), torch.min(target_pcd[:,1]), torch.max(target_pcd[:,1])
ndt_map = voxelize(target_pcd, x_min=x_min, x_max=x_max, y_min=y_min, y_max=y_max, resolution = 4.854255375398731)

In [None]:
def plot_ndt(ndt_map: NDT_map):
    map = ndt_map.grid_map
    means = ndt_map.means
    covs = ndt_map.covs
    counts = ndt_map.counts
    nx = ndt_map.nx
    ny = ndt_map.ny
    resolution = ndt_map.resolution

    plt.figure(figsize=(15,15))
    # plot rectangles around cells
    for i in range(nx):
        for j in range(ny):
            if counts.cpu().numpy()[i,j] == 0:
                continue
            plt.plot(
            [map.cpu().numpy()[i,j,0]-resolution/2, map.cpu().numpy()[i,j,0]+resolution/2, map.cpu().numpy()[i,j,0]+resolution/2, map.cpu().numpy()[i,j,0]-resolution/2, map.cpu().numpy()[i,j,0]-resolution/2],
            [map.cpu().numpy()[i,j,1]-resolution/2, map.cpu().numpy()[i,j,1]-resolution/2, map.cpu().numpy()[i,j,1]+resolution/2, map.cpu().numpy()[i,j,1]+resolution/2, map.cpu().numpy()[i,j,1]-resolution/2],
            color='black', alpha=0.1
            )
            
    # draw covs
    for i in range(nx):
        for j in range(ny):
            if np.allclose(covs.cpu().numpy()[i,j,:], np.zeros((2,2))):
                continue
            a,b,c = covs.cpu().numpy()[i,j,0,0],covs.cpu().numpy()[i,j,0,1],covs.cpu().numpy()[i,j,1,1]
            width = (a+c)/2 + np.sqrt(((a-c)/2)**2 + b**2)
            height = (a+c)/2 - np.sqrt(((a-c)/2)**2 + b**2)
            width, height  = sorted([width, height])
            assert height > width
            width = max(width, 0.1)
            plt.gca().add_patch(Ellipse(
            xy=(means.cpu().numpy()[i,j,0], means.cpu().numpy()[i,j,1]),
            width= height,
            height= width,
            angle=np.arctan2(
            height - a, b
            ) * 180 / np.pi,
            color='blue'
        ))
            
    plt.axis('equal')

plot_ndt(ndt_map)

# load a frame
frame = 70
source_pcd = torch.tensor(utils.pcl_from_pcd(f"../dataset/frames/frame_{frame}.pcd"), device=device, dtype=torch.float32, requires_grad=True)
threshold = -1.5
source_pcd = source_pcd[source_pcd[:,2] > threshold]
source_pcd = source_pcd[:, :2]

# translate
t = torch.tensor([0, 0, 0], device=device, dtype=torch.float32, requires_grad=True)
rotation_matrix = torch.tensor([[torch.cos(t[2]), -torch.sin(t[2])], [torch.sin(t[2]), torch.cos(t[2])]], device=device, requires_grad=True, dtype=torch.float32)

plt.plot(
    (source_pcd @ rotation_matrix).cpu().detach().numpy()[:,0]+t[:2].cpu().detach().numpy()[0],
    (source_pcd @ rotation_matrix).cpu().detach().numpy()[:,1]+t[:2].cpu().detach().numpy()[1], 'o', color='red', alpha=0.005
)


score = calculate_score(ndt_map, source_pcd, t)
score.backward(retain_graph=True)
print(f"{score=}")


print(t.grad)

In [None]:
df = pd.read_csv("../dataset/ground_truth.csv")
gt = df[["x", "y"]].to_numpy()

In [None]:
def train(ndt_map, frame, init_value):
    # load a frame
    source_pcd = torch.tensor(utils.pcl_from_pcd(f"../dataset/frames/frame_{frame}.pcd"), device=device, dtype=torch.float32)
    source_pcd = source_pcd[source_pcd[:,2] > threshold]
    source_pcd = source_pcd[:, :2].clone().requires_grad_(True)
    t = torch.tensor([init_value[0], init_value[1], init_value[2]], device=device, dtype=torch.float32, requires_grad=True)


    j = 0
    t_ = t.detach().clone()
    scores = []
    vals = []
    while True:
        score = calculate_score(ndt_map, source_pcd, t)
        scores.append(score.cpu().detach().numpy())
        vals.append(t.cpu().detach().numpy())
        score.backward()
        def calculate_score_mod(t, ndt_map=ndt_map, source_pcd = source_pcd):
            return calculate_score(ndt_map, source_pcd, t)
        
        try:
            dt = (torch.linalg.inv(torch.autograd.functional.hessian(calculate_score_mod, t.detach())) @ t.grad)
        except:
            dt = torch.rand(3, device=device, dtype=torch.float32)*0.05
        
        t = (t - dt).detach()
        
        if (torch.linalg.norm(dt) < 1e-2) or j > 2:
            if torch.max(t - t_) > 0.6:
                return t_.cpu().detach().numpy(), j
            break
        j += 1
    del source_pcd
    t_best = np.array(vals[(pos := np.argmin(scores))])

    return t_best, j

In [None]:
x_min, x_max, y_min, y_max = torch.min(target_pcd[:,0]), torch.max(target_pcd[:,0]), torch.min(target_pcd[:,1]), torch.max(target_pcd[:,1])
ndt_map = voxelize(target_pcd, x_min=x_min, x_max=x_max, y_min=y_min, y_max=y_max, resolution =  4.854255375398731)

track = []

start = 0
val = [*gt[start],0]
for idx in range(start, gt.shape[0]):
    acc = 0
    vel = 0
    if len(track) > 20:
        # linear extrapolation
        val = (track[-1] + 
                (vel := (-1*track[-20]+track[-1])/(19)) + 
                (acc := (9*track[-20]-19*track[-10]+10*track[-1])/(855*2)))
    val, its = train(ndt_map, idx, val)
    print(f"idx {idx} iterations {its} error:", error := np.linalg.norm(val[:2]-gt[idx]), f"t: {val}, acc: {acc}, vel: {vel}")

    if error > 3:
        print("way too big")
        break
    track.append(val)

track = np.array(track)
df = pd.DataFrame(track, columns=["x", "y", "theta"])
df.to_csv("track.csv", index=False)

In [None]:

def animate_track(ndt_map: NDT_map, track=None):
    img = mpimg.imread('../Car-Top-View.png')
    img = np.flipud(img)

    extent = [-1.4, 1.4, -2.70, 2.7]
    map = ndt_map.grid_map
    means = ndt_map.means
    covs = ndt_map.covs
    counts = ndt_map.counts
    nx = ndt_map.nx
    ny = ndt_map.ny
    resolution = ndt_map.resolution
    
    fig,axs = plt.subplots(1, 2, figsize=(25,6), gridspec_kw={'width_ratios': [3, 1]})
    fig.tight_layout(rect=[0, 0.03, 1, 0.95])
    fig.suptitle("Visualization of the NDT map and the track", fontsize=16)
    
    for i in range(nx):
        for j in range(ny):
            if counts.cpu().numpy()[i,j] == 0:
                continue
            axs[0].plot(
                [map.cpu().numpy()[i,j,0]-resolution/2, map.cpu().numpy()[i,j,0]+resolution/2, map.cpu().numpy()[i,j,0]+resolution/2, map.cpu().numpy()[i,j,0]-resolution/2, map.cpu().numpy()[i,j,0]-resolution/2],
                [map.cpu().numpy()[i,j,1]-resolution/2, map.cpu().numpy()[i,j,1]-resolution/2, map.cpu().numpy()[i,j,1]+resolution/2, map.cpu().numpy()[i,j,1]+resolution/2, map.cpu().numpy()[i,j,1]-resolution/2],
                color='black', alpha=0.1
            )

    reference = utils.pcl_from_pcd("../dataset/map.pcd")[:, :2]
    axs[0].plot(reference[:,0], reference[:,1],'o', color='black',  alpha=0.002, label="reference map")
    
    
    data = utils.pcl_from_pcd(f"../dataset/frames/frame_{0}.pcd")
    (measurement,) = axs[0].plot(data[:,0], data[:,1], 'o', color='red', alpha=0.01, label="measured points")
    (ground_truth,) = axs[0].plot(gt[:1,0], gt[:1,1], color='green', label="ground truth")
    leg = axs[0].legend()

    for lh in leg.legendHandles: 
        lh.set_alpha(1)
    axs[1].imshow(img, alpha=0.5,extent=extent, origin='upper')
    axs[1].axis('equal')

    axs[1].set_xlabel("lateral error [m]")
    axs[1].set_ylabel("longitudinal error [m]")

    (error, ) = axs[1].plot([], [], color='red')

    def animate(frame):
        print(f"starting frame: {frame}")
        t = track[frame]
        rotation_matrix = np.array([[np.cos(t[2]), -np.sin(t[2])], [np.sin(t[2]), np.cos(t[2])]])
        data = utils.pcl_from_pcd(f"../dataset/frames/frame_{frame}.pcd")[:,:2]
        data = data @ rotation_matrix
        measurement.set_data(data[:,0]+t[0], data[:,1]+t[1])
        ground_truth.set_data(gt[:frame,0], gt[:frame,1])

        plot_len = np.clip(frame-50, 0, frame-50)
        error.set_data(track[plot_len: frame,1]-gt[plot_len:frame,1], track[plot_len: frame,0]-gt[plot_len: frame,0])
        return (measurement,ground_truth, error)

    anim = animation.FuncAnimation(fig, animate, frames=len(track),
                                    blit=True, interval=50)
    # save the animation as mp4 video file
    anim.save("ndt_animation.mp4", fps=60, extra_args=["-vcodec", "libx264"])
    #return HTML(anim.to_jshtml())
    
    
animate_track(ndt_map, track)