# 🧠 ML-Based Localization Training Notebook

This notebook trains a neural network to replicate the Extended Kalman Filter (EKF) localization algorithm using real sensor data from the Polaris autonomous vehicle.

## 📋 Overview

**Goal**: Train an LSTM neural network to predict vehicle state (position, velocity, attitude) from sensor inputs (IMU, GPS, odometry).

**Approach**:

1. Load processed sensor data
2. Prepare training/test datasets
3. Train LSTM model
4. Evaluate performance
5. Compare with EKF baseline

**Expected Results**:

- Position RMSE: < 1.0 m
- R² Score: > 0.90
- Training time: 20-30 minutes (CPU)

## 1️⃣ Setup and Imports

In [None]:
# Standard librariesimport numpy as npimport pandas as pdimport matplotlib.pyplot as pltimport seaborn as snsfrom pathlib import Pathimport time# Deep learningimport torchimport torch.nn as nnimport torch.optim as optimfrom torch.utils.data import Dataset, DataLoader# Machine learning utilitiesfrom sklearn.preprocessing import StandardScalerfrom sklearn.model_selection import train_test_splitfrom sklearn.metrics import mean_squared_error, r2_score, mean_absolute_errorimport joblib# Plotting settingsplt.style.use('default')sns.set_palette("husl")%matplotlib inlineprint("✓ All imports successful!")print(f"PyTorch version: {torch.__version__}")print(f"CUDA available: {torch.cuda.is_available()}")if torch.cuda.is_available():    print(f"GPU: {torch.cuda.get_device_name(0)}")

## 2️⃣ Configuration

In [None]:
# File paths (relative to the directory the notebook is run from)
DATA_FILE = 'data/processed/localization_training_data.csv'
MODEL_DIR = Path('models')
RESULTS_DIR = Path('results/ml_training')

# Create output directories up front so later cells can save without checks
MODEL_DIR.mkdir(exist_ok=True, parents=True)
RESULTS_DIR.mkdir(exist_ok=True, parents=True)

# Reproducibility: weight initialisation and DataLoader shuffling both draw
# from PyTorch's global RNG, so seed it (and NumPy) before anything stochastic
# runs. Restart Kernel -> Run All then yields the same run every time.
SEED = 42
np.random.seed(SEED)
torch.manual_seed(SEED)

# Model hyperparameters - EXPERIMENT WITH THESE!
HYPERPARAMS = {
    'sequence_length': 10,      # Try: 5, 10, 20
    'hidden_size': 128,          # Try: 64, 128, 256
    'num_layers': 2,             # Try: 1, 2, 3
    'batch_size': 32,            # Try: 16, 32, 64
    'learning_rate': 0.001,      # Try: 0.0001, 0.001, 0.01
    'epochs': 50,                # Try: 30, 50, 100
    'test_size': 0.2,            # Try: 0.1, 0.2, 0.3
    'dropout': 0.2,              # Try: 0.1, 0.2, 0.3
}

# Device configuration: use the GPU when one is visible, otherwise the CPU
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

print("Configuration:")
for key, value in HYPERPARAMS.items():
    print(f"  {key:20} = {value}")
print(f"\nDevice: {device}")

## 3️⃣ Load and Explore Data

In [None]:
# Read the processed sensor log into a DataFrame
print(f"Loading data from: {DATA_FILE}")
df = pd.read_csv(DATA_FILE)

# Summary statistics derived from the timestamp column
timestamps = df['time_sec']
duration_s = timestamps.max() - timestamps.min()
sample_rate_hz = 1 / timestamps.diff().mean()  # mean spacing between consecutive rows

print(f"\nDataset shape: {df.shape}")
print(f"Duration: {duration_s:.1f} seconds")
print(f"Frequency: ~{sample_rate_hz:.1f} Hz")

# Preview the first rows (rendered as the cell output)
df.head()

In [None]:
# Tabulate, per column, how many values are missing and what share of rows that is
print("Missing values per column:")

missing = df.isnull().sum()
missing_pct = (missing / len(df)) * 100

missing_df = pd.concat(
    [missing.rename('Missing'), missing_pct.rename('Percentage')],
    axis=1,
)

# Show only the columns that actually have gaps, worst first
has_gaps = missing_df['Missing'] > 0
missing_df.loc[has_gaps].sort_values('Missing', ascending=False)

In [None]:
# Four-panel overview of the raw log: gyro rates, trajectory, speed, attitude
fig, axes = plt.subplots(2, 2, figsize=(15, 10))
(ax_gyro, ax_track), (ax_speed, ax_attitude) = axes
t = df['time_sec']

# IMU angular velocity, one trace per body axis
for axis_name in ('x', 'y', 'z'):
    ax_gyro.plot(t, df[f'ang_vel_{axis_name}'], label=axis_name.upper(), alpha=0.7)
ax_gyro.set_xlabel('Time (s)')
ax_gyro.set_ylabel('Angular Velocity (rad/s)')
ax_gyro.set_title('IMU Angular Velocity')
ax_gyro.legend()
ax_gyro.grid(True)

# Trajectory in the local ENU frame; equal aspect so the path is not distorted
ax_track.plot(df['enu_x'], df['enu_y'])
ax_track.set_xlabel('East (m)')
ax_track.set_ylabel('North (m)')
ax_track.set_title('Vehicle Trajectory')
ax_track.grid(True)
ax_track.axis('equal')

# Speed over time
ax_speed.plot(t, df['speed'])
ax_speed.set_xlabel('Time (s)')
ax_speed.set_ylabel('Speed (m/s)')
ax_speed.set_title('Vehicle Speed')
ax_speed.grid(True)

# Attitude angles, converted with np.degrees for display
for angle in ('roll', 'pitch', 'yaw'):
    ax_attitude.plot(t, np.degrees(df[angle]), label=angle.capitalize(), alpha=0.7)
ax_attitude.set_xlabel('Time (s)')
ax_attitude.set_ylabel('Attitude (degrees)')
ax_attitude.set_title('Vehicle Attitude')
ax_attitude.legend()
ax_attitude.grid(True)

plt.tight_layout()
plt.savefig(RESULTS_DIR / 'data_exploration.png', dpi=300, bbox_inches='tight')
plt.show()

## 4️⃣ Define Neural Network Architecture

In [None]:
class LocalizationDataset(Dataset):
    """Sliding-window dataset for localization training.

    Item ``idx`` is the pair ``(sensor_data[idx : idx + sequence_length],
    target_data[idx + sequence_length - 1])``: a window of consecutive sensor
    rows and the target row aligned with the window's last row.

    Args:
        sensor_data: Array-like accepted by ``torch.FloatTensor``; one row per
            sample. Rows are assumed to be in time order (TODO confirm at the
            call site - windows are only meaningful for contiguous samples).
        target_data: Array-like with the same number of rows as ``sensor_data``.
        sequence_length: Number of consecutive rows per window (>= 1).

    Raises:
        ValueError: If ``sequence_length`` is below 1 or the two arrays have a
            different number of rows.
    """

    def __init__(self, sensor_data, target_data, sequence_length=10):
        if sequence_length < 1:
            raise ValueError(f"sequence_length must be >= 1, got {sequence_length}")
        if len(sensor_data) != len(target_data):
            raise ValueError(
                f"sensor_data and target_data must have the same number of rows "
                f"({len(sensor_data)} != {len(target_data)})"
            )
        self.sensor_data = torch.FloatTensor(sensor_data)
        self.target_data = torch.FloatTensor(target_data)
        self.sequence_length = sequence_length

    def __len__(self):
        # Clamp at zero: with fewer rows than one window the raw formula goes
        # negative, which makes len() raise instead of reporting "empty".
        return max(0, len(self.sensor_data) - self.sequence_length + 1)

    def __getitem__(self, idx):
        window_end = idx + self.sequence_length
        sensor_seq = self.sensor_data[idx:window_end]
        target = self.target_data[window_end - 1]
        return sensor_seq, target


class LocalizationLSTM(nn.Module):
    """LSTM encoder followed by a dense regression head.

    Args:
        input_size: Number of features per time step.
        hidden_size: LSTM hidden-state width.
        num_layers: Number of stacked LSTM layers.
        output_size: Number of regressed outputs.
        dropout: Dropout between LSTM layers (only when ``num_layers > 1``) and
            before the third dense layer.
    """

    def __init__(self, input_size, hidden_size=128, num_layers=2, output_size=12, dropout=0.2):
        super().__init__()

        self.hidden_size = hidden_size
        self.num_layers = num_layers

        # LSTM layers. Inter-layer dropout is disabled for a single layer.
        self.lstm = nn.LSTM(
            input_size=input_size,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True,
            dropout=dropout if num_layers > 1 else 0
        )

        # Dense layers
        # NOTE(review): the first Dropout is fixed at 0.3 and ignores the
        # `dropout` argument - confirm whether that is intentional.
        self.fc_layers = nn.Sequential(
            nn.Linear(hidden_size, 256),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(256, 128),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(128, 64),
            nn.ReLU(),
            nn.Linear(64, output_size)
        )

    def forward(self, x):
        """Map a batch of windows ``(batch, seq, input_size)`` to ``(batch, output_size)``."""
        # nn.LSTM initialises h0/c0 to zeros on x's device and dtype when no
        # state is passed, so explicit zero tensors are unnecessary.
        lstm_out, _ = self.lstm(x)
        last_output = lstm_out[:, -1, :]  # hidden state at the final time step
        return self.fc_layers(last_output)


# Create model
# NOTE(review): input_size/output_size are hard-coded here and must match the
# number of sensor/target columns selected later in the notebook.
model = LocalizationLSTM(
    input_size=10,
    hidden_size=HYPERPARAMS['hidden_size'],
    num_layers=HYPERPARAMS['num_layers'],
    output_size=12,
    dropout=HYPERPARAMS['dropout']
)

print("Model Architecture:")
print(model)
print(f"\nTotal parameters: {sum(p.numel() for p in model.parameters()):,}")

## 5️⃣ Prepare Training Data

In [None]:
# Define features
sensor_cols = [
    'ang_vel_x', 'ang_vel_y', 'ang_vel_z',
    'lin_acc_x', 'lin_acc_y', 'lin_acc_z',
    'latitude', 'longitude', 'altitude', 'speed'
]
target_cols = [
    'enu_x', 'enu_y', 'enu_z',
    'vel_x', 'vel_y', 'vel_z',
    'roll', 'pitch', 'yaw',
    'omega_roll', 'omega_pitch', 'omega_yaw'
]

# Keep only the columns that exist in the loaded file
available_sensor_cols = [col for col in sensor_cols if col in df.columns]
available_target_cols = [col for col in target_cols if col in df.columns]

# Fail fast: the model was already built with fixed input/output widths, so a
# column silently dropped here would otherwise surface only as a shape
# mismatch inside the training loop.
expected_inputs = model.lstm.input_size
expected_outputs = model.fc_layers[-1].out_features
if len(available_sensor_cols) != expected_inputs or len(available_target_cols) != expected_outputs:
    absent_cols = [col for col in sensor_cols + target_cols if col not in df.columns]
    raise ValueError(
        f"Model expects {expected_inputs} inputs / {expected_outputs} outputs but the data "
        f"provides {len(available_sensor_cols)} / {len(available_target_cols)}. "
        f"Columns missing from {DATA_FILE}: {absent_cols}"
    )

# Extract and prepare data
# NOTE(review): fillna(0) replaces gaps with literal zeros. For absolute
# quantities such as latitude/longitude that is far from any real reading -
# confirm these columns have no NaNs, or switch to interpolation/forward-fill.
sensor_data = df[available_sensor_cols].fillna(0).values
target_data = df[available_target_cols].fillna(0).values

print(f"Input features: {len(available_sensor_cols)}")
print(f"Target features: {len(available_target_cols)}")
print(f"Data points: {len(sensor_data):,}")

In [None]:
# Split chronologically BEFORE scaling and windowing.
# LocalizationDataset builds each sequence from consecutive rows, so the rows
# must stay in time order: a shuffled split would (a) produce windows made of
# unrelated samples and (b) put near-identical neighbouring time steps in both
# train and test, inflating the test score. shuffle=False keeps the first
# (1 - test_size) of the log for training and the remainder for testing.
X_train_raw, X_test_raw, y_train_raw, y_test_raw = train_test_split(
    sensor_data, target_data,
    test_size=HYPERPARAMS['test_size'], shuffle=False
)

# Normalize data. Scalers are fit on the training rows only, so no statistics
# from the held-out segment leak into training.
scaler_input = StandardScaler()
scaler_output = StandardScaler()

X_train = scaler_input.fit_transform(X_train_raw)
y_train = scaler_output.fit_transform(y_train_raw)
X_test = scaler_input.transform(X_test_raw)
y_test = scaler_output.transform(y_test_raw)

# Full-length scaled arrays, kept under their original names for any cell that
# still refers to them.
sensor_data_scaled = scaler_input.transform(sensor_data)
target_data_scaled = scaler_output.transform(target_data)

# Create datasets and loaders. Shuffling *windows* for training is fine - each
# window is still internally contiguous.
train_dataset = LocalizationDataset(X_train, y_train, HYPERPARAMS['sequence_length'])
test_dataset = LocalizationDataset(X_test, y_test, HYPERPARAMS['sequence_length'])

train_loader = DataLoader(train_dataset, batch_size=HYPERPARAMS['batch_size'], shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=HYPERPARAMS['batch_size'], shuffle=False)

print(f"Training sequences: {len(train_dataset):,}")
print(f"Test sequences: {len(test_dataset):,}")
print(f"Training batches: {len(train_loader)}")
print(f"Test batches: {len(test_loader)}")

## 6️⃣ Train the Model

In [None]:
# Setup training: move the model to the selected device and create the loss,
# optimizer and learning-rate scheduler.
model = model.to(device)
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=HYPERPARAMS['learning_rate'])

# Halve the learning rate when the monitored loss has not improved for 10
# epochs. The `verbose` argument is intentionally not passed: it is deprecated
# since PyTorch 2.2 and no longer accepted by recent releases. Read the current
# rate from optimizer.param_groups[0]['lr'] instead.
scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', patience=10, factor=0.5)

print("Ready to train!")
print(f"Device: {device}")
print(f"Epochs: {HYPERPARAMS['epochs']}")

In [None]:
# Training loop
# Each epoch: one optimisation pass over train_loader, one no-grad pass over
# test_loader, then scheduler step, best-checkpoint save and progress logging.
# NOTE(review): the test loader drives both LR scheduling and checkpoint
# selection here, so metrics later computed on the same loader are optimistic;
# consider a separate validation split.
train_losses = []
test_losses = []
best_test_loss = float('inf')
num_epochs = HYPERPARAMS['epochs']

print("\nStarting training...\n")
start_time = time.time()

for epoch in range(num_epochs):
    # Training
    model.train()
    train_loss = 0.0

    for batch_sensor, batch_target in train_loader:
        batch_sensor = batch_sensor.to(device)
        batch_target = batch_target.to(device)

        optimizer.zero_grad()
        outputs = model(batch_sensor)
        loss = criterion(outputs, batch_target)
        loss.backward()
        optimizer.step()

        train_loss += loss.item()

    # Validation
    model.eval()
    test_loss = 0.0

    with torch.no_grad():
        for batch_sensor, batch_target in test_loader:
            batch_sensor = batch_sensor.to(device)
            batch_target = batch_target.to(device)
            outputs = model(batch_sensor)
            loss = criterion(outputs, batch_target)
            test_loss += loss.item()

    # Record losses (mean of per-batch losses)
    avg_train_loss = train_loss / len(train_loader)
    avg_test_loss = test_loss / len(test_loader)
    train_losses.append(avg_train_loss)
    test_losses.append(avg_test_loss)

    # Step the plateau scheduler and report any reduction explicitly, so the
    # log does not depend on the scheduler's own (deprecated) verbose output.
    lr_before = optimizer.param_groups[0]['lr']
    scheduler.step(avg_test_loss)
    lr_after = optimizer.param_groups[0]['lr']
    if lr_after < lr_before:
        print(f"  Learning rate reduced: {lr_before:.2e} -> {lr_after:.2e}")

    # Save best model
    if avg_test_loss < best_test_loss:
        best_test_loss = avg_test_loss
        torch.save(model.state_dict(), MODEL_DIR / 'best_model.pth')

    # Print progress every 5th epoch and on the last one. The epoch is shown
    # 1-based so the final line reads "N/N" rather than "N-1/N".
    if epoch % 5 == 0 or epoch == num_epochs - 1:
        elapsed = time.time() - start_time
        print(f"Epoch {epoch + 1:3d}/{num_epochs} | "
              f"Train: {avg_train_loss:.6f} | "
              f"Test: {avg_test_loss:.6f} | "
              f"Time: {elapsed:.1f}s")

total_time = time.time() - start_time
print(f"\n✓ Training complete! Time: {total_time/60:.1f} min")
print(f"✓ Best test loss: {best_test_loss:.6f}")

In [None]:
# Plot training history: full curves on a log scale (left) and a zoom on the
# most recent 20 epochs (right).
fig, (ax_full, ax_tail) = plt.subplots(1, 2, figsize=(12, 4))

# Left panel: every epoch
ax_full.plot(train_losses, label='Training Loss', linewidth=2)
ax_full.plot(test_losses, label='Validation Loss', linewidth=2)
ax_full.set_xlabel('Epoch')
ax_full.set_ylabel('Loss (MSE)')
ax_full.set_title('Training History')
ax_full.legend()
ax_full.grid(True, alpha=0.3)
ax_full.set_yscale('log')

# Right panel: last 20 epochs when more than 20 exist, otherwise everything
zoom_in = len(train_losses) > 20
for series, label in ((train_losses, 'Train'), (test_losses, 'Val')):
    if zoom_in:
        epoch_idx = range(len(series) - 20, len(series))
        ax_tail.plot(epoch_idx, series[-20:], label=label, linewidth=2)
    else:
        ax_tail.plot(series, label=label, linewidth=2)
ax_tail.set_xlabel('Epoch')
ax_tail.set_ylabel('Loss (MSE)')
ax_tail.set_title('Last 20 Epochs')
ax_tail.legend()
ax_tail.grid(True, alpha=0.3)

plt.tight_layout()
plt.savefig(RESULTS_DIR / 'training_history.png', dpi=300)
plt.show()

## 7️⃣ Evaluate Model Performance

In [None]:
# Load the best checkpoint and run it over the test loader.
# map_location remaps the saved tensors onto the active device, so a
# checkpoint written on a GPU still loads in a CPU-only session.
best_state = torch.load(MODEL_DIR / 'best_model.pth', map_location=device)
model.load_state_dict(best_state)
model.eval()

all_predictions = []
all_targets = []

with torch.no_grad():
    for batch_sensor, batch_target in test_loader:
        batch_sensor = batch_sensor.to(device)
        batch_predictions = model(batch_sensor)
        all_predictions.append(batch_predictions.cpu().numpy())
        all_targets.append(batch_target.cpu().numpy())

predictions = np.concatenate(all_predictions, axis=0)
targets = np.concatenate(all_targets, axis=0)

# Inverse transform: undo the output scaling so errors are in original units
predictions_original = scaler_output.inverse_transform(predictions)
targets_original = scaler_output.inverse_transform(targets)

print(f"Predictions shape: {predictions_original.shape}")

In [None]:
# Calculate metrics on the un-scaled predictions.
# Overall figures pool every output column together.
mse = mean_squared_error(targets_original, predictions_original)
rmse = np.sqrt(mse)
mae = mean_absolute_error(targets_original, predictions_original)
r2 = r2_score(targets_original, predictions_original)

# Position error uses the first three output columns.
# NOTE(review): mean_squared_error averages over all elements of the (n, 3)
# slice, so this is a per-axis RMSE, not the RMSE of the Euclidean distance -
# confirm the EKF baseline quoted below uses the same definition.
true_position = targets_original[:, :3]
predicted_position = predictions_original[:, :3]
position_rmse = np.sqrt(mean_squared_error(true_position, predicted_position))

banner = "=" * 60
print(banner)
print("EVALUATION RESULTS")
print(banner)

print("\nOverall Metrics:")
print(f"  RMSE:  {rmse:.4f}")
print(f"  MAE:   {mae:.4f}")
print(f"  R²:    {r2:.4f}")

print(f"\nPosition RMSE: {position_rmse:.4f} m")
print("EKF Baseline:  0.62 m")
print("Target:        < 1.0 m")

verdict = "\n✓ TARGET ACHIEVED!" if position_rmse < 1.0 else "\n⚠ Need improvement (target < 1.0m)"
print(verdict)

In [None]:
# Predicted-vs-true scatter plot for each output, on a 3x4 grid
fig, axes = plt.subplots(3, 4, figsize=(20, 12))
axes = axes.flatten()

output_names = ['X', 'Y', 'Z', 'Vx', 'Vy', 'Vz', 'Roll', 'Pitch', 'Yaw', 'ωx', 'ωy', 'ωz']

# Only as many panels as there are target columns are drawn
for i, name in enumerate(output_names[:len(available_target_cols)]):
    ax = axes[i]
    y_true = targets_original[:, i]
    y_pred = predictions_original[:, i]

    ax.scatter(y_true, y_pred, alpha=0.5, s=10)

    # Dashed diagonal spanning both series: where perfect predictions would lie
    lo = min(y_true.min(), y_pred.min())
    hi = max(y_true.max(), y_pred.max())
    ax.plot([lo, hi], [lo, hi], 'r--', linewidth=2)

    ax.set_xlabel(f'True {name}')
    ax.set_ylabel(f'Predicted {name}')
    ax.set_title(f'{name}')
    ax.grid(True, alpha=0.3)

    # Per-output R² annotated in the top-left corner of the panel
    r2_i = r2_score(y_true, y_pred)
    ax.text(
        0.05, 0.95, f'R² = {r2_i:.3f}',
        transform=ax.transAxes,
        verticalalignment='top',
        bbox=dict(boxstyle='round', facecolor='white', alpha=0.8),
    )

plt.tight_layout()
plt.savefig(RESULTS_DIR / 'predictions.png', dpi=300)
plt.show()

## 8️⃣ Save Model and Results

In [None]:
# Persist the model weights and both fitted scalers
model_path = MODEL_DIR / 'ml_localization_model.pth'
scalers_path = MODEL_DIR / 'ml_scalers.pkl'

torch.save(model.state_dict(), model_path)

# Both scalers go into one file, keyed by role
scaler_bundle = {'input_scaler': scaler_input, 'output_scaler': scaler_output}
joblib.dump(scaler_bundle, scalers_path)

print(f"✓ Model saved to: {model_path}")
print(f"✓ Scalers saved to: {scalers_path}")

## 9️⃣ Summary and Next Steps

In [None]:
# Final summary: headline metrics from the evaluation cells plus follow-up ideas
banner = "=" * 60

summary_lines = (
    banner,
    "TRAINING COMPLETE!",
    banner,
    "\nFinal Results:",
    f"  Position RMSE:  {position_rmse:.4f} m",
    f"  R² Score:       {r2:.4f}",
    f"  Training time:  {total_time/60:.1f} min",
    "\n🚀 Next Steps:",
    "  1. Run: python scripts/compare_ekf_ml.py",
    "  2. Experiment with hyperparameters above",
    "  3. Try different model architectures",
    "  4. Analyze per-output performance",
)

for line in summary_lines:
    print(line)