In [None]:
import torch
import numpy as np
import plotly.graph_objects as go
import plotly.express as px
import json

from nemo.global_planner import AStarGradPlanner
from nemo.nemo import Nemo
from nemo.util import wrap_angle_torch, path_metrics, grid_2d, airsim_to_nemo, nemo_to_airsim
from nemo.plotting import plot_surface, plot_path_3d
from nemo.planning import path_optimization

device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

%load_ext autoreload
%autoreload 2

## Parameters

In [None]:
SCENE_NAME = 'UnrealMoon'  # 'KT22', 'RedRocks', 'UnrealMoon', 'AirSimMountains'

N_GRID = 64  # Grid resolution for A*
N_PLOT = 256  # Grid resolution for plotting
HEIGHT_SCALE = 1e2  # Height scaling factor for A*

AIRSIM = True if SCENE_NAME == 'AirSimMountains' or 'UnrealMoon' else False

if AIRSIM:
    # Get nerf dataparser transform
    params = {}
    dataparser_transforms = json.load(open(f'../models/{SCENE_NAME}/dataparser_transforms.json'))
    params['dataparser_transform'] = np.array(dataparser_transforms['transform'])
    params['dataparser_scale'] = dataparser_transforms['scale']

    # Specify start and end in AirSim coordinates
    if SCENE_NAME == 'AirSimMountains':
        airsim_start = np.array([[177., -247., -33.]])  
        airsim_end = airsim_start + np.array([[-192., -328., -68.]])  
        params['spiral_center'] = np.array([99., -449., -57.])
    elif SCENE_NAME == 'UnrealMoon':
        airsim_start = np.array([[0.0, 0.0, 0.0]])  
        airsim_end = airsim_start + np.array([[1050.0, 324.0, -20.0]]) 
        params['spiral_center'] = np.array([524.38, 168.34, 0.0])  

    scene_start = airsim_to_nemo(airsim_start, params).squeeze()[:2]
    scene_end = airsim_to_nemo(airsim_end, params).squeeze()[:2]
else:
    # Specify start and end in scene coordinates
    scene_start = (0.7, 0.7)
    scene_end = (-0.7, -0.7)


print(f"Running Nemo planning for {SCENE_NAME}")
print(f"Start: {scene_start}, End: {scene_end}\n")

## Load Nemo model

In [None]:
print("Loading Nemo model\n")

# Load the Nemo model (automatically sends to device)
nemo = Nemo(f'../models/{SCENE_NAME}/encs.pth', f'../models/{SCENE_NAME}/mlp.pth')

# Manual cropping
if SCENE_NAME == 'KT22':
    BOUNDS = (-0.75, 0.75, -0.75, 0.75) 
elif SCENE_NAME == 'RedRocks':
    BOUNDS = (-0.4, 0.8, -0.6, 0.6)
elif SCENE_NAME == 'AirSimMountains':
    BOUNDS = (-0.75, 0.45, -0.6, 0.6)
elif SCENE_NAME == 'UnrealMoon':
    BOUNDS = (-1., 1., -1., 1.)

In [None]:
# Resample heights at higher resolution for plotting
positions, XY_grid = grid_2d(N_PLOT, BOUNDS)
heights = nemo.get_heights(positions)
z_grid = heights.reshape(N_PLOT, N_PLOT).detach().cpu().numpy()
x_grid = XY_grid[:,:,0].detach().cpu().numpy()
y_grid = XY_grid[:,:,1].detach().cpu().numpy()

# Plot height field and paths
fig = plot_surface(x_grid, y_grid, z_grid, no_axes=False, showscale=True)
# Plot start and end
fig.add_trace(go.Scatter3d(x=[0], y=[0], z=[-0.5], mode='markers', marker=dict(size=10, color='red')))
fig.add_trace(go.Scatter3d(x=[scene_start[0]], y=[scene_start[1]], z=[-0.4], mode='markers', marker=dict(size=10, color='blue')))
fig.add_trace(go.Scatter3d(x=[scene_end[0]], y=[scene_end[1]], z=[-0.4], mode='markers', marker=dict(size=10, color='blue')))
fig.show()

## A* Initialization

In [None]:
print("Running A* initialization\n")

# Form a grid of positions
positions, XY_grid = grid_2d(N_GRID, BOUNDS)
# Query heights
heights = nemo.get_heights(positions)
z_grid = heights.reshape(N_GRID, N_GRID).detach().cpu().numpy()

# Initialize the planner with scaled heightmap (add 1.0 to heights to make them all positive)
scaled_heights = HEIGHT_SCALE * (z_grid + 1.0).reshape(N_GRID, N_GRID)
astar = AStarGradPlanner(scaled_heights, BOUNDS)

# Compute path
astar_path_xy = astar.spatial_plan(tuple(scene_start), tuple(scene_end))
astar_path_xy_torch = torch.tensor(astar_path_xy, device=device)
# Get heights along path
astar_path_zs = nemo.get_heights(astar_path_xy_torch)  

# Save path as torch tensor
astar_path = torch.cat((astar_path_xy_torch, astar_path_zs), dim=1)

## Path optimization

In [None]:
opt_path = path_optimization(nemo, astar_path_xy_torch, iterations=500, lr=1e-3)

In [None]:
fig = plot_surface(x_grid, y_grid, z_grid, no_axes=False)
fig = plot_path_3d(fig=fig, x=opt_path[:,0].detach().cpu().numpy(), 
                        y=opt_path[:,1].detach().cpu().numpy(), 
                        z=opt_path[:,2].detach().cpu().numpy()+1e-3,
                        markers=False, color='orange', linewidth=10)
fig.show()

In [None]:
# Convert Nemo coordinates to AirSim coordinates (local)
airsim_path_3d = nemo_to_airsim(opt_path.detach().cpu().numpy(), params)

In [None]:
opt_path

In [None]:
from nemo.dynamics import diff_flatness

u = diff_flatness(opt_path[:,:2], nemo, dt=1.0)

In [None]:
fig = plot_path_3d(x=airsim_path_3d[:,0], y=airsim_path_3d[:,1], z=airsim_path_3d[:,2], hovertext=np.arange(len(airsim_path_3d)), color='orange')
fig.update_layout(width=1600, height=900, scene=dict(aspectmode='data'))
fig.show()

In [None]:
manual_path = np.load('../results/airsim_paths/moon_manual_1.npz')['states']

In [None]:
fig = plot_path_3d(x=airsim_path_3d[:,0], y=airsim_path_3d[:,1], z=airsim_path_3d[:,2], hovertext=np.arange(len(airsim_path_3d)), color='orange')
fig = plot_path_3d(fig=fig, x=manual_path[:,0], y=manual_path[:,1], z=manual_path[:,2], hovertext=np.arange(len(manual_path)), color='blue')
fig.update_layout(width=1600, height=900, scene=dict(aspectmode='data'))
fig.show()

In [None]:
np.save('../results/airsim_paths/path.npy', airsim_path_3d)

### Dubin's with $\theta$ optimization

In [None]:
# Compute initial headings
thetas = torch.atan2(path_xy_torch[1:,1] - path_xy_torch[:-1,1], path_xy_torch[1:,0] - path_xy_torch[:-1,0])  
# Duplicate last heading
thetas = torch.cat((thetas, thetas[-1].unsqueeze(0)), dim=0)

path = torch.cat((path_xy_torch, thetas.unsqueeze(1)), dim=1)  # (x, y, theta)
# Fixed variables are initial and final states, free variables are intermediate states
path_start = path[0].clone().detach()
path_end = path[-1].clone().detach()
path_opt = path[1:-1].clone().detach().requires_grad_(True)

In [None]:
# Dubin's based cost
def cost(path, dt=1.0):
    thetas = path[:,2]  
    omegas = wrap_angle_torch(thetas.diff()) / dt  
    # Path Vs
    path_dxy = torch.diff(path[:,:2], dim=0)
    Vs = torch.norm(path_dxy, dim=1) / dt
    controls_cost = 0.1 * (torch.abs(Vs)).nanmean() + (torch.abs(omegas)).nanmean()
    # Slope cost
    path_zs = 10 * nemo.get_heights(path)
    path_zs -= path_zs.min()
    path_zs = path_zs**2
    slope_cost = 1 * (torch.abs(path_zs.diff(dim=0))).nanmean()
    print(f"controls_cost: {controls_cost}, slope_cost: {slope_cost}")
    return controls_cost + slope_cost

In [None]:
path_zs = 10 * nemo.get_heights(path)
path_zs -= path_zs.min()
path_zs = path_zs**2
print(path_zs.min(), path_zs.max())
costs = torch.abs(path_zs.diff(dim=0))
print(costs.min(), costs.max())

In [None]:
# Optimize path
opt = torch.optim.Adam([path_opt], lr=1e-3)

for it in range(500):
    opt.zero_grad()
    path = torch.cat((path_start[None], path_opt, path_end[None]), dim=0)
    c = cost(path)
    c.backward()
    opt.step()
    if it % 50 == 0:
        print(f'it: {it},  Cost: {c.item()}')

print(f'Finished optimization - final cost: {c.item()}')

In [None]:
path_zs = nemo.get_heights(path[:,:2])
path_3d = torch.cat((path[:,:2], path_zs), dim=1)

In [None]:
fig = go.Figure()
fig = plot_surface(fig, x_grid, y_grid, z_grid, no_axes=True)
fig = plot_path_3d(fig, x=path_3d[:,0].detach().cpu().numpy(), 
                        y=path_3d[:,1].detach().cpu().numpy(), 
                        z=path_3d[:,2].detach().cpu().numpy())
fig.show()

### Double integrator dynamics

In [None]:
dt = 0.1
path_vs = torch.diff(path, dim=0) / dt
path_as = torch.diff(path_vs, dim=0) / dt
controls_cost = 2 * (torch.norm(path_as, dim=1)**2).mean()

In [None]:
def resample_path(path, rate=10):
    """Resample path at higher resolution using double integrator dynamics"""
    path_vs = torch.diff(path, dim=0) / dt
    path_as = torch.diff(path_vs, dim=0) / dt
    path_resampled = [path[0]]
    for i in range(len(path)-1):
        for j in range(rate):
            t = j / rate
            path_resampled.append(path[i] + path_vs[i]*t + 0.5*path_as[i]*t**2)
    print(path[-1])
    path_resampled.append(path[-1])
    return torch.stack(path_resampled)

In [None]:
resampled_path = resample_path(path, rate=10)
resampled_path

In [None]:
# Double integrator dynamics
def di_cost(path, dt=0.1):
    path_vs = torch.diff(path, dim=0) / dt
    path_as = torch.diff(path_vs, dim=0) / dt
    path_dxy = torch.diff(path, dim=0)
    Vs = torch.norm(path_dxy, dim=1) / dt
    return torch.mean(Vs**2)

In [None]:
opt = torch.optim.Adam([path_opt], lr=1e-3)

for it in range(500):
    opt.zero_grad()
    path = torch.cat((path_start[None], path_opt, path_end[None]), dim=0)
    c = dubins_cost(path)
    c.backward()
    opt.step()
    if it % 50 == 0:
        print(f'it: {it},  Cost: {c.item()}')