# 360-degree video content-based viewport prediction
LSTM-EKF hybrid


## 1. Setup & Dependencies

In [None]:
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import matplotlib.pyplot as plt
from pathlib import Path
from typing import Tuple, List, Optional, Dict
import warnings
warnings.filterwarnings('ignore')

# Run on the GPU when PyTorch can see one, otherwise fall back to the CPU.
_device_name = 'cuda' if torch.cuda.is_available() else 'cpu'
device = torch.device(_device_name)
print(f"Using device: {device}")

# Seed both RNGs used in this notebook so runs are repeatable.
SEED = 42
np.random.seed(SEED)
torch.manual_seed(SEED)

## 2. Configuration

In [None]:
class Config:
    """Paths and hyper-parameters shared by the rest of the notebook.

    Everything is a class attribute, so values can be read from the class
    itself or from an instance.
    """

    # Input data locations.
    DATA_ROOT = Path("/kaggle/input/d-sav360")
    HEAD_DATA_DIR = DATA_ROOT / "head"
    DEV_MODE = True  # NOTE(review): not read anywhere in the visible code
    DEV_VIDEO_ID = "0001"

    # Window lengths, in seconds, and the rate used to convert them to steps.
    PREDICTION_HORIZON_SEC = 2.5
    INPUT_HISTORY_SEC = 2.0
    SAMPLE_RATE_HZ = 90
    # round() instead of int(): a float product such as 0.7 * 90 evaluates to
    # 62.99999999999999 and int() would silently drop a step.
    PREDICTION_STEPS = round(PREDICTION_HORIZON_SEC * SAMPLE_RATE_HZ)
    INPUT_STEPS = round(INPUT_HISTORY_SEC * SAMPLE_RATE_HZ)

    # LSTM architecture.
    LSTM_HIDDEN_SIZE = 128
    LSTM_NUM_LAYERS = 2
    LSTM_DROPOUT = 0.2

    # Kalman filter noise levels.
    EKF_PROCESS_NOISE = 0.01
    EKF_MEASUREMENT_NOISE = 0.001

    # Optimisation.
    BATCH_SIZE = 64
    LEARNING_RATE = 1e-3
    NUM_EPOCHS = 50
    EARLY_STOPPING_PATIENCE = 10

    # Horizons (seconds) at which predictions are produced and scored.
    EVAL_HORIZONS = [0.5, 1.0, 1.5, 2.0, 2.5]

    # Participant split fractions; the remainder forms the test set.
    TRAIN_RATIO = 0.7
    VAL_RATIO = 0.15


config = Config()
print(f"Prediction: {config.INPUT_STEPS} steps -> {config.PREDICTION_STEPS} steps")

## 3. Spherical Geometry Utilities

In [None]:
class SphericalUtils:
    """NumPy helpers for points and tangent vectors on the unit sphere.

    Every method works on the last axis of its array arguments, so leading
    axes are treated as batch dimensions.
    """

    @staticmethod
    def uv_to_unit_vector(u, v):
        """Convert (u, v) coordinates to 3-D unit vectors.

        ``u`` is scaled by 2*pi to a longitude and ``v`` is shifted by 0.5 and
        scaled by pi to a latitude. The result has a trailing axis of size 3.
        """
        longitude = u * 2 * np.pi
        latitude = (v - 0.5) * np.pi
        components = [
            np.cos(latitude) * np.cos(longitude),
            np.cos(latitude) * np.sin(longitude),
            np.sin(latitude),
        ]
        return np.stack(components, axis=-1)

    @staticmethod
    def unit_vector_to_uv(p):
        """Inverse of :meth:`uv_to_unit_vector`; returns the tuple ``(u, v)``."""
        longitude = np.arctan2(p[..., 1], p[..., 0])
        # arctan2 yields (-pi, pi]; shift negatives so the angle is non-negative.
        longitude = np.where(longitude < 0, longitude + 2 * np.pi, longitude)
        # Clip guards arcsin against values slightly outside [-1, 1].
        latitude = np.arcsin(np.clip(p[..., 2], -1, 1))
        u = longitude / (2 * np.pi)
        v = latitude / np.pi + 0.5
        return u, v

    @staticmethod
    def tangent_velocity(p_t, p_next):
        """Return the tangent vector at ``p_t`` pointing towards ``p_next``.

        The direction is the component of ``p_next`` orthogonal to ``p_t``;
        its length is the angle between the two points in radians.
        """
        cos_angle = np.clip(np.sum(p_t * p_next, axis=-1, keepdims=True), -1.0, 1.0)
        orthogonal = p_next - cos_angle * p_t
        # The epsilon keeps the division finite when both points coincide.
        length = np.linalg.norm(orthogonal, axis=-1, keepdims=True) + 1e-8
        return (orthogonal / length) * np.arccos(cos_angle)

    @staticmethod
    def exp_map(p, v):
        """Move from ``p`` along ``v`` on the sphere and renormalise the result."""
        step = np.linalg.norm(v, axis=-1, keepdims=True) + 1e-8
        moved = np.cos(step) * p + np.sin(step) * (v / step)
        return SphericalUtils.normalize(moved)

    @staticmethod
    def normalize(p):
        """Scale ``p`` to (approximately) unit length along the last axis."""
        length = np.linalg.norm(p, axis=-1, keepdims=True)
        return p / (length + 1e-8)

In [None]:
class SphericalUtilsTorch:
    """PyTorch counterparts of the sphere helpers plus loss/metric functions.

    All methods reduce over the last tensor dimension.
    """

    @staticmethod
    def normalize(p):
        """Scale ``p`` to (approximately) unit length along the last dimension."""
        length = torch.norm(p, dim=-1, keepdim=True)
        return p / (length + 1e-8)

    @staticmethod
    def exp_map(p, v):
        """Move from ``p`` along ``v`` on the sphere and renormalise the result."""
        # The epsilon keeps the direction finite when ``v`` is the zero vector.
        step = torch.norm(v, dim=-1, keepdim=True) + 1e-8
        direction = v / step
        moved = torch.cos(step) * p + torch.sin(step) * direction
        return SphericalUtilsTorch.normalize(moved)

    @staticmethod
    def cosine_loss(p_pred, p_target):
        """Return ``1 - <p_pred, p_target>`` for each pair of vectors."""
        similarity = (p_pred * p_target).sum(dim=-1)
        return 1.0 - similarity

    @staticmethod
    def angular_error_degrees(p_pred, p_target):
        """Return the angle between each pair of vectors, in degrees."""
        # Clamp so rounding error cannot push acos outside its domain.
        similarity = torch.sum(p_pred * p_target, dim=-1).clamp(-1.0, 1.0)
        return torch.acos(similarity) * (180.0 / np.pi)

## 4. Data Loading

In [None]:
def load_head_tracking_data(video_id, data_dir):
    """Load one video's head-tracking CSV and append unit-vector columns.

    Args:
        video_id: Identifier inserted into the file name
            ``head_video_{video_id}.csv``.
        data_dir: Directory containing the CSV; a ``str`` or ``Path``.

    Returns:
        The CSV as a DataFrame with three extra columns ``px``, ``py``, ``pz``
        computed from the ``u`` and ``v`` columns.

    Raises:
        KeyError: If the file has no ``u`` or ``v`` column.
    """
    # Path() lets callers pass a plain string as well as a Path object.
    file_path = Path(data_dir) / f"head_video_{video_id}.csv"
    df = pd.read_csv(file_path)

    # Fail early with the file name instead of an opaque column lookup error.
    missing = {'u', 'v'} - set(df.columns)
    if missing:
        raise KeyError(f"{file_path} is missing required column(s): {sorted(missing)}")

    unit_vecs = SphericalUtils.uv_to_unit_vector(df['u'].values, df['v'].values)
    df['px'], df['py'], df['pz'] = unit_vecs[:, 0], unit_vecs[:, 1], unit_vecs[:, 2]
    return df

def split_by_participant(participant_ids, train_ratio=0.7, val_ratio=0.15, seed=42):
    """Shuffle participant ids and split them into train / val / test lists.

    Args:
        participant_ids: Sequence of ids; it is copied, not modified.
        train_ratio: Fraction of ids assigned to the training list.
        val_ratio: Fraction of ids assigned to the validation list.
        seed: Seed for the shuffle, so the split is repeatable.

    Returns:
        A tuple ``(train_ids, val_ids, test_ids)`` of lists. The train and
        validation sizes are truncated to whole ids; the test list receives
        whatever remains.
    """
    # A private generator produces the same permutation as seeding the global
    # NumPy RNG, but leaves the global random state untouched.
    rng = np.random.RandomState(seed)
    ids = np.array(participant_ids)
    rng.shuffle(ids)

    total = len(ids)
    n_train = int(total * train_ratio)
    n_val = int(total * val_ratio)
    val_end = n_train + n_val
    return ids[:n_train].tolist(), ids[n_train:val_end].tolist(), ids[val_end:].tolist()

## 5. Dataset

In [None]:
class ViewportDataset(Dataset):
    """Sliding-window dataset over per-participant head trajectories.

    Each sample is a window of ``input_steps + prediction_steps`` consecutive
    rows for one participant. The DataFrame must provide the columns ``id``,
    ``t``, ``px``, ``py`` and ``pz``.
    """

    def __init__(self, df, participant_ids, input_steps, prediction_steps, eval_horizons_steps):
        """Build every window for the given participants.

        Args:
            df: DataFrame holding all participants' samples.
            participant_ids: Ids to include; participants with fewer rows than
                one full window are skipped.
            input_steps: Number of leading rows used as model input.
            prediction_steps: Number of rows following the input part.
            eval_horizons_steps: Step offsets, counted from the last input
                row, at which targets are read.
        """
        self.input_steps = input_steps
        self.prediction_steps = prediction_steps
        self.eval_horizons_steps = eval_horizons_steps
        self.sequences = []

        window = input_steps + prediction_steps
        selected = df[df['id'].isin(participant_ids)].copy()
        for pid in participant_ids:
            track = selected[selected['id'] == pid].sort_values('t')
            if len(track) < window:
                continue

            positions = track[['px', 'py', 'pz']].values
            # Step-to-step tangent velocities; the final row has no successor,
            # so it repeats the previous velocity.
            velocities = np.zeros_like(positions)
            velocities[:-1] = SphericalUtils.tangent_velocity(positions[:-1], positions[1:])
            velocities[-1] = velocities[-2]

            n_windows = len(positions) - window + 1
            self.sequences.extend(
                {
                    'positions': positions[start:start + window],
                    'velocities': velocities[start:start + window],
                    'pid': pid,
                }
                for start in range(n_windows)
            )
        print(f"Created {len(self.sequences)} sequences")

    def __len__(self):
        """Return the number of stored windows."""
        return len(self.sequences)

    def __getitem__(self, idx):
        """Return input positions/velocities and per-horizon targets as tensors."""
        sample = self.sequences[idx]
        positions = sample['positions']
        # Horizon h is counted from the last input row (index input_steps - 1).
        last_input = self.input_steps - 1
        targets = np.array([positions[last_input + h] for h in self.eval_horizons_steps])
        return {
            'input_positions': torch.FloatTensor(positions[:self.input_steps]),
            'input_velocities': torch.FloatTensor(sample['velocities'][:self.input_steps]),
            'targets': torch.FloatTensor(targets),
        }

## 6. Extended Kalman Filter (Frozen)

In [None]:
class SphericalEKF:
    """Kalman filter with a 6-element state whose position part is kept on the sphere.

    The first three state entries are observed directly (``H`` selects them)
    and the last three are added to them, scaled by ``dt``, at every
    transition (``F``). After each transition and each correction the first
    three entries are renormalised.
    """

    def __init__(self, process_noise=0.01, measurement_noise=0.001, dt=1.0/90):
        """Set up the noise, transition and observation matrices and reset the state."""
        self.dt = dt

        # Process noise; the block for the first three entries is 10x smaller.
        self.Q = np.eye(6) * process_noise
        self.Q[:3, :3] *= 0.1
        self.R = np.eye(3) * measurement_noise

        # Transition: first three entries advance by dt times the last three.
        self.F = np.eye(6)
        self.F[:3, 3:] = np.eye(3) * dt

        # Observation: only the first three entries are measured.
        self.H = np.zeros((3, 6))
        self.H[:3, :3] = np.eye(3)

        self.reset()

    def reset(self):
        """Restore the initial state, covariance and 'not initialised' flag."""
        initial_state = np.zeros(6)
        initial_state[2] = 1.0
        self.x = initial_state
        self.P = np.eye(6) * 0.1
        self.initialized = False

    def predict_trajectory(self, steps):
        """Roll the current state forward without measurements.

        Returns an array of shape ``(steps, 3)``; row ``i`` is the renormalised
        position after ``i + 1`` transitions. The filter state is not modified.
        """
        rollout = np.zeros((steps, 3))
        state = self.x.copy()
        for step in range(steps):
            state = self.F @ state
            state[:3] = SphericalUtils.normalize(state[:3])
            rollout[step] = state[:3]
        return rollout

    def update(self, measurement):
        """Fuse one position measurement.

        Returns ``(position_estimate, innovation_norm)``. The very first call
        only stores the measurement and reports an innovation of ``0.0``.
        """
        if not self.initialized:
            self.x[:3] = measurement
            self.initialized = True
            return measurement, 0.0

        # Prediction step.
        prior = self.F @ self.x
        prior[:3] = SphericalUtils.normalize(prior[:3])
        prior_cov = self.F @ self.P @ self.F.T + self.Q

        # Innovation and Kalman gain.
        residual = measurement - self.H @ prior
        residual_norm = np.linalg.norm(residual)
        innovation_cov = self.H @ prior_cov @ self.H.T + self.R
        gain = prior_cov @ self.H.T @ np.linalg.inv(innovation_cov)

        # Correction step; the position part is pulled back onto the sphere.
        posterior = prior + gain @ residual
        posterior[:3] = SphericalUtils.normalize(posterior[:3])
        self.x = posterior
        self.P = (np.eye(6) - gain @ self.H) @ prior_cov
        return self.x[:3], residual_norm

In [None]:
class BatchEKF:
    """Runs one reusable :class:`SphericalEKF` over every sequence in a batch."""

    def __init__(self, config):
        """Create the filter from the config's noise levels and sample rate.

        ``dt`` is derived from ``config.SAMPLE_RATE_HZ`` so the filter follows
        the configured rate instead of the filter's built-in default; configs
        without that attribute fall back to 90 Hz, the previous behaviour.
        """
        sample_rate = getattr(config, 'SAMPLE_RATE_HZ', 90)
        self.ekf = SphericalEKF(config.EKF_PROCESS_NOISE, config.EKF_MEASUREMENT_NOISE,
                                dt=1.0 / sample_rate)

    def process_batch(self, input_positions, eval_horizons_steps):
        """Filter each input sequence and extrapolate to the requested horizons.

        Args:
            input_positions: Tensor whose first two dimensions are batch and
                time; each time step is passed to the filter as one measurement.
            eval_horizons_steps: Step counts ahead of the last input at which
                the extrapolated position is sampled.

        Returns:
            ``(ekf_preds, innovations)`` as float tensors on the input's device,
            shaped ``(batch, len(eval_horizons_steps), 3)`` and ``(batch, time)``.
        """
        batch_size, seq_len = input_positions.shape[:2]
        target_device = input_positions.device
        # detach() keeps the NumPy conversion valid even for tensors that track gradients.
        positions_np = input_positions.detach().cpu().numpy()
        ekf_preds = np.zeros((batch_size, len(eval_horizons_steps), 3))
        innovations = np.zeros((batch_size, seq_len))
        # default=0 avoids a ValueError when no horizons are requested.
        longest = max(eval_horizons_steps, default=0)

        for b in range(batch_size):
            self.ekf.reset()
            for t in range(seq_len):
                _, innov = self.ekf.update(positions_np[b, t])
                innovations[b, t] = innov
            traj = self.ekf.predict_trajectory(longest)
            for i, h in enumerate(eval_horizons_steps):
                # traj[0] is one step ahead, so horizon h lives at index h - 1.
                ekf_preds[b, i] = traj[h - 1]

        return torch.FloatTensor(ekf_preds).to(target_device), torch.FloatTensor(innovations).to(target_device)

## 7. LSTM Model with Gating

In [None]:
class SphericalLSTM(nn.Module):
    """LSTM encoder with one correction head per horizon and a shared gate head.

    The last LSTM output feeds ``num_horizons`` small MLPs that each emit a
    3-vector, plus a gate MLP that also sees the mean innovation magnitude and
    emits one sigmoid value per horizon.
    """

    def __init__(self, input_dim=6, hidden_size=128, num_layers=2, dropout=0.2, num_horizons=5):
        super().__init__()
        # nn.LSTM applies dropout only between layers, so disable it for one layer.
        inter_layer_dropout = dropout if num_layers > 1 else 0
        self.lstm = nn.LSTM(input_dim, hidden_size, num_layers,
                            batch_first=True, dropout=inter_layer_dropout)

        half = hidden_size // 2
        quarter = hidden_size // 4

        heads = []
        for _ in range(num_horizons):
            heads.append(nn.Sequential(
                nn.Linear(hidden_size, half),
                nn.ReLU(),
                nn.Dropout(dropout),
                nn.Linear(half, 3),
            ))
        self.prediction_heads = nn.ModuleList(heads)

        # The "+ 1" input is the mean innovation magnitude appended in forward().
        self.gate_head = nn.Sequential(
            nn.Linear(hidden_size + 1, quarter),
            nn.ReLU(),
            nn.Linear(quarter, num_horizons),
            nn.Sigmoid(),
        )

    def forward(self, input_positions, input_velocities, innovation_magnitude):
        """Return ``(corrections, gates)``.

        ``corrections`` stacks the per-horizon head outputs along dimension 1;
        ``gates`` holds one value in [0, 1] per horizon.
        """
        features = torch.cat((input_positions, input_velocities), dim=-1)
        sequence_out, _ = self.lstm(features)
        summary = sequence_out[:, -1, :]  # output at the final time step

        per_horizon = [head(summary) for head in self.prediction_heads]
        corrections = torch.stack(per_horizon, dim=1)

        mean_innovation = innovation_magnitude.mean(dim=-1, keepdim=True)
        gates = self.gate_head(torch.cat((summary, mean_innovation), dim=-1))
        return corrections, gates

In [None]:
class KalmanLSTMHybrid(nn.Module):
    """Combines the Kalman extrapolation with a gated LSTM correction.

    The filter supplies a baseline position per horizon; the LSTM supplies a
    3-vector correction and a gate per horizon. The gated correction is applied
    to the baseline through the sphere's exponential map.
    """

    def __init__(self, config):
        super().__init__()
        # round() rather than int(): float products such as 0.7 * 90 land just
        # below the intended integer and int() would drop a step.
        self.eval_horizons_steps = [round(h * config.SAMPLE_RATE_HZ) for h in config.EVAL_HORIZONS]
        self.batch_ekf = BatchEKF(config)
        self.lstm = SphericalLSTM(6, config.LSTM_HIDDEN_SIZE, config.LSTM_NUM_LAYERS,
                                  config.LSTM_DROPOUT, len(config.EVAL_HORIZONS))

    def forward(self, input_positions, input_velocities):
        """Return ``(predictions, aux)``.

        ``predictions`` holds one unit vector per horizon. ``aux`` is a dict
        with the keys ``ekf_predictions``, ``gates`` and ``innovations``.
        """
        ekf_preds, innovations = self.batch_ekf.process_batch(input_positions, self.eval_horizons_steps)
        corrections, gates = self.lstm(input_positions, input_velocities, innovations)
        gated_corrections = gates.unsqueeze(-1) * corrections

        # The exponential map is only defined for vectors tangent to the sphere
        # at the base point, so remove the component along the EKF prediction
        # before applying it. The network output is otherwise unconstrained.
        radial = torch.sum(gated_corrections * ekf_preds, dim=-1, keepdim=True)
        tangent_corrections = gated_corrections - radial * ekf_preds

        predictions = SphericalUtilsTorch.exp_map(ekf_preds, tangent_corrections)
        return predictions, {'ekf_predictions': ekf_preds, 'gates': gates, 'innovations': innovations}

## 8. Loss & Metrics

In [None]:
class MultiHorizonLoss(nn.Module):
    """Mean cosine loss over horizons, optionally weighted per horizon."""

    def __init__(self, horizon_weights=None):
        """Store optional per-horizon weights (list, tuple, array or tensor)."""
        super().__init__()
        self.horizon_weights = horizon_weights

    def forward(self, predictions, targets):
        """Return ``(loss, per_horizon)``.

        ``loss`` is the mean of the (optionally weighted) cosine losses.
        ``per_horizon`` maps ``loss_h{i}`` to the unweighted mean loss of
        horizon ``i`` as a Python float.
        """
        cosine_losses = SphericalUtilsTorch.cosine_loss(predictions, targets)

        weights = self.horizon_weights
        # Explicit None/length test: truthiness of a multi-element tensor or
        # array raises, and an empty weight list still means "unweighted".
        if weights is not None and len(weights) > 0:
            weights = torch.as_tensor(weights, dtype=cosine_losses.dtype, device=cosine_losses.device)
            loss = (cosine_losses * weights.unsqueeze(0)).mean()
        else:
            loss = cosine_losses.mean()

        per_horizon = {
            f'loss_h{i}': cosine_losses[:, i].mean().item()
            for i in range(cosine_losses.shape[1])
        }
        return loss, per_horizon

def evaluate_model(model, dataloader, config, device):
    """Compute the mean angular error in degrees for each evaluation horizon.

    The model is switched to eval mode and run without gradient tracking.
    Returns a dict keyed ``MAE_{h}s`` for every ``h`` in
    ``config.EVAL_HORIZONS``.
    """
    model.eval()
    horizons = config.EVAL_HORIZONS
    errors_by_horizon = {h: [] for h in horizons}

    with torch.no_grad():
        for batch in dataloader:
            positions = batch['input_positions'].to(device)
            velocities = batch['input_velocities'].to(device)
            targets = batch['targets'].to(device)
            preds, _ = model(positions, velocities)

            # Column i of predictions/targets corresponds to horizon i.
            for column, h in enumerate(horizons):
                degrees = SphericalUtilsTorch.angular_error_degrees(preds[:, column], targets[:, column])
                errors_by_horizon[h] += degrees.cpu().numpy().tolist()

    return {f'MAE_{h}s': np.mean(errs) for h, errs in errors_by_horizon.items()}

## 9. Training Loop

In [None]:
def train_epoch(model, dataloader, optimizer, criterion, device):
    """Run one optimisation pass over ``dataloader``.

    Each batch is a dict with ``input_positions``, ``input_velocities`` and
    ``targets``. Gradients are clipped to a total norm of 1.0 before every
    optimiser step. Returns ``{'train_loss': mean of the per-batch losses}``.
    """
    model.train()
    batch_losses = []
    for batch in dataloader:
        positions = batch['input_positions'].to(device)
        velocities = batch['input_velocities'].to(device)
        targets = batch['targets'].to(device)

        optimizer.zero_grad()
        preds, _ = model(positions, velocities)
        loss, _ = criterion(preds, targets)
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
        optimizer.step()

        batch_losses.append(loss.item())
    return {'train_loss': sum(batch_losses) / len(batch_losses)}

def validate(model, dataloader, criterion, device):
    """Compute the mean per-batch loss on ``dataloader`` without training.

    The model is put in eval mode and gradients are disabled. Returns
    ``{'val_loss': mean of the per-batch losses}``.
    """
    model.eval()
    batch_losses = []
    with torch.no_grad():
        for batch in dataloader:
            positions = batch['input_positions'].to(device)
            velocities = batch['input_velocities'].to(device)
            targets = batch['targets'].to(device)

            preds, _ = model(positions, velocities)
            loss, _ = criterion(preds, targets)
            batch_losses.append(loss.item())
    return {'val_loss': sum(batch_losses) / len(batch_losses)}

In [None]:
def train_model(model, train_loader, val_loader, config, device):
    """Train with early stopping and return the best model and loss history.

    Args:
        model: Module to train; it is moved to ``device``.
        train_loader: Batches used for optimisation.
        val_loader: Batches used for model selection and LR scheduling.
        config: Provides ``LEARNING_RATE``, ``NUM_EPOCHS`` and
            ``EARLY_STOPPING_PATIENCE``.
        device: Device on which training runs.

    Returns:
        ``(model, history)`` where the model carries the weights of the epoch
        with the lowest validation loss and ``history`` has the per-epoch
        lists ``train_loss`` and ``val_loss``.
    """
    model = model.to(device)
    # NOTE(review): five fixed weights, so this assumes five evaluation horizons.
    criterion = MultiHorizonLoss([0.5, 0.7, 0.85, 1.0, 1.0])
    optimizer = optim.Adam(model.parameters(), lr=config.LEARNING_RATE)
    scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, 'min', factor=0.5, patience=5)

    history = {'train_loss': [], 'val_loss': []}
    best_val_loss, patience_counter, best_state = float('inf'), 0, None

    for epoch in range(config.NUM_EPOCHS):
        train_metrics = train_epoch(model, train_loader, optimizer, criterion, device)
        val_metrics = validate(model, val_loader, criterion, device)
        scheduler.step(val_metrics['val_loss'])

        history['train_loss'].append(train_metrics['train_loss'])
        history['val_loss'].append(val_metrics['val_loss'])

        if val_metrics['val_loss'] < best_val_loss:
            best_val_loss = val_metrics['val_loss']
            patience_counter = 0
            # state_dict() hands out references to the live parameters, and a
            # dict-level copy keeps those references. Clone every tensor so
            # later optimiser steps cannot overwrite the snapshot.
            best_state = {name: tensor.detach().clone()
                          for name, tensor in model.state_dict().items()}
        else:
            patience_counter += 1

        if epoch % 5 == 0:
            print(f"Epoch {epoch+1}: train={train_metrics['train_loss']:.4f}, val={val_metrics['val_loss']:.4f}")

        if patience_counter >= config.EARLY_STOPPING_PATIENCE:
            print(f"Early stopping at epoch {epoch+1}")
            break

    # best_state stays None only if no epoch ever improved on infinity
    # (for example when every validation loss is NaN).
    if best_state is not None:
        model.load_state_dict(best_state)
    return model, history

## 10. Visualization

In [None]:
def plot_training_history(history):
    """Plot the ``train_loss`` and ``val_loss`` curves from ``history`` and show the figure."""
    _, axis = plt.subplots(1, 1, figsize=(8, 4))
    for key, label in (('train_loss', 'Train'), ('val_loss', 'Val')):
        axis.plot(history[key], label=label)
    axis.set_xlabel('Epoch')
    axis.set_ylabel('Loss')
    axis.legend()
    axis.grid(True)
    plt.show()

def plot_error_vs_horizon(metrics, config):
    """Plot the ``MAE_{h}s`` entries of ``metrics`` against ``config.EVAL_HORIZONS``.

    Every point is annotated with its value to one decimal place, then the
    figure is shown.
    """
    horizons = config.EVAL_HORIZONS
    errors = [metrics[f'MAE_{h}s'] for h in horizons]

    plt.figure(figsize=(8, 5))
    plt.plot(horizons, errors, 'o-', lw=2, ms=8)
    plt.xlabel('Horizon (s)')
    plt.ylabel('MAE (degrees)')
    plt.title('Angular Error vs Prediction Horizon')
    plt.grid(True)

    # Label each marker slightly above the point.
    for horizon, err in zip(horizons, errors):
        label = f'{err:.1f}°'
        plt.annotate(label, (horizon, err), textcoords='offset points', xytext=(0, 10), ha='center')
    plt.show()

## 11. Main Execution

In [None]:
# Load data (single video for development)
print("Loading data...")
df = load_head_tracking_data(config.DEV_VIDEO_ID, config.HEAD_DATA_DIR)
print(f"Loaded {len(df)} samples from video {config.DEV_VIDEO_ID}")

# Split by participant. The ratios and seed come from the configuration so that
# editing Config / SEED actually changes the split (the values currently equal
# split_by_participant's defaults, so the resulting split is unchanged).
participant_ids = sorted(df['id'].unique().tolist())
train_ids, val_ids, test_ids = split_by_participant(
    participant_ids,
    train_ratio=config.TRAIN_RATIO,
    val_ratio=config.VAL_RATIO,
    seed=SEED,
)
print(f"Participants - Train: {len(train_ids)}, Val: {len(val_ids)}, Test: {len(test_ids)}")

In [None]:
# Create datasets
# Horizons in steps. round() rather than int(): float products such as
# 0.7 * 90 land just below the intended integer and int() would drop a step.
eval_horizons_steps = [round(h * config.SAMPLE_RATE_HZ) for h in config.EVAL_HORIZONS]

train_dataset = ViewportDataset(df, train_ids, config.INPUT_STEPS, config.PREDICTION_STEPS, eval_horizons_steps)
val_dataset = ViewportDataset(df, val_ids, config.INPUT_STEPS, config.PREDICTION_STEPS, eval_horizons_steps)
test_dataset = ViewportDataset(df, test_ids, config.INPUT_STEPS, config.PREDICTION_STEPS, eval_horizons_steps)

# Only the training loader shuffles; validation and test keep dataset order.
train_loader = DataLoader(train_dataset, batch_size=config.BATCH_SIZE, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=config.BATCH_SIZE)
test_loader = DataLoader(test_dataset, batch_size=config.BATCH_SIZE)

In [None]:
# Build the hybrid model and report how many parameters will be trained.
model = KalmanLSTMHybrid(config)
trainable_params = sum(p.numel() for p in model.parameters() if p.requires_grad)
print(f"Model parameters: {trainable_params:,}")

# Train; the returned model and the per-epoch loss history are kept for later cells.
model, history = train_model(model, train_loader, val_loader, config, device)

In [None]:
# Show the loss curves recorded during training.
plot_training_history(history)

# Score the trained model on the held-out test participants.
test_metrics = evaluate_model(model, test_loader, config, device)
print("\nTest Results:")
for h in config.EVAL_HORIZONS:
    mae = test_metrics[f'MAE_{h}s']
    print(f"  MAE @ {h}s: {mae:.2f}°")

plot_error_vs_horizon(test_metrics, config)