In [1]:
import numpy as np 
import matplotlib.pyplot as plt 
import seaborn as sns
sns.set_theme()

import torch 
import torch.nn as nn 
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader

torch.manual_seed(42)
torch.cuda.manual_seed_all(42)
torch.set_default_dtype(torch.float64)

%matplotlib inline

In [2]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using {device} device")

Using cuda device


In [3]:
from Utils.bernstein_torch import bernstein_coeff_order10_new
from Utils.pol_matrix_torch import pol_matrix_comp

# Generating P matrix
t_fin = 15.0
num = 100
tot_time = torch.linspace(0, t_fin, num)
tot_time_copy = tot_time.reshape(num, 1)
P, Pdot, Pddot = bernstein_coeff_order10_new(10, tot_time_copy[0], tot_time_copy[-1], tot_time_copy)

In [4]:
# Custom Dataset Loader 
class TrajDataset(Dataset):
    """Expert Trajectory Dataset."""
    def __init__(self, data):
        
        # Inputs
        self.inp = data[:, 0:55]
        
        # Outputs
        self.out = data[:, 55:]
        
    def __len__(self):
        return len(self.inp)
    
    def __getitem__(self, idx):
        
        # Inputs
        inp = self.inp[idx]
        
        # Outputs
        out = self.out[idx]
                 
        return torch.tensor(inp).double(), torch.tensor(out).double()

# Load the dataset
train_data = np.load("./Preprocessed Data/val_lane_4.npy", mmap_mode="c")

# Using PyTorch Dataloader
train_dataset = TrajDataset(train_data)
train_loader = DataLoader(train_dataset, batch_size=1, shuffle=True, num_workers=0)

In [5]:
from Utils.beta_cvae_aug_ddn import Encoder, Decoder, Beta_cVAE, BatchOpt_DDN, DeclarativeLayer

# DDN
num_batch = 100
node = BatchOpt_DDN(P, Pdot, Pddot, num_batch)
opt_layer = DeclarativeLayer(node)

# Beta-cVAE Inputs
enc_inp_dim = 55
enc_out_dim = 200
dec_inp_dim = enc_inp_dim
dec_out_dim = 8
hidden_dim = 1024 * 2
z_dim = 2

inp_mean, inp_std = 5.1077423, 20.914295

encoder = Encoder(enc_inp_dim, enc_out_dim, hidden_dim, z_dim)
decoder = Decoder(dec_inp_dim, dec_out_dim, hidden_dim, z_dim)
model = Beta_cVAE(encoder, decoder, opt_layer, inp_mean, inp_std).to(device)
model.load_state_dict(torch.load('./Weights/cvae_aug_bern_2.pth'))

<All keys matched successfully>

In [6]:
P = torch.block_diag(P, P).to(device)
Pdot = Pdot.to(device)
Pddot = Pddot.to(device)

In [7]:
import matplotlib.cm as cm

# No. of Generated Samples
N = num_batch

%matplotlib qt

model.eval()
for batch_num, (datas) in enumerate(train_loader):
    inp, out = datas
    
    inp = inp.to(device)
    out = out.to(device)

    # Normalized Input
    inp_norm = (inp - inp_mean) / inp_std

    # Initial State Ego
    initial_state_ego = inp[:, 2:6]
    initial_state_ego[:, 2:4] = initial_state_ego[:, 0:2]
    initial_state_ego[:, 0:2] = 0
    
    # Inference with cVAE
    batch_inp_norm = torch.vstack([inp_norm] * N)
    batch_init_state = torch.vstack([initial_state_ego] * N)
    z = torch.randn((N, z_dim), device=device)
    
    with torch.no_grad():
        y_star = model._decoder(z, batch_inp_norm, batch_init_state, 0, 0)
        traj_pred = (P @ y_star.T).T
    
    # Reverting Transformation
    inputs = inp.cpu().numpy()
    predictions = traj_pred.cpu().numpy() 
            
    plt.figure(figsize=(12, 8), dpi=90)
    
    # Obstacles 
    x_obs = inputs.flatten()[5::5]
    y_obs = inputs.flatten()[6::5]
        
    th = np.linspace(0, 2 * np.pi, 100)

    a_obs, b_obs = 5.8, 3.2
    
    for i in range(0, 10):
        x_ell = x_obs[i] + a_obs * np.cos(th)
        y_ell = y_obs[i] + b_obs * np.sin(th)
        plt.plot(x_ell, y_ell, '-k', linewidth=1.0)

    # Original Distribution
    x_gt = (P @ out.T).T
    x = x_gt.flatten()[0:100]
    y = x_gt.flatten()[100:200]

    plt.plot(x.cpu().numpy(), y.cpu().numpy(), color="black", label="Ground Truth", linewidth=5.5)
    
    # Generated
    x_traj = predictions[:, 0:100]
    y_traj = predictions[:, 100:200]
            
    # Plotting different trajectories
    colors = cm.rainbow(np.linspace(0, 1, len(x_traj)))
    j = 0
    for y, c in zip(y_traj, colors):
        plt.plot(x_traj[j], y, color=c, linewidth=1.5)
        j += 1

    plt.axhline(y=inp.flatten()[0].cpu().numpy(), linewidth=2.5, color='blue')
    plt.axhline(y=inp.flatten()[1].cpu().numpy(), linewidth=2.5, color='blue')
    plt.legend()
    plt.axis("equal")
    plt.show()

    break