In [1]:
#import models.ekf as ekf_support
from models.pibnn import PIBNN
from models.bnn import BNN
import helper.training_preprocess as tp
import helper.eval_worker as ew

import torch
import numpy as np
import matplotlib.pyplot as plt

## SENSOR ERROR EXPERIMENTS

In [2]:
columns_of_interest_dict = tp.export_preproc_var_columns()
x_columns_of_interest = columns_of_interest_dict['x_columns_of_interest']
y_columns_of_interest = columns_of_interest_dict['y_columns_of_interest']
z_obs_columns = columns_of_interest_dict['z_obs_columns']

In [3]:
def visualize_final_estimates_traj(in_traj_no, truth_traj_data, ekf_traj_data, bnn_traj_data, pinn_traj_data, threeD=False,
                                     smoothing=False):
    """
        TRUTH: RED
        EKF: YELLOW
        BNN: BLUE
        PINN: MAGENTA
    """
    def moving_average(data, window_size):
        return np.convolve(data, np.ones(window_size) / window_size, mode='valid')

    truth_data = truth_traj_data[in_traj_no]
    ekf_data = ekf_traj_data[in_traj_no]['state_est']
    ekf_uncer = ekf_traj_data[in_traj_no]['state_uncer']
    bnn_data = bnn_traj_data[in_traj_no]['state_est']
    bnn_uncer = bnn_traj_data[in_traj_no]['state_uncer']
    pinn_data = pinn_traj_data[in_traj_no]['state_est']
    pinn_uncer = pinn_traj_data[in_traj_no]['state_uncer']

    x_coords_truth = [point[0] for point in truth_data]
    y_coords_truth = [point[1] for point in truth_data]
    z_coords_truth = [point[2] for point in truth_data]

    x_coords_ekf = [point[0] for point in ekf_data]
    y_coords_ekf = [point[1] for point in ekf_data]
    z_coords_ekf = [point[2] for point in ekf_data]

    x_coords_bnn = [point[0] for point in bnn_data]
    y_coords_bnn = [point[1] for point in bnn_data]
    z_coords_bnn = [point[2] for point in bnn_data]

    x_coords_pinn = [point[0] for point in pinn_data]
    y_coords_pinn = [point[1] for point in pinn_data]
    z_coords_pinn = [point[2] for point in pinn_data]

    ## Error + Uncertainty Plot ##
    ekf_euclid_errs = np.array([np.linalg.norm(a - b) for a, b in zip(truth_data, ekf_data)])
    ekf_uncer_mean = np.array([np.sqrt(np.mean(vector)) for vector in ekf_uncer])
    ekf_lower_bound = ekf_euclid_errs - ekf_uncer_mean
    ekf_upper_bound = ekf_euclid_errs + ekf_uncer_mean

    bnn_euclid_errs = np.array([np.linalg.norm(a - b) for a, b in zip(truth_data, bnn_data)])
    bnn_uncer_mean = np.array([np.mean(vector.numpy()) for vector in bnn_uncer])
    bnn_lower_bound = bnn_euclid_errs - bnn_uncer_mean
    bnn_upper_bound = bnn_euclid_errs + bnn_uncer_mean

    pinn_euclid_errs = np.array([np.linalg.norm(a - b) for a, b in zip(truth_data, pinn_data)])
    pinn_uncer_mean = np.array([np.mean(vector) for vector in pinn_uncer])
    pinn_lower_bound = pinn_euclid_errs - pinn_uncer_mean
    pinn_upper_bound = pinn_euclid_errs + pinn_uncer_mean

    plt.figure(figsize=(10, 6))

    plt.plot(ekf_euclid_errs, label='EKF', color='y', linewidth=3)
    plt.fill_between(range(len(ekf_euclid_errs)), ekf_lower_bound, ekf_upper_bound, color='y', alpha=0.3,
                     label='EKF Uncertainty')

    plt.plot(moving_average(bnn_euclid_errs, 5), label='BNN', color='b', linewidth=3)
    plt.fill_between(range(len(bnn_euclid_errs)), bnn_lower_bound, bnn_upper_bound, color='b', alpha=0.3,
                     label='BNN Uncertainty')

    plt.plot(moving_average(pinn_euclid_errs, 5), label='PINN', color='m', linewidth=3)
    plt.fill_between(range(len(pinn_euclid_errs)), pinn_lower_bound, pinn_upper_bound, color='m', alpha=0.3,
                     label='PINN Uncertainty')

    plt.title('Model State Estimate Error and Uncertainty wrt Truth', fontsize=16)
    plt.xlabel('Time Step', fontsize=14)
    plt.ylabel('Error [m]', fontsize=14)
    plt.legend(loc='upper left')
    plt.show()
    ## Error + Uncertainty Plot ##

    ## Trajectory Generation Compare ##
    if smoothing:
        window_size = int(0.05 * len(x_coords_bnn))
        x_coords_bnn = moving_average(x_coords_bnn, window_size)
        y_coords_bnn = moving_average(y_coords_bnn, window_size)
        z_coords_bnn = moving_average(z_coords_bnn, window_size)

        x_coords_pinn = moving_average(x_coords_pinn, window_size)
        y_coords_pinn = moving_average(y_coords_pinn, window_size)
        z_coords_pinn = moving_average(z_coords_pinn, window_size)

    if threeD:
        fig = plt.figure()

        ax = fig.add_subplot(111, projection='3d')

        ax.scatter(x_coords_truth, y_coords_truth, z_coords_truth, c='r', marker='x', label="Truth")
        ax.scatter(x_coords_ekf, y_coords_ekf, z_coords_ekf, c='y', marker='^', label="EKF")
        ax.scatter(x_coords_bnn, y_coords_bnn, z_coords_bnn, c='b', marker='o', label="BNN")
        ax.scatter(x_coords_pinn, y_coords_pinn, z_coords_pinn, c='m', marker='*', label="PINN")

        ax.set_xlabel('X [m]')
        ax.set_ylabel('Y [m]')
        ax.set_zlabel('Z [m]')

        ax.set_title(f'Model Approximations of Truth Trajectory {in_traj_no}')
        ax.legend()
        plt.show()
    else:
        fig, axs = plt.subplots(3, 1, figsize=(10, 15))

        # X
        axs[0].plot(x_coords_truth, label='Truth', color='r', marker='x', linestyle='-', markersize=5)
        axs[0].plot(x_coords_ekf, label='EKF', color='y', marker='^', linestyle='-', markersize=5)
        axs[0].plot(x_coords_bnn, label='BNN', color='b', marker='o', linestyle='-', markersize=5)
        axs[0].plot(x_coords_pinn, label='PINN', color='m', marker='*', linestyle='-', markersize=5)
        axs[0].set_title(f'X Approximations of Sample Trajectory #{in_traj_no}')
        axs[0].set_xlabel('Time Step')
        axs[0].set_ylabel('X [m]')
        axs[0].legend()
        axs[0].grid(True)

        # Y
        axs[1].plot(y_coords_truth, label='Truth', color='r', marker='x', linestyle='-', markersize=5)
        axs[1].plot(y_coords_ekf, label='EKF', color='y', marker='^', linestyle='-', markersize=5)
        axs[1].plot(y_coords_bnn, label='BNN', color='b', marker='o', linestyle='-', markersize=5)
        axs[1].plot(y_coords_pinn, label='PINN', color='m', marker='*', linestyle='-', markersize=5)
        axs[1].set_title(f'Y Approximations of Sample Trajectory #{in_traj_no}')
        axs[1].set_xlabel('Time Step')
        axs[1].set_ylabel('Y [m]')
        axs[1].legend()
        axs[1].grid(True)

        # Z
        axs[2].plot(z_coords_truth, label='Truth', color='r', marker='x', linestyle='-', markersize=5)
        axs[2].plot(z_coords_ekf, label='EKF', color='y', marker='^', linestyle='-', markersize=5)
        axs[2].plot(z_coords_bnn, label='BNN', color='b', marker='o', linestyle='-', markersize=5)
        axs[2].plot(z_coords_pinn, label='PINN', color='m', marker='*', linestyle='-', markersize=5)
        axs[2].set_title(f'Z Approximations of Sample Trajectory #{in_traj_no}')
        axs[2].set_xlabel('Time Step')
        axs[2].set_ylabel('Z [m]')
        axs[2].legend()
        axs[2].grid(True)

        # Adjust layout
        plt.tight_layout()
        plt.show()
    ## Trajectory Generation Compare ##
    return None

In [4]:
def run_model_eval(bnn_filename, pibnn_filename, sensor_uncer_in=1, ce=False):
    print(f"RUNNING MODEL EVAL COMPARISON OF {sensor_uncer_in} M ERROR")
    loss_params = {'ml': 0.7, 'physics': 0.3}
    eval_stats = {}

    train_df, test_df, train_traj_data, test_traj_data = tp.readin_dataframes(train_split=0.9, sensor_pos_uncer=sensor_uncer_in)
    X_test, y_test, df_cleaned_test = tp.preprocess_and_remove_inter_trajectory_indices(test_df)
    z_obs_test = torch.tensor(df_cleaned_test[z_obs_columns][1:].to_numpy(), dtype=torch.float32)
    
    bnn_model = BNN(input_size=len(x_columns_of_interest), output_size=len(y_columns_of_interest), hidden_layer_size=64, prior_sigma_lay=0.01, epochs=10000, show_info=True)
    bnn_model.load_model_weights(f'../../models/saved_weights/bnn/{bnn_filename}')

    pinn_model = PIBNN(input_size=8, output_size=len(y_columns_of_interest), hidden_layer_size=64, prior_sigma_lay=0.01, sensor_measurement_uncertainty=sensor_uncer_in, lambda_params=loss_params, epochs=10000, bias_in=True)
    pinn_model.load_model_weights(f'../../models/saved_weights/pibnn/{pibnn_filename}')
    
    truth_traj, ekf_proposed_traj, avg_ekf_err, avg_ekf_uncertainty, avg_meas_err, avg_meas_uncertainty = ekf_support.run_EKF_traj_data(test_df)
    eval_stats['EKF_ERR'] = avg_ekf_err
    eval_stats['EKF_UNCER'] = avg_ekf_uncertainty
    
    avg_bnn_model_err, avg_bnn_model_uncertainty, bnn_model_proposed_traj = ew.evaluate_bayesian_neural_model(bnn_model, df_cleaned_test, X_test, y_test, z_obs_test, pinn_eval=False, apply_EKF_in=False, continuous_eval=ce)
    eval_stats['BNN_ERR'] = avg_bnn_model_err
    eval_stats['BNN_UNCER'] = avg_bnn_model_uncertainty
    
    avg_pinn_model_err, avg_pinn_model_uncertainty, pinn_model_proposed_traj = ew.evaluate_bayesian_neural_model(pinn_model, df_cleaned_test, X_test, y_test, z_obs_test, pinn_eval=True, apply_EKF_in=True, continuous_eval=ce)
    eval_stats['PIBNN_ERR'] = avg_pinn_model_err
    eval_stats['PIBNN_UNCER'] = avg_pinn_model_uncertainty
    
    return truth_traj, ekf_proposed_traj, bnn_model_proposed_traj, pinn_model_proposed_traj, eval_stats

In [5]:
def process_stats(stats_in):
    print(f"EKF ERR: {np.mean(stats_in['EKF_ERR'])}, EKF UNCER: {np.mean(stats_in['EKF_UNCER'])}")
    print(f"BNN ERR: {np.mean(stats_in['BNN_ERR'])}, BNN UNCER: {np.mean(stats_in['BNN_UNCER'])}")
    print(f"EKPIBNNF ERR: {np.mean(stats_in['PIBNN_ERR'])}, EKPIBNNF UNCER: {np.mean(stats_in['PIBNN_UNCER'])}")

In [6]:
test_traj_no = 20

### Sigma = 1 Meters

In [7]:
bnn_weights = "10k_1MSU_64H_bnn_model_2024-12-15_20-57-12.pth"
pibnn_weights = "10k_1MSU_64H_pibnn_model_2024-12-15_22-41-36.pth"

In [8]:
%%time
one_m_truth_traj, one_m_ekf_proposed_traj, one_m_bnn_model_proposed_traj, one_m_pinn_model_proposed_traj, one_m_eval_stats = run_model_eval(bnn_weights, pibnn_weights, sensor_uncer_in=1)

RUNNING MODEL EVAL COMPARISON OF 1 M ERROR
Number of Train Trajectories: 4584
Number of Test Trajectories: 509
Model weights loaded from ../../models/saved_weights/bnn/10k_1MSU_64H_bnn_model_2024-12-15_20-57-12.pth
Model weights loaded from ../../models/saved_weights/pibnn/10k_1MSU_64H_pibnn_model_2024-12-15_22-41-36.pth


  self.load_state_dict(torch.load(filepath))
  self.load_state_dict(torch.load(filepath))


NameError: name 'ekf_support' is not defined

In [9]:
visualize_final_estimates_traj(test_traj_no, one_m_truth_traj, one_m_ekf_proposed_traj, one_m_bnn_model_proposed_traj, one_m_pinn_model_proposed_traj, threeD=False, smoothing=False)

NameError: name 'one_m_truth_traj' is not defined

In [None]:
process_stats(one_m_eval_stats)

### Sigma = 5 Meters

In [None]:
bnn_weights = "10k_5MSU_64H_bnn_model_2024-12-16_16-23-22.pth"
pibnn_weights = "10k_5MSU_64H_pibnn_model_2024-12-16_00-51-53.pth"

In [None]:
five_m_truth_traj, five_m_ekf_proposed_traj, five_m_bnn_model_proposed_traj, five_m_pinn_model_proposed_traj, five_m_eval_stats = run_model_eval(bnn_weights, pibnn_weights, sensor_uncer_in=5, ce=True)

In [None]:
visualize_final_estimates_traj(test_traj_no, five_m_truth_traj, five_m_ekf_proposed_traj, five_m_bnn_model_proposed_traj, five_m_pinn_model_proposed_traj, threeD=False, smoothing=True)

In [None]:
process_stats(five_m_eval_stats)

### Sigma = 10 Meters

In [None]:
bnn_weights = "10k_10MSU_64H_bnn_model_2024-12-16_13-06-59.pth"
pibnn_weights = "10k_10MSU_64H_pibnn_model_2024-12-15_17-24-44.pth"

In [None]:
ten_m_truth_traj, ten_m_ekf_proposed_traj, ten_m_bnn_model_proposed_traj, ten_m_pinn_model_proposed_traj, ten_m_eval_stats = run_model_eval(bnn_weights, pibnn_weights, sensor_uncer_in=10, ce=True)

In [None]:
visualize_final_estimates_traj(test_traj_no, ten_m_truth_traj, ten_m_ekf_proposed_traj, ten_m_bnn_model_proposed_traj, ten_m_pinn_model_proposed_traj, threeD=False, smoothing=True)

In [None]:
process_stats(ten_m_eval_stats)

### Bar Graph

In [None]:
def produce_bar_graph_comp(uncer=False):
    if uncer:
        labels = ['EKF_UNCER', 'BNN_UNCER', 'PIBNN_UNCER']
    else:
        labels = ['EKF_ERR', 'BNN_ERR', 'PIBNN_ERR']
        
    one_m_vals = [np.mean(one_m_eval_stats[key]) for key in labels]
    five_m_vals = [np.mean(five_m_eval_stats[key]) for key in labels]
    ten_m_vals = [np.mean(ten_m_eval_stats[key]) for key in labels]
    x = np.arange(len(labels))
    width = 0.25
    
    fig, ax = plt.subplots(figsize=(8, 6))
    bars1 = ax.bar(x - width, one_m_vals, width, label="data_noise_sigma=1m", color='red')
    bars2 = ax.bar(x, five_m_vals, width, label="data_noise_sigma=5m", color='green')
    bars3 = ax.bar(x + width, ten_m_vals, width, label="data_noise_sigma=10 m", color='blue')
    
    if uncer:
        ax.set_xlabel('Models', fontsize=14)
        ax.set_ylabel('Variance [m^2]', fontsize=14)
        ax.set_title('Average State Estimate Uncertainty Across Different Models', fontsize=16)
    else:
        ax.set_xlabel('Models', fontsize=14)
        ax.set_ylabel('Error [m]', fontsize=14)
        ax.set_title('Average State Estimate Error Across Different Models', fontsize=16)
        
    ax.set_xticks(x)
    labels_edit = ['EKF', 'BNN', 'EKPIBNNF']
    ax.set_xticklabels(labels_edit)
    ax.legend()
    plt.show()
    return None

In [None]:
produce_bar_graph_comp(uncer=False)

In [None]:
produce_bar_graph_comp(uncer=True)