# CSE251B Project Milestone Starter File

## Step 1: Import Dependencies:

In [None]:
!pip install torch-geometric

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import random

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torch_geometric.data import Data, Batch
from torch_geometric.nn import GCNConv, global_mean_pool
import torch.nn.functional as F
from transformers import MambaModel, MambaConfig
from typing import Optional, Tuple, Dict
import tqdm

## Step 2: Load the Dataset

#### You need to describe in your own words what the dataset is about, and use mathematical language and formulate your prediction task on the submitted PDF file for Question 1 Problem A.

#### Here we are loading the dataset from the local directory. And answer Question 1 Problem B

In [None]:
from google.colab import drive
drive.mount('/content/drive')
train_npz = np.load('/content/drive/My Drive/251B_data/train.npz')
test_npz = np.load('/content/drive/My Drive/251B_data/test_input.npz')
train_data = train_npz['data']
test_data = test_npz['data']

In [None]:
print(train_data.shape, test_data.shape)

# Split once for later use
X_train = train_data[..., :50, :]
Y_train = train_data[:, 0, 50:, :2]

In [None]:
xy_in = train_data[:, :, :50, :2].reshape(-1, 2)
# only find the x, y != 0
xy_in_not_0 = xy_in[(xy_in[:, 0] != 0) & (xy_in[:, 1] != 0)]

#### Try to play around with dataset for training and testing, make exploratory analysis on the dataset for bonus points(up to 2)

## Step 3: Setting up the Training and Testing

### Example Code:

In [None]:
import numpy as np
import torch
from torch.utils.data import Dataset
from torch_geometric.data import Data

class TrajectoryDatasetTrain(Dataset):
    def __init__(self, data, scale=10.0, augment=True):
        """
        data: shape (N, 50, 110, 6)
        scale: normalization factor
        augment: whether to apply random horizontal flipping
        """
        self.data = data
        self.scale = scale
        self.augment = augment

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        scene = self.data[idx].copy()

        # === Rotate so agent 0 has heading 0 at t=49 ===
        ego_heading = scene[0, 49, 4]
        cos_theta = np.cos(-ego_heading)
        sin_theta = np.sin(-ego_heading)
        R = np.array([[cos_theta, -sin_theta],
                      [sin_theta,  cos_theta]], dtype=np.float32)

        scene[..., 0:2] = scene[..., 0:2] @ R.T
        scene[..., 2:4] = scene[..., 2:4] @ R.T
        scene[..., 4] = scene[..., 4] - ego_heading

        # === Get history and future ===
        hist = scene[:, :50, :].copy()
        future = torch.tensor(scene[0, 50:, :2].copy(), dtype=torch.float32)

        # === Random horizontal flip (only during training) ===
        if self.augment:
            if np.random.rand() < 0.5:
                hist[..., 0] *= -1
                hist[..., 2] *= -1
                future[:, 0] *= -1

        # === Translate to origin (agent 0 at t=49) ===
        origin = hist[0, 49, :2].copy()
        hist[..., :2] = hist[..., :2] - origin
        future = future - origin

        # === Normalize by scale ===
        hist[..., :4] = hist[..., :4] / self.scale
        future = future / self.scale

        return Data(
            x=torch.tensor(hist, dtype=torch.float32),
            y=future,
            origin=torch.tensor(origin, dtype=torch.float32).unsqueeze(0),
            scale=torch.tensor(self.scale, dtype=torch.float32),
        )


class TrajectoryDatasetTest(Dataset):
    def __init__(self, data, scale=10.0):
        """
        data: Shape (N, 50, 50, 6) for test set with historical trajectory only
        scale: Normalization factor
        """
        self.data = data
        self.scale = scale

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        scene = self.data[idx].copy()

        # === Rotate so agent 0 has heading 0 at t=49 ===
        ego_heading = scene[0, 49, 4]
        cos_theta = np.cos(-ego_heading)
        sin_theta = np.sin(-ego_heading)
        R = np.array([[cos_theta, -sin_theta],
                      [sin_theta,  cos_theta]], dtype=np.float32)

        # Apply rotation (same as training: @ R.T)
        scene[..., 0:2] = scene[..., 0:2] @ R.T
        scene[..., 2:4] = scene[..., 2:4] @ R.T
        scene[..., 4] = scene[..., 4] - ego_heading

        # === Get history (only historical data available for test) ===
        hist = scene.copy()  # (50, 50, 6)

        # === Translate to origin (agent 0 at t=49) - SAME AS TRAINING ===
        origin = hist[0, 49, :2].copy()
        hist[..., :2] = hist[..., :2] - origin

        # === Normalize by scale ===
        hist[..., :4] = hist[..., :4] / self.scale

        return Data(
            x=torch.tensor(hist, dtype=torch.float32),
            origin=torch.tensor(origin, dtype=torch.float32).unsqueeze(0),
            scale=torch.tensor(self.scale, dtype=torch.float32),
            rotation=torch.tensor(R, dtype=torch.float32)
        )

#### Answer Question related to Your Computational Platform and GPU for Question 2 Problem A

In [None]:
torch.manual_seed(251)
np.random.seed(42)

scale = 10.0

N = len(train_data)
val_size = int(0.1 * N)
train_size = N - val_size

#train_data_shuffled = np.random.permutation(train_data)

train_dataset = TrajectoryDatasetTrain(train_data[:train_size], scale=scale, augment=True)
#train_dataset = TrajectoryDatasetTrain(train_data[:train_size], scale=scale, augment=False)
val_dataset = TrajectoryDatasetTrain(train_data[train_size:], scale=scale, augment=False)

train_dataloader = DataLoader(train_dataset, batch_size=32, shuffle=True, collate_fn=lambda x: Batch.from_data_list(x))
val_dataloader = DataLoader(val_dataset, batch_size=32, shuffle=False, collate_fn=lambda x: Batch.from_data_list(x))

# Set device for training speedup
if torch.backends.mps.is_available():
    device = torch.device('mps')
    print("Using Apple Silicon GPU")
elif torch.cuda.is_available():
    device = torch.device('cuda')
    print("Using CUDA GPU")
else:
    device = torch.device('cpu')

#### Your Model for Question 2 Problem B (Include your model architecture pictures and also can use some mathmatical equations to explain your model in your report)

#### This Model will be covered during Week 6 Lecture (If you don't understand it for now, don't worry, we will cover it in the lecture, or you can ask in the office hours)

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class ImprovedSocialAttention(nn.Module):
    def __init__(self, ego_dim, other_dim, radius=40.0, heads=8):
        super().__init__()
        self.radius = radius
        self.heads = heads
        self.head_dim = other_dim // heads

        # Multi-head projections
        self.q_proj = nn.Linear(ego_dim, other_dim)
        self.k_proj = nn.Linear(other_dim + 5, other_dim)
        self.v_proj = nn.Linear(other_dim + 5, other_dim)
        self.out = nn.Linear(other_dim, other_dim)

        # Learnable positional encoding for relative positions
        self.pos_encoding = nn.Sequential(
            nn.Linear(5, other_dim // 4),
            nn.ReLU(),
            nn.Linear(other_dim // 4, other_dim)
        )

        self.dropout = nn.Dropout(0.1)

    def forward(self, ego_h, other_h, rel_pos, rel_vel):
        """
        ego_h   : [B, emb]             (query from ego LSTM)
        other_h : [B, A-1, emb]        (other-agent encodings)
        rel_pos : [B, A-1, 3]          (dx, dy, dist)
        rel_vel : [B, A-1, 2]          (dvx, dvy)
        """
        B, N, _ = other_h.shape

        distances = rel_pos[..., 2]
        mask = distances <= self.radius

        if mask.sum() == 0:
            return torch.zeros_like(other_h[:, 0])

        rel_features = torch.cat([rel_pos, rel_vel], dim=-1)

        # Enhanced K, V with positional encoding
        pos_enc = self.pos_encoding(rel_features)
        kv_input = other_h + pos_enc

        q = self.q_proj(ego_h).view(B, 1, self.heads, self.head_dim)
        k = self.k_proj(torch.cat([kv_input, rel_features], dim=-1)).view(B, N, self.heads, self.head_dim)
        v = self.v_proj(torch.cat([kv_input, rel_features], dim=-1)).view(B, N, self.heads, self.head_dim)

        # Scaled dot-product attention with improved masking
        attn = torch.einsum('bqhd,bnhd->bnhq', q, k) / (self.head_dim ** 0.5)

        # Distance-aware attention weighting
        distance_weight = torch.exp(-distances / (self.radius / 3)).unsqueeze(-1).unsqueeze(-1)
        attn = attn * distance_weight

        attn = attn.masked_fill(~mask.unsqueeze(-1).unsqueeze(-1), -1e9)
        attn = F.softmax(attn, dim=1)
        attn = self.dropout(attn)

        ctx = torch.einsum('bnhq,bnhd->bqhd', attn, v).flatten(2)
        return self.out(ctx.squeeze(1))

class ImprovedLSTMParametricTrajectoryFeatures(nn.Module):
    def __init__(self,
                 input_dim=9,
                 emb_dim=128,
                 hidden_dim=256,
                 lstm_layers=2,
                 degree=3,
                 social_heads=8,
                 radius=35.0,
                 other_rnn_dim=256):
        super().__init__()
        self.degree = degree
        self.hidden_dim = hidden_dim

        # Enhanced ego network with bidirectional LSTM
        self.emb = nn.Linear(input_dim, emb_dim)
        self.norm = nn.LayerNorm(emb_dim)
        self.ego_lstm = nn.LSTM(emb_dim, hidden_dim,
                                batch_first=True, num_layers=lstm_layers,
                                bidirectional=True, dropout=0.1)

        # Project bidirectional output back to hidden_dim
        self.ego_proj = nn.Linear(hidden_dim * 2, hidden_dim)

        # Enhanced other-agent encoder
        self.other_emb = nn.Linear(input_dim, other_rnn_dim)
        self.other_norm = nn.LayerNorm(other_rnn_dim)
        self.other_gru = nn.GRU(other_rnn_dim, other_rnn_dim,
                                batch_first=True, num_layers=2, dropout=0.1)

        # Improved social attention
        self.social_attn = ImprovedSocialAttention(
            ego_dim=hidden_dim, other_dim=other_rnn_dim,
            radius=radius, heads=social_heads
        )

        # Enhanced fusion network
        self.fuse = nn.Sequential(
            nn.Linear(hidden_dim + other_rnn_dim, hidden_dim),
            nn.LayerNorm(hidden_dim),
            nn.ReLU(),
            nn.Dropout(0.1),
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU(),
        )

        # Separate heads for x and y coefficients
        self.fc_x = nn.Linear(hidden_dim, degree + 1)
        self.fc_y = nn.Linear(hidden_dim, degree + 1)

    def enhanced_featurize(self, raw):
        """Enhanced feature engineering"""
        xp, yp, vx, vy, hd = (raw[..., i] for i in range(5))

        # Enhanced features
        cos_h = torch.cos(hd)
        sin_h = torch.sin(hd)
        speed = torch.sqrt(vx**2 + vy**2 + 1e-6)
        ax = torch.cat([torch.zeros_like(vx[..., :1]), vx[..., 1:] - vx[..., :-1]], dim=-1)
        ay = torch.cat([torch.zeros_like(vy[..., :1]), vy[..., 1:] - vy[..., :-1]], dim=-1)

        # Angular velocity
        hd_diff = hd[..., 1:] - hd[..., :-1]
        hd_diff = torch.where(hd_diff > torch.pi, hd_diff - 2*torch.pi, hd_diff)
        hd_diff = torch.where(hd_diff < -torch.pi, hd_diff + 2*torch.pi, hd_diff)
        angular_vel = torch.cat([torch.zeros_like(hd[..., :1]), hd_diff], dim=-1)

        return torch.stack([xp, yp, vx, vy, cos_h, sin_h, speed, ax, ay], dim=-1)

    def forward(self, data):
        x = data.x.reshape(-1, 50, 50, 6)
        ego_raw = x[:, 0]
        others_raw = x[:, 1:]
        B, A1, T, _ = others_raw.shape

        # Enhanced feature engineering
        ego_feat = self.enhanced_featurize(ego_raw)         
        others_feat = self.enhanced_featurize(
            others_raw.reshape(B*A1, T, 6)
        ).reshape(B, A1, T, -1)                         

        # Encode ego with bidirectional LSTM
        ego_emb = self.norm(self.emb(ego_feat))
        ego_out, (ego_h, _) = self.ego_lstm(ego_emb)

        # Use final hidden state from both directions
        ego_h = self.ego_proj(ego_h[-2:].transpose(0, 1).flatten(1))

        # Enhanced other-agent encoding
        other_emb = self.other_norm(self.other_emb(others_feat.view(B*A1, T, -1)))
        _, other_h = self.other_gru(other_emb)
        other_h = other_h[-1].view(B, A1, -1)

        # Enhanced relative features
        ego_last = ego_feat[:, -1, :2].unsqueeze(1)         
        other_last = others_feat[:, :, -1, :2]              
        rel_pos_vec = other_last - ego_last                 
        dist = torch.norm(rel_pos_vec, dim=-1, keepdim=True)
        rel_pos = torch.cat([rel_pos_vec, dist], dim=-1)   

        # Relative velocity
        ego_vel = ego_feat[:, -1, 2:4].unsqueeze(1)        
        other_vel = others_feat[:, :, -1, 2:4]              
        rel_vel = other_vel - ego_vel                       

        # Social attention with relative velocity
        social_ctx = self.social_attn(ego_h, other_h, rel_pos, rel_vel)

        # Enhanced fusion
        fused = self.fuse(torch.cat([ego_h, social_ctx], dim=-1))

        # Separate prediction heads
        a_coeffs = self.fc_x(fused)  # x coefficients
        b_coeffs = self.fc_y(fused)  # y coefficients

        t = torch.linspace(0, 1, 60, device=x.device)

        basis_funcs = []
        for i in range(self.degree + 1):
            if i == 0:
                basis_funcs.append(torch.ones_like(t))
            elif i == 1:
                basis_funcs.append(t)
            else:
                # Legendre polynomial recurrence
                basis_funcs.append(((2*i-1) * t * basis_funcs[-1] - (i-1) * basis_funcs[-2]) / i)

        t_poly = torch.stack(basis_funcs, -1)              
        t_poly = t_poly.expand(B, -1, -1)               

        # Generate predictions
        x_pred = torch.bmm(t_poly, a_coeffs.unsqueeze(-1)).squeeze(-1)
        y_pred = torch.bmm(t_poly, b_coeffs.unsqueeze(-1)).squeeze(-1)

        # Add residual connection from last position
        last_pos = ego_feat[:, -1, :2]
        x_pred += last_pos[:, 0].unsqueeze(-1)
        y_pred += last_pos[:, 1].unsqueeze(-1)

        return torch.stack([x_pred, y_pred], dim=-1)


In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class EgoOnlyLSTMParametricTrajectoryFeatures(nn.Module):
    def __init__(self,
                 input_dim=9,
                 emb_dim=128,
                 hidden_dim=256,
                 lstm_layers=2,
                 degree=3):
        super().__init__()
        self.degree = degree
        self.hidden_dim = hidden_dim

        # Embedding and normalization
        self.emb = nn.Linear(input_dim, emb_dim)
        self.norm = nn.LayerNorm(emb_dim)

        # Bidirectional LSTM for ego features
        self.ego_lstm = nn.LSTM(
            emb_dim, hidden_dim,
            batch_first=True, num_layers=lstm_layers,
            bidirectional=True, dropout=0.1
        )

        # Project bi-LSTM output
        self.ego_proj = nn.Linear(hidden_dim * 2, hidden_dim)

        # Deep feedforward network (analogous to fusion)
        self.deep_block = nn.Sequential(
            nn.Linear(hidden_dim, hidden_dim),
            nn.LayerNorm(hidden_dim),
            nn.ReLU(),
            nn.Dropout(0.1),
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU(),
        )

        # Separate heads for x and y coefficients
        self.fc_x = nn.Linear(hidden_dim, degree + 1)
        self.fc_y = nn.Linear(hidden_dim, degree + 1)

    def enhanced_featurize(self, raw):
        xp, yp, vx, vy, hd = (raw[..., i] for i in range(5))

        cos_h = torch.cos(hd)
        sin_h = torch.sin(hd)
        speed = torch.sqrt(vx**2 + vy**2 + 1e-6)

        ax = torch.cat([torch.zeros_like(vx[..., :1]), vx[..., 1:] - vx[..., :-1]], dim=-1)
        ay = torch.cat([torch.zeros_like(vy[..., :1]), vy[..., 1:] - vy[..., :-1]], dim=-1)

        hd_diff = hd[..., 1:] - hd[..., :-1]
        hd_diff = torch.where(hd_diff > torch.pi, hd_diff - 2*torch.pi, hd_diff)
        hd_diff = torch.where(hd_diff < -torch.pi, hd_diff + 2*torch.pi, hd_diff)
        angular_vel = torch.cat([torch.zeros_like(hd[..., :1]), hd_diff], dim=-1)

        return torch.stack([xp, yp, vx, vy, cos_h, sin_h, speed, ax, ay], dim=-1)

    def forward(self, data):
        x = data.x.reshape(-1, 50, 50, 6)
        ego_raw = x[:, 0]

        # Feature engineering
        ego_feat = self.enhanced_featurize(ego_raw)

        # Embedding and normalization
        ego_emb = self.norm(self.emb(ego_feat))

        # Encode with bi-LSTM
        ego_out, (ego_h, _) = self.ego_lstm(ego_emb)

        # Concatenate final hidden states
        ego_h = self.ego_proj(ego_h[-2:].transpose(0, 1).flatten(1))

        # Deep block
        deep_out = self.deep_block(ego_h)

        # Coefficient prediction
        a_coeffs = self.fc_x(deep_out)
        b_coeffs = self.fc_y(deep_out)

        # Generate polynomial basis (Legendre)
        t = torch.linspace(0, 1, 60, device=x.device)
        basis_funcs = []
        for i in range(self.degree + 1):
            if i == 0:
                basis_funcs.append(torch.ones_like(t))
            elif i == 1:
                basis_funcs.append(t)
            else:
                basis_funcs.append(((2*i-1)*t*basis_funcs[-1] - (i-1)*basis_funcs[-2])/i)
        t_poly = torch.stack(basis_funcs, -1).expand(ego_h.shape[0], -1, -1)

        # Generate predictions
        x_pred = torch.bmm(t_poly, a_coeffs.unsqueeze(-1)).squeeze(-1)
        y_pred = torch.bmm(t_poly, b_coeffs.unsqueeze(-1)).squeeze(-1)

        return torch.stack([x_pred, y_pred], dim=-1)


In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class ImprovedSocialAttention(nn.Module):
    def __init__(self, ego_dim, other_dim, radius=40.0, heads=8):
        super().__init__()
        self.radius = radius
        self.heads = heads
        self.head_dim = other_dim // heads

        # Multi-head projections
        self.q_proj = nn.Linear(ego_dim, other_dim)
        self.k_proj = nn.Linear(other_dim + 5, other_dim)
        self.v_proj = nn.Linear(other_dim + 5, other_dim)
        self.out = nn.Linear(other_dim, other_dim)

        # Learnable positional encoding for relative positions
        self.pos_encoding = nn.Sequential(
            nn.Linear(5, other_dim // 4),
            nn.ReLU(),
            nn.Linear(other_dim // 4, other_dim)
        )

        self.dropout = nn.Dropout(0.1)

    def forward(self, ego_h, other_h, rel_pos, rel_vel):
        """
        ego_h   : [B, emb]             (query from ego LSTM)
        other_h : [B, A-1, emb]        (other-agent encodings)
        rel_pos : [B, A-1, 3]          (dx, dy, dist)
        rel_vel : [B, A-1, 2]          (dvx, dvy)
        """
        B, N, _ = other_h.shape

        distances = rel_pos[..., 2]
        mask = distances <= self.radius

        if mask.sum() == 0:
            return torch.zeros_like(other_h[:, 0])

        # Combine relative position and velocity
        rel_features = torch.cat([rel_pos, rel_vel], dim=-1)

        # Enhanced K, V with positional encoding
        pos_enc = self.pos_encoding(rel_features)
        kv_input = other_h + pos_enc

        q = self.q_proj(ego_h).view(B, 1, self.heads, self.head_dim)
        k = self.k_proj(torch.cat([kv_input, rel_features], dim=-1)).view(B, N, self.heads, self.head_dim)
        v = self.v_proj(torch.cat([kv_input, rel_features], dim=-1)).view(B, N, self.heads, self.head_dim)

        # Scaled dot-product attention with improved masking
        attn = torch.einsum('bqhd,bnhd->bnhq', q, k) / (self.head_dim ** 0.5)

        # Distance-aware attention weighting
        distance_weight = torch.exp(-distances / (self.radius / 3)).unsqueeze(-1).unsqueeze(-1)
        attn = attn * distance_weight

        attn = attn.masked_fill(~mask.unsqueeze(-1).unsqueeze(-1), -1e9)
        attn = F.softmax(attn, dim=1)
        attn = self.dropout(attn)

        ctx = torch.einsum('bnhq,bnhd->bqhd', attn, v).flatten(2)
        return self.out(ctx.squeeze(1))

class NoPolynomialLSTMParametricTrajectoryFeatures(nn.Module):
    def __init__(self,
                 input_dim=9,
                 emb_dim=128,
                 hidden_dim=256,
                 lstm_layers=2,
                 degree=3,
                 social_heads=8,
                 radius=35.0,
                 other_rnn_dim=256):
        super().__init__()
        self.degree = degree
        self.hidden_dim = hidden_dim

        # Bidirectional LSTM
        self.emb = nn.Linear(input_dim, emb_dim)
        self.norm = nn.LayerNorm(emb_dim)
        self.ego_lstm = nn.LSTM(emb_dim, hidden_dim,
                                batch_first=True, num_layers=lstm_layers,
                                bidirectional=True, dropout=0.1)

        # Project bidirectional output back to hidden_dim
        self.ego_proj = nn.Linear(hidden_dim * 2, hidden_dim)

        # Enhanced other-agent encoder
        self.other_emb = nn.Linear(input_dim, other_rnn_dim)
        self.other_norm = nn.LayerNorm(other_rnn_dim)
        self.other_gru = nn.GRU(other_rnn_dim, other_rnn_dim,
                                batch_first=True, num_layers=2, dropout=0.1)

        # Improved social attention
        self.social_attn = ImprovedSocialAttention(
            ego_dim=hidden_dim, other_dim=other_rnn_dim,
            radius=radius, heads=social_heads
        )

        # Enhanced fusion network
        self.fuse = nn.Sequential(
            nn.Linear(hidden_dim + other_rnn_dim, hidden_dim),
            nn.LayerNorm(hidden_dim),
            nn.ReLU(),
            nn.Dropout(0.1),
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU(),
        )
        # Direct output layer for x and y coordinates
        self.output_fc = nn.Linear(hidden_dim, 60*2)

    def enhanced_featurize(self, raw):
        """Enhanced feature engineering"""
        xp, yp, vx, vy, hd = (raw[..., i] for i in range(5))

        # Enhanced features
        cos_h = torch.cos(hd)
        sin_h = torch.sin(hd)
        speed = torch.sqrt(vx**2 + vy**2 + 1e-6)
        ax = torch.cat([torch.zeros_like(vx[..., :1]), vx[..., 1:] - vx[..., :-1]], dim=-1)
        ay = torch.cat([torch.zeros_like(vy[..., :1]), vy[..., 1:] - vy[..., :-1]], dim=-1)

        # Angular velocity
        hd_diff = hd[..., 1:] - hd[..., :-1]
        hd_diff = torch.where(hd_diff > torch.pi, hd_diff - 2*torch.pi, hd_diff)
        hd_diff = torch.where(hd_diff < -torch.pi, hd_diff + 2*torch.pi, hd_diff)
        angular_vel = torch.cat([torch.zeros_like(hd[..., :1]), hd_diff], dim=-1)

        return torch.stack([xp, yp, vx, vy, cos_h, sin_h, speed, ax, ay], dim=-1)

    def forward(self, data):
        x = data.x.reshape(-1, 50, 50, 6)
        ego_raw = x[:, 0]                    
        others_raw = x[:, 1:]               
        B, A1, T, _ = others_raw.shape

        ego_feat = self.enhanced_featurize(ego_raw)         
        others_feat = self.enhanced_featurize(
            others_raw.reshape(B*A1, T, 6)
        ).reshape(B, A1, T, -1)                           

        # Bidirectional LSTM
        ego_emb = self.norm(self.emb(ego_feat))
        ego_out, (ego_h, _) = self.ego_lstm(ego_emb)

        # Use final hidden state from both directions
        ego_h = self.ego_proj(ego_h[-2:].transpose(0, 1).flatten(1))

        # Enhanced other-agent encoding
        other_emb = self.other_norm(self.other_emb(others_feat.view(B*A1, T, -1)))
        _, other_h = self.other_gru(other_emb)
        other_h = other_h[-1].view(B, A1, -1)             

        # Enhanced relative features
        ego_last = ego_feat[:, -1, :2].unsqueeze(1)        
        other_last = others_feat[:, :, -1, :2]          
        rel_pos_vec = other_last - ego_last               
        dist = torch.norm(rel_pos_vec, dim=-1, keepdim=True)
        rel_pos = torch.cat([rel_pos_vec, dist], dim=-1)

        # Relative velocity
        ego_vel = ego_feat[:, -1, 2:4].unsqueeze(1)
        other_vel = others_feat[:, :, -1, 2:4]
        rel_vel = other_vel - ego_vel

        # Social attention with relative velocity
        social_ctx = self.social_attn(ego_h, other_h, rel_pos, rel_vel)

        # Enhanced fusion
        fused = self.fuse(torch.cat([ego_h, social_ctx], dim=-1))

        # Direct output layer
        output = self.output_fc(fused)
        return output.view(-1, 60, 2)


In [None]:
class MLP(nn.Module):
    def __init__(self, input_features, output_features):
        super(MLP, self).__init__()

        # Define the layers
        self.flatten = nn.Flatten()
        self.mlp = nn.Sequential(
            nn.Linear(input_features, 1024),
            nn.ReLU(),
            nn.Dropout(0.1),

            nn.Linear(1024, 512),
            nn.ReLU(),
            nn.Dropout(0.1),

            nn.Linear(512, 256),
            nn.ReLU(),
            nn.Dropout(0.1),

            nn.Linear(256, output_features)
        )

    def forward(self, data):
        x = data.x
        x = x.reshape(-1, 50, 50, 6)
        x = x[:, 0, :, :]
        x = self.flatten(x)
        x = self.mlp(x)
        return x.view(-1, 60, 2)

In [None]:
class LSTM(nn.Module):
    def __init__(self, input_dim=6, hidden_dim=256, output_dim=60 * 2):
        super(LSTM, self).__init__()
        self.lstm = nn.LSTM(input_dim, hidden_dim, batch_first=True, num_layers=2)
        self.fc = nn.Linear(hidden_dim, output_dim)

    def forward(self, data):
        x = data.x
        x= x.reshape(-1, 50, 50, 6)
        x = x[:, 0, :, :] # Only Consider ego agent index 0

        lstm_out, _ = self.lstm(x)
        out = self.fc(lstm_out[:, -1, :])
        return out.view(-1, 60, 2)

In [None]:
class Transformer(nn.Module):
    def __init__(self, input_dim=6, emb_dim=128, nhead=8, num_layers=6, output_dim=60 * 2):
        super().__init__()
        self.input_fc = nn.Linear(input_dim, emb_dim)

        encoder_layer = nn.TransformerEncoderLayer(
            d_model=emb_dim,
            nhead=nhead,
            dim_feedforward=256,
            batch_first=True
        )
        self.encoder = nn.TransformerEncoder(encoder_layer, num_layers=num_layers)
        self.output_fc = nn.Linear(emb_dim, output_dim)

    def forward(self, data):
        x = data.x
        x = x.reshape(-1, 50, 50, 6)
        x = x[:, 0, :, :]

        emb = self.input_fc(x)
        enc_out = self.encoder(emb)
        final_hidden = enc_out[:, -1, :]

        out = self.output_fc(final_hidden)
        return out.view(-1, 60, 2)


#### Your Optimizer and Hyperparameters for Question 2 Problem A (Try to use different optimizers and hyperparameters for your model and see how it affects the performance of your model)

In [None]:
class TrajectoryLoss(nn.Module):
    def __init__(self, loss_type='weighted', weight_scheme='linear', **kwargs):
        super().__init__()
        self.loss_type = loss_type
        self.weight_scheme = weight_scheme
        self.kwargs = kwargs

    def forward(self, pred, target):
        if self.loss_type == 'standard':
            return nn.MSELoss()(pred, target)

        elif self.loss_type == 'weighted':
            if self.weight_scheme == 'linear':
                start_weight = self.kwargs.get('start_weight', 1.0)
                end_weight = self.kwargs.get('end_weight', 2.0)
                weights = torch.linspace(start_weight, end_weight, 60, device=pred.device)

            elif self.weight_scheme == 'quadratic':
                start_weight = self.kwargs.get('start_weight', 1.0)
                end_weight = self.kwargs.get('end_weight', 2.0)
                t = torch.linspace(0, 1, 60, device=pred.device)
                weights = start_weight + (end_weight - start_weight) * t ** 2

            elif self.weight_scheme == 'exponential':
                start_weight = self.kwargs.get('start_weight', 1.0)
                end_weight = self.kwargs.get('end_weight', 2.0)
                t = torch.linspace(0, 1, 60, device=pred.device)
                weights = start_weight * (end_weight / start_weight) ** t

            weights = weights.view(1, -1, 1)
            return ((pred - target) ** 2 * weights).mean()

In [None]:
from torch.optim import AdamW
from torch.optim.lr_scheduler import CosineAnnealingLR


model = ImprovedLSTMParametricTrajectoryFeatures(degree=3, lstm_layers=2, other_rnn_dim=256).to(device)





optimizer = AdamW(model.parameters(), lr=2e-4, weight_decay=1e-4)

scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.5, patience=3, verbose=True)

#scheduler = CosineAnnealingLR(optimizer, T_max=80, eta_min=1e-6)
#scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=20, gamma=0.25) # You can try different schedulers
early_stopping_patience = 12
best_val_loss = float('inf')
no_improvement = 0
#criterion = nn.MSELoss()
criterion = TrajectoryLoss()

#### Using the Simple Linear Regression Model for Question 2B and Visualize the validation loss(MAE) (Hint: You should adapt the code for training loss and try to draw graphs as specified in the project description)

In [None]:
# Initialize model tracking variables
best_models = {
    'first': {'loss': float('inf'), 'path': 'best_model_1st.pt'},
    'second': {'loss': float('inf'), 'path': 'best_model_2nd.pt'},
    'third': {'loss': float('inf'), 'path': 'best_model_3rd.pt'}
}

def update_best_models(current_loss, model_state_dict):
    """
    Update the top 3 models if current model beats the best model.
    Only stores a new model if it beats the current best (first place).
    """
    if current_loss < best_models['first']['loss'] - 1e-4:
        old_first_state = None
        if best_models['first']['loss'] != float('inf'):
            old_first_state = torch.load(best_models['first']['path'])

        old_second_state = None
        if best_models['second']['loss'] != float('inf'):
            old_second_state = torch.load(best_models['second']['path'])

        if old_second_state is not None:
            torch.save(old_second_state, best_models['third']['path'])
            best_models['third']['loss'] = best_models['second']['loss']

        if old_first_state is not None:
            torch.save(old_first_state, best_models['second']['path'])
            best_models['second']['loss'] = best_models['first']['loss']

        torch.save(model_state_dict, best_models['first']['path'])
        best_models['first']['loss'] = current_loss

        return True

    return False


# Training loop
no_improvement = 0
early_stopping_patience = 12

for epoch in tqdm.tqdm(range(110), desc="Epoch", unit="epoch"):
    # ---- Training ----
    model.train()
    train_loss = 0
    for batch in train_dataloader:
        batch = batch.to(device)
        pred = model(batch)
        y = batch.y.view(batch.num_graphs, 60, 2)
        loss = criterion(pred, y)
        optimizer.zero_grad()
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), 5.0)
        optimizer.step()
        train_loss += loss.item()

    # ---- Validation ----
    model.eval()
    val_loss = 0
    val_mae = 0
    val_mse = 0
    with torch.no_grad():
        for batch in val_dataloader:
            batch = batch.to(device)
            pred = model(batch)
            y = batch.y.view(batch.num_graphs, 60, 2)
            val_loss += criterion(pred, y).item()

            pred = pred * batch.scale.view(-1, 1, 1) + batch.origin.unsqueeze(1)
            y = y * batch.scale.view(-1, 1, 1) + batch.origin.unsqueeze(1)
            val_mae += nn.L1Loss()(pred, y).item()
            val_mse += nn.MSELoss()(pred, y).item()

    train_loss /= len(train_dataloader)
    val_loss /= len(val_dataloader)
    val_mae /= len(val_dataloader)
    val_mse /= len(val_dataloader)
    scheduler.step(val_loss)

    tqdm.tqdm.write(f"Epoch {epoch:03d} | Learning rate {optimizer.param_groups[0]['lr']:.6f} | train normalized MSE {train_loss:8.4f} | val normalized MSE {val_loss:8.4f}, | val MAE {val_mae:8.4f} | val MSE {val_mse:8.4f}")

    # Update best models - only stores if it beats the current best
    model_stored = update_best_models(val_loss, model.state_dict())

    if model_stored:
        no_improvement = 0
        tqdm.tqdm.write(f"New best model saved! Current ranking losses: 1st: {best_models['first']['loss']:.6f}, 2nd: {best_models['second']['loss']:.6f}, 3rd: {best_models['third']['loss']:.6f}")
    else:
        no_improvement += 1
        if no_improvement >= early_stopping_patience:
            print("Early stop!")
            break

print("\nFinal model rankings:")
print(f"1st place (best): {best_models['first']['loss']:.6f} - {best_models['first']['path']}")
print(f"2nd place: {best_models['second']['loss']:.6f} - {best_models['second']['path']}")
print(f"3rd place: {best_models['third']['loss']:.6f} - {best_models['third']['path']}")

#### Randomly sample validation dataset and Visualize the ground truth and your predictions on a 2D plane for Question 3 Problem A

In [None]:
import matplotlib.pyplot as plt
import random

def plot_trajectory(ax, pred, gt, title=None):
    ax.cla()
    # Plot the predicted future trajectory
    ax.plot(pred[0,:60,0], pred[0,:60,1], color='red', label='Predicted Future Trajectory')

    # Plot the ground truth future trajectory
    ax.plot(gt[0,:60,0], gt[0,:60,1], color='blue', label='Ground Truth Future Trajectory')

    # Optionally set axis limits, labels, and title.
    x_max = max(pred[..., 0].max(), gt[..., 0].max())
    x_min = min(pred[..., 0].min(), gt[..., 0].min())
    y_max = max(pred[..., 1].max(), gt[..., 1].max())
    y_min = min(pred[..., 1].min(), gt[..., 1].min())

    ax.set_xlim(x_min, x_max)
    ax.set_ylim(y_min, y_max)
    ax.set_xlabel('X-axis')
    ax.set_ylabel('Y-axis')

    if title:
        ax.set_title(title)

    ax.legend()
    ax.grid(True, linestyle='--', alpha=0.7)

In [None]:
import matplotlib.pyplot as plt
import random
from google.colab import files

# Load model
model.load_state_dict(torch.load(best_models['first']['path']))
model.eval()

#idx = random.choice(range(len(val_dataset)))
idx = 276
batch = val_dataset[idx]
batch = batch.to(device)

pred = model(batch)
gt = torch.stack(torch.split(batch.y, 60, dim=0), dim=0)
pred = pred * batch.scale.view(-1, 1, 1) + batch.origin.unsqueeze(1)
gt = gt * batch.scale.view(-1, 1, 1) + batch.origin.unsqueeze(1)
pred = pred.detach().cpu().numpy()
gt = gt.detach().cpu().numpy()

# Plot
fig, ax = plt.subplots(figsize=(10, 5))
plot_trajectory(ax, pred, gt, title=f"Sample {idx}, Proposed Model")
plt.tight_layout()
save_path = "trajectory_plot_imp.png"
plt.savefig(save_path)
plt.show()

# Download
files.download(save_path)


In [None]:
plt.figure(figsize=(8, 6))
from google.colab import files

plt.plot(range(1, len(zero_heading_val) + 1), zero_heading_val, label='Degree 3')
plt.plot(range(1, len(degree_5_val) + 1), degree_5_val, label='Degree 5')
plt.plot(range(1, len(degree_7_val) + 1), degree_7_val, label='Degree 7')

plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.title('Validation Loss')
plt.legend()
plt.grid(True)
plt.savefig('validation_loss_degree.png')
plt.show()

files.download('validation_loss_degree.png')


In [None]:
plt.figure(figsize=(8, 6))
from google.colab import files

plt.plot(range(1, len(zero_heading_train) + 1), zero_heading_train, label='Degree 3')
plt.plot(range(1, len(degree_5_train) + 1), degree_5_train, label='Degree 5')
plt.plot(range(1, len(degree_7_train) + 1), degree_7_train, label='Degree 7')

plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.title('Training Loss')
plt.legend()
plt.grid(True)
plt.savefig('training_loss_degree.png')
plt.show()

files.download('training_loss_degree.png')

In [None]:
import matplotlib.pyplot as plt
import random

def plot_trajectory(ax, pred, gt, title=None):
    ax.cla()
    # Plot the predicted future trajectory
    ax.plot(pred[0,:60,0], pred[0,:60,1], color='red', label='Predicted Future Trajectory')

    # Plot the ground truth future trajectory
    ax.plot(gt[0,:60,0], gt[0,:60,1], color='blue', label='Ground Truth Future Trajectory')

    # Optionally set axis limits, labels, and title.
    x_max = max(pred[..., 0].max(), gt[..., 0].max())
    x_min = min(pred[..., 0].min(), gt[..., 0].min())
    y_max = max(pred[..., 1].max(), gt[..., 1].max())
    y_min = min(pred[..., 1].min(), gt[..., 1].min())

    ax.set_xlim(x_min, x_max)
    ax.set_ylim(y_min, y_max)
    ax.set_xlabel('X-axis')
    ax.set_ylabel('Y-axis')

    if title:
        ax.set_title(title)

    ax.legend()
    ax.grid(True, linestyle='--', alpha=0.7)

In [None]:
model.load_state_dict(torch.load(best_models['first']['path']))

In [None]:
#model.load_state_dict(torch.load("best_model.pt"))
model.eval()

# randomly select 4 samples from the validation set
random_indices = random.sample(range(len(val_dataset)), 4)
fig, axes = plt.subplots(2, 2, figsize=(20, 10))
axes = axes.flatten()  # Flatten the array to iterate single axes objects

for i, idx in enumerate(random_indices):
    batch = val_dataset[idx]
    batch = batch.to(device)
    pred = model(batch)
    gt = torch.stack(torch.split(batch.y, 60, dim=0), dim=0)

    pred = pred * batch.scale.view(-1, 1, 1) + batch.origin.unsqueeze(1)
    gt = torch.stack(torch.split(batch.y, 60, dim=0), dim=0) * batch.scale.view(-1, 1, 1) + batch.origin.unsqueeze(1)

    pred = pred.detach().cpu().numpy()
    gt = gt.detach().cpu().numpy()

    # Plot the trajectory using the i-th axis
    plot_trajectory(axes[i], pred, gt, title=f"Sample {idx}")

plt.show()

#### Output your predictions of the best model on the test set

In [None]:
test_dataset = TrajectoryDatasetTest(test_data, scale=scale)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False,
                         collate_fn=lambda xs: Batch.from_data_list(xs))

#best_model = torch.load("best_model.pt")
#model = LinearRegressionModel().to(device)
# model = MLP(50 * 50 * 6, 60 * 2).to(device)
# model = LSTM().to(device)
#model = Agent0HybridTransformer(emb_dim=256,num_layers=6).to(device)
#model = Agent0TemporalAttentionTransformer(emb_dim=256, num_layers=6).to(device)
#model = Agent0TemporalAttentionTransformer(emb_dim=256, num_layers=8).to(device)

# Old best (second best submission):
#model = ImprovedLSTMParametricTrajectoryFeatures(degree=3, lstm_layers=2).to(device)

# Best so far:
#model = ImprovedLSTMParametricTrajectoryFeatures(degree=3, lstm_layers=2, other_rnn_dim=256).to(device)


model = ImprovedLSTMParametricTrajectoryFeatures(degree=3, lstm_layers=2, other_rnn_dim=256).to(device)


model.load_state_dict(torch.load(best_models['first']['path']))
#model.load_state_dict(best_model)
model.eval()

pred_list = []
with torch.no_grad():
    for batch in test_loader:
        batch = batch.to(device)
        pred_norm = model(batch)

        data_list = batch.to_data_list()

        batch_predictions = []
        for i, data in enumerate(data_list):
            # Get the prediction for this sample
            single_pred_norm = pred_norm[i]

            # Unnormalize (scale back up)
            single_pred = single_pred_norm * data.scale

            # Translate back from origin (add back the translation)
            single_pred = single_pred + data.origin

            # Rotate back to world coordinates
            R = data.rotation
            single_pred = single_pred @ R

            batch_predictions.append(single_pred.cpu().numpy())

        # Stack predictions for this batch
        batch_pred_array = np.stack(batch_predictions, axis=0)
        pred_list.append(batch_pred_array)

# Concatenate all predictions
pred_list = np.concatenate(pred_list, axis=0)

# Reshape for submission format
pred_output = pred_list.reshape(-1, 2)

# Create submission DataFrame
output_df = pd.DataFrame(pred_output, columns=['x', 'y'])
output_df.index.name = 'index'
output_df.to_csv('submission.csv', index=True)

In [None]:
print(output_df.head(12))

In [None]:
from google.colab import files
files.download('submission.csv')


## Step 4: Summarize your experiments and results in table and figures in the submitted PDF file for Question 3 Problem A

## Step 5: Analyze the results, identify the issues and plan for the improvement in the submitted PDF file for Question 3 Problem B