In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader, random_split
import numpy as np
import json
from torch.nn.utils.rnn import pad_sequence

if torch.cuda.is_available():
    print("CUDA is available. PyTorch can use your GPU.")
    print(f"Number of GPUs available: {torch.cuda.device_count()}")
    print(f"GPU Name: {torch.cuda.get_device_name(0)}") # Prints the name of the first GPU
else:
    print("CUDA is not available. PyTorch will run on CPU.")

CUDA is available. PyTorch can use your GPU.
Number of GPUs available: 1
GPU Name: NVIDIA GeForce RTX 4070 Laptop GPU


In [2]:
class TrajectoryDataset(Dataset):

    def __init__(self, json_file):

        with open(json_file, 'r') as f:
            self.data_dict = json.load(f)
        
        self.data = self.data_dict['data']
        
    
    def __len__(self):
        return len(self.data)
    
    def __getitem__(self, idx):

        item = self.data[idx]
        
        goal = np.array([
            item['config']['goal']['x'],
            item['config']['goal']['y'],
            item['config']['goal']['z']
        ])
        
        config = goal # Label, Shape (3, )
        
        waypoints = []
        for wp in item['trajectory']['waypoints']:
            waypoints.append([
                wp['x'], wp['y'], wp['z'],
                wp['qx'], wp['qy'], wp['qz'], wp['qw']
            ])
        trajectory = np.array(waypoints)  # Output Data, Shape: (seq_len, 7)
        
        return torch.FloatTensor(config), torch.FloatTensor(trajectory)
    

def collate_pad(batch):
    xs, ys = zip(*batch)                 # tuples
    xs = torch.stack(xs, dim=0)          # [B, 3]

    
    lengths = torch.tensor([y.size(0) for y in ys], dtype=torch.long)
    ys = pad_sequence(ys, batch_first=True) 

    B, T_max, _ = ys.shape

    mask = torch.zeros(B, T_max, dtype=torch.bool)

    for b, L in enumerate(lengths):
        mask[b, :L] = True

    return xs, ys, lengths, mask  

In [3]:
class lstm_trajectory_generator(nn.Module):
    ''' Decodes hidden state output by encoder '''
    
    def __init__(self, input_size, trans_size, quat_size, hidden_size, num_layers):

        '''
        : param input_size:     the number of features in the input X
        : param hidden_size:    the number of features in the hidden state h
        : param num_layers:     number of recurrent layers (i.e., 2 means there are
        :                       2 stacked LSTMs)
        '''
        
        super(lstm_trajectory_generator, self).__init__()

        self.lstm = nn.LSTM(input_size = input_size, hidden_size = hidden_size, num_layers = num_layers)
        self.translation_linear = nn.Linear(hidden_size, trans_size)
        #self.quaternion_linear = nn.Linear(hidden_size, quat_size)

    def forward(self, x_input, encoder_hidden_states):
        
        '''        
        : param x_input:                    should be 2D (batch_size, input_size)
        : param encoder_hidden_states:      hidden states
        : return output, hidden:            output gives all the hidden states in the sequence;
        :                                   hidden gives the hidden state and cell state for the last
        :                                   element in the sequence 
 
        '''
        
        lstm_out, hidden = self.lstm(x_input.unsqueeze(0), encoder_hidden_states) 
        translation_pred = self.translation_linear(lstm_out.squeeze(0))

        # quaternion_pred = self.quaternion_linear(lstm_out.squeeze(0))
        # quaternion_pred = F.normalize(quaternion_pred, p=2, dim=-1)
        
        return translation_pred, hidden

In [None]:
def train(model, train_loader, val_loader, 
          epochs, learning_rate,
          device='cuda' if torch.cuda.is_available() else 'cpu'):
    
    model = model.to(device)
    params = model.parameters()
    optimizer = optim.Adam(params, lr=learning_rate)
    state_loss = nn.MSELoss(reduction='none')

    history = {
        'train_loss': [],
        'val_loss': [],
    }


    for epoch in range(epochs):

        model.train()
        training_loss = 0.0

        for i, batch in enumerate(train_loader):

            goal = batch[0].to(device)
            gt_traj = batch[1].to(device) # (batch_size, max_batch_traj_length, 7)
            gt_lengths = batch[2].to(device)
            gt_mask = batch[3].to(device)
            batch_size, max_traj_len, _= gt_traj.shape

            optimizer.zero_grad()

            gt_translation = gt_traj[:, :, :2]  # (B, T, 2) - just x, y

            hidden = None

            pred_translations = torch.zeros(batch_size, max_traj_len, 2).to(device) # (B, T, 2)

            for t in range(max_traj_len):
                trans_pred, hidden = model(goal, hidden)
                pred_translations[:, t, :] = trans_pred

            
            trans_loss = state_loss(pred_translations, gt_translation)  # (B, T, 2)
            trans_loss = trans_loss.mean(-1)  # Average over x, y -> (B, T)
            #trans_loss_masked = (trans_loss * gt_mask).sum() / gt_mask.sum()

            B, T_max, _ = pred_translations.shape
            time_weights = torch.linspace(1.0, 5.0, steps=T_max, device=device)  # for example
            time_weights = time_weights.unsqueeze(0)  # (1, T_max)
            weighted_loss = trans_loss * time_weights * gt_mask


            if epoch % 25 == 0 and i == 0:  # First batch every 20 epochs
                
                # Translation predictions
                print("\n=== Translation Predictions vs Ground Truth ===")
                
                # Track all errors across the batch
                all_batch_errors = []
                trajectory_means = []
                
                for b in range(batch_size):  # Show first 3 trajectories
                    valid_length = int(gt_lengths[b].item())
                    
                    if valid_length > 0:
                        print(f"\nTrajectory {b} (length={valid_length}):")
                        print("Step |  Pred (x, y)    |   GT (x, y)     |  Error")
                        print("-" * 55)
                        
                        # Track errors for mean calculation
                        trajectory_errors = []
                        
                        # Show entire trajectory
                        for t in range(valid_length):
                            pred_x = pred_translations[b, t, 0].item()
                            pred_y = pred_translations[b, t, 1].item()
                            gt_x = gt_translation[b, t, 0].item()
                            gt_y = gt_translation[b, t, 1].item()
                            
                            error = (pred_x - gt_x)**2 + (pred_y - gt_y)**2
                            trajectory_errors.append(error)
                            all_batch_errors.append(error)
                            
                            print(f"{t:4d} | ({pred_x:6.3f}, {pred_y:6.3f}) | "
                                f"({gt_x:6.3f}, {gt_y:6.3f}) | {error:6.4f}")
                        
                        # Print mean error for this trajectory
                        mean_error = sum(trajectory_errors) / len(trajectory_errors)
                        trajectory_means.append(mean_error)
                        print("-" * 55)
                        print(f"Mean trajectory error: {mean_error:6.4f}")
                
                # Print both types of batch mean errors
                print("=" * 55)
                if all_batch_errors:
                    batch_mean_error = sum(all_batch_errors) / len(all_batch_errors)
                    print(f"BATCH MEAN ERROR (all timesteps): {batch_mean_error:6.4f}")

                if trajectory_means:
                    mean_of_means = sum(trajectory_means) / len(trajectory_means)
                    print(f"BATCH MEAN ERROR (mean of means): {mean_of_means:6.4f}")


            loss = weighted_loss.sum() / (time_weights * gt_mask).sum()
            loss.backward()
            optimizer.step()
            training_loss += loss.item()

        average_train_loss = training_loss/len(train_loader)
        history['train_loss'].append(average_train_loss)

        print(f'Epoch {epoch+1}/{epochs}: Train Loss = {average_train_loss:.4f}')

    return history

In [5]:
dataset = TrajectoryDataset('datasets/goal_input_datasets/trajectory_dataset_test.json')

N = len(dataset)
train_size = int(0.8 * N)
val_size = int(0.2 * N)

train_dataset, val_dataset = random_split(
    dataset, [train_size, val_size],
)

train_dataloader = DataLoader(
    train_dataset,  
    batch_size=8,
    collate_fn=collate_pad,
    shuffle=True,
    num_workers=0  
)
        
val_dataloader = DataLoader(
    val_dataset, 
    batch_size=8,
    collate_fn=collate_pad,
    shuffle=False,
    num_workers=0
)

model = lstm_trajectory_generator(input_size=3, trans_size=2, quat_size=2, hidden_size=128, num_layers=2)

EPOCHS = 500
history = train(model=model, train_loader=train_dataloader, val_loader=None, epochs=EPOCHS, learning_rate=0.01)


=== Translation Predictions vs Ground Truth ===

Trajectory 0 (length=42):
Step |  Pred (x, y)    |   GT (x, y)     |  Error
-------------------------------------------------------
   0 | (-0.076, -0.057) | ( 0.000,  0.000) | 0.0090
   1 | (-0.081, -0.054) | ( 0.005,  0.002) | 0.0104
   2 | (-0.083, -0.053) | ( 0.020,  0.006) | 0.0140
   3 | (-0.084, -0.052) | ( 0.045,  0.014) | 0.0210
   4 | (-0.085, -0.052) | ( 0.080,  0.025) | 0.0331
   5 | (-0.085, -0.052) | ( 0.125,  0.040) | 0.0527
   6 | (-0.086, -0.051) | ( 0.180,  0.057) | 0.0824
   7 | (-0.086, -0.051) | ( 0.245,  0.078) | 0.1263
   8 | (-0.086, -0.051) | ( 0.320,  0.101) | 0.1881
   9 | (-0.086, -0.051) | ( 0.405,  0.128) | 0.2734
  10 | (-0.086, -0.051) | ( 0.500,  0.158) | 0.3875
  11 | (-0.086, -0.051) | ( 0.594,  0.188) | 0.5200
  12 | (-0.086, -0.051) | ( 0.678,  0.215) | 0.6550
  13 | (-0.086, -0.051) | ( 0.751,  0.238) | 0.7847
  14 | (-0.086, -0.051) | ( 0.815,  0.258) | 0.9080
  15 | (-0.086, -0.051) | ( 0.869,  0.

KeyboardInterrupt: 

In [None]:
import matplotlib.pyplot as plt

# Lists of 100 floats
train_losses = history['train_loss']
#val_losses = history['val_loss']

epochs = range(1, EPOCHS+1)

plt.figure()
plt.plot(epochs, train_losses, label='Train')
#plt.plot(epochs, val_losses, label='Validation')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.title('Training and Validation Loss')
plt.legend()
plt.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
