In [None]:
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
from scipy.stats import norm

# Read the CSV file into a DataFrame
# data = pd.read_csv('eval_data_2024-04-12-14-35-21_s50.csv')  # Brattorkaia, works best!
# data = pd.read_csv('eval_data_2024-04-25-13-43-12.bag -s 60.csv')  # Bridge run 1, successful! ish 8m error, messy path
# data = pd.read_csv('eval_data_2024-04-05-13-16-20.bag_low_tide.csv')  # +3s brattora, low tide, 5m mean error
# data = pd.read_csv('eval_data_2024-04-12-14-35-21.bag -s 60 mid_tide.csv')  # +3s brattora, mid tide, 5m mean error
# data = pd.read_csv('eval_data_2024-04-12-15-04-39.bag -s 30 high tide.csv')  # +3s brattora, high tide, 5-6m mean error
# data = pd.read_csv('eval_data_2024-04-12-14-26-20.bag -s 30 mid tide.csv')  # +3s brattora, mid tide, 4-5m mean error
data = pd.read_csv('eval_data.csv')  # latest log

# adjust timeframe to start at 0
data['timestamp'] = data['timestamp'] - data['timestamp'].min()

# set time limits
start_time = 0
end_time = data['timestamp'].max() # - 20

# filter rows with nans in transformation_history_x/y and gps/local_fix_x/y
data = data.dropna(subset=['transformation_history_x', 'transformation_history_y', 'gps/local_fix_x', 'gps/local_fix_y'])

# print nans
print(data.isnull().sum())

# filter data
data = data[(data['timestamp'] >= start_time) & (data['timestamp'] <= end_time)]


**Position and error plots**

In [None]:
# Plot the local fix and path points
fig, ax = plt.subplots(3, 1, figsize=(12, 12))

# First subplot for the paths in xy plane
ax[0].scatter(data['gps/local_fix_x'].to_numpy(), data['gps/local_fix_y'].to_numpy(), label='GPS Path', color='lightblue', s = 5)
ax[0].scatter(data['path_points_x'].to_numpy(), data['path_points_y'].to_numpy(), label='PCMF path points', color='orange', s = 5)
ax[0].set_xlabel('X Position')
ax[0].set_ylabel('Y Position')
ax[0].set_title('Paths in XY plane')
ax[0].legend()
ax[0].grid(True)
# equal aspect ratio and scaling
ax[0].set_aspect('equal', 'box')

# Draw error lines between the points
# for i in range(len(data)):
#     ax[0].plot([data['gps/local_fix_x'].iloc[i], data['path_points_x'].iloc[i]], 
#                [data['gps/local_fix_y'].iloc[i], data['path_points_y'].iloc[i]], 
#                'lightgrey', linestyle='dotted')

# Second subplot for error over time
errors = np.sqrt((data['gps/local_fix_x'] - data['path_points_x'])**2 + (data['gps/local_fix_y'] - data['path_points_y'])**2)
ax[1].plot(data['timestamp'].to_numpy(), errors.to_numpy(), label='Error Over Time', color='red')
ax[1].set_xlabel('Timestamp')
ax[1].set_ylabel('Error')
ax[1].set_title('Error Over Time')
ax[1].grid(True)
ax[1].legend()

# Third subplot for boxplot of errors
ax[2].boxplot(errors, vert=False, patch_artist=True)
ax[2].set_title('Boxplot of Errors')
ax[2].set_xlabel('Error')
ax[2].grid(True)

# Adjust layout to prevent overlap
plt.tight_layout(h_pad=5.0, pad=3.0, w_pad=3.0)

# Show the plot
plt.show()

In [None]:
import matplotlib.pyplot as plt
from mpl_toolkits.axes_grid1.inset_locator import inset_axes
import numpy as np
from matplotlib.patches import Ellipse
from filterpy.kalman import UnscentedKalmanFilter, MerweScaledSigmaPoints

def plot_covariance_ellipse(ax, mean, cov, n_std=2, **kwargs):
    """ Plots an ellipse representing the covariance matrix """
    eigvals, eigvecs = np.linalg.eigh(cov[:2, :2])  # extract covariance of x and y
    order = eigvals.argsort()[::-1]
    eigvals, eigvecs = eigvals[order], eigvecs[:, order]
    theta = np.degrees(np.arctan2(*eigvecs[:, 0][::-1]))
    w, h = 2 * n_std * np.sqrt(eigvals)
    ellipse = Ellipse(xy=(mean[0], mean[2]), width=(w+h)/2, height=(w+h)/2, angle=theta, **kwargs)
    ellipse.set_label('Covariance')
    ax.add_patch(ellipse)

def fx(x, dt):
    """ State transition function for a constant velocity model """
    F = np.array([[1, dt, 0,  0], 
                  [0,  1, 0,  0], 
                  [0,  0, 1, dt],
                  [0,  0, 0,  1]])
    return np.dot(F, x)

def hx(x):
    """ Observation function - here we're only observing positions """
    return np.array([x[0], x[2]])

def UKF(data):

    # UKF Initialization
    dt = 1  # time step
    points = MerweScaledSigmaPoints(4, alpha=0.1, beta=2., kappa=-1)
    ukf = UnscentedKalmanFilter(dim_x=4, dim_z=2, dt=dt, fx=fx, hx=hx, points=points)
    process_noise = 0.01
    ukf.Q = np.diag([process_noise, process_noise, process_noise, process_noise])  # process noise
    ukf.R = np.diag([100, 100])                # initial measurement noise
    ukf.x = np.array([data['path_points_x'].iloc[0], 0., data['path_points_y'].iloc[0], 0.])  # initial state
    ukf.P = np.eye(4) * 100                # initial uncertainty

    # Extract path points
    path_x = data['path_points_x'].values
    path_y = data['path_points_y'].values

    # Apply UKF and store results, innovation
    ukf_results = []
    covariances = []

    for px, py in zip(path_x, path_y):
        ukf.predict()
        z = np.array([px, py])
        z_pred = ukf.hx(ukf.x)
        innovation = z - z_pred
        if np.linalg.norm(innovation) > 30:  # Threshold for detecting large jumps
            ukf.R = np.diag([100000, 100000])  # Increase measurement noise
        else:
            ukf.R = np.diag([100, 100])  # Normal measurement noise
        
        ukf.update(z)
        ukf_results.append(ukf.x.copy())
        covariances.append(ukf.P.copy())



    ukf_results = np.array(ukf_results)
    covariances = np.array(covariances)

    return ukf_results, covariances

# Load map data
map_file_path = r"C:\Users\eirik\OneDrive - NTNU\Semester 10\code\lidar_ws\src\pcmf\maps\all_points.npy" # bridge map data
# map_file_path = "/home/eirik/lidar_ws/src/pcmf/maps/bay_lines_v3.npy" # brattorkaia map data
map_data = np.load(map_file_path)

# shift map data 200m left
map_data[:, 0] = map_data[:, 0] # - 200

# Apply UKF
ukf_results, covariances = UKF(data)

# plot parameters
dot_size = 9
map_dot_size = 1
y_window_shift = 70
gps_color = 'hotpink'
# pcmf_color = 'lightblue'
pcmf_color = 'lightskyblue'
# ukf_color = 'steelblue'
ukf_color = 'navy'
map_color = 'orange'

# Plotting
fig, ax = plt.subplots(figsize=(10, 10))
ax.scatter(data['path_points_x'], data['path_points_y'], label='PCMF Path Points', color=pcmf_color, s=dot_size)
ax.plot(ukf_results[:, 0], ukf_results[:, 2], color=ukf_color, label="UKF Filtered Path Points")
ax.scatter(data['gps/local_fix_x'], data['gps/local_fix_y'], color=gps_color, label='GPS Path', s=dot_size)
ax.scatter(map_data[:, 0], map_data[:, 1], color=map_color, s=map_dot_size, label='Reference Map')
ax.set_xlabel("East [m]")
ax.set_ylabel("North [m]")
ax.set_aspect('equal')
ax.grid(True, linestyle='--' )
ax.axis('equal')
ax.set_ylim(ax.get_ylim()[0] - y_window_shift, ax.get_ylim()[1] - y_window_shift)

# Add inset of zoomed area at the determined location
axins = inset_axes(ax, width="50%", height="50%", loc='lower right')
axins.scatter(data['path_points_x'], data['path_points_y'], label='PCMF Path Points', color=pcmf_color, s=dot_size)
axins.plot(ukf_results[:, 0], ukf_results[:, 2], color=ukf_color, label="UKF Filtered Path Points")
axins.scatter(data['gps/local_fix_x'], data['gps/local_fix_y'], color=gps_color, label='GPS Path', s=dot_size)
padding = 5
x1, x2 = ukf_results[:, 0].min() - padding, ukf_results[:, 0].max() + padding
y1, y2 = ukf_results[:, 2].min() - padding, ukf_results[:, 2].max() + padding
axins.set_aspect('equal', adjustable='box')
axins.set_xlim(x1, x2)
axins.set_ylim(y1, y2)

# Plot covariance ellipses every N=10 steps
for i in range(0, len(ukf_results), 5):
    plot_covariance_ellipse(axins, ukf_results[i], covariances[i], edgecolor='grey', facecolor='none', linestyle='--', alpha=0.5)
# for i in range(0, len(ukf_results), 10000):
#     plot_covariance_ellipse(ax, ukf_results[i], covariances[i], edgecolor='grey', facecolor='none', linestyle='--', alpha=0.5)


ax.legend()
plt.show()



**Error Evaluation**

In [None]:
from sklearn.metrics import mean_squared_error
import matplotlib.pyplot as plt
import numpy as np

# Calculate the individual X and Y errors as well as Euclidean for GPS vs PCMF and GPS vs UKF
error_x_gps_pcmf = np.abs(data['gps/local_fix_x'] - data['path_points_x'])
error_y_gps_pcmf = np.abs(data['gps/local_fix_y'] - data['path_points_y'])
euclidean_gps_pcmf = np.sqrt(error_x_gps_pcmf**2 + error_y_gps_pcmf**2)

error_x_gps_ukf = np.abs(data['gps/local_fix_x'] - ukf_results[:, 0])
error_y_gps_ukf = np.abs(data['gps/local_fix_y'] - ukf_results[:, 2])
euclidean_gps_ukf = np.sqrt(error_x_gps_ukf**2 + error_y_gps_ukf**2)

# Define function to calculate the IQR range for zoom limits
def iqr_limits(data):
    q1 = np.percentile(data, 25)
    q3 = np.percentile(data, 75)
    iqr = q3 - q1
    return q1 - 1.5 * iqr, q3 + 1.5 * iqr

# Plotting the RMSE boxplots with IQR zoom
fig, axs = plt.subplots(2, 2, figsize=(12, 12))

# Full range plots
axs[0, 0].boxplot([error_x_gps_pcmf, error_y_gps_pcmf, euclidean_gps_pcmf], labels=['Error X', 'Error Y', 'Euclidean Error'])
axs[0, 0].set_title('Full Range GPS vs PCMF Points')
axs[0, 1].boxplot([error_x_gps_ukf, error_y_gps_ukf, euclidean_gps_ukf], labels=['Error X', 'Error Y', 'Euclidean Error'])
axs[0, 1].set_title('Full Range GPS vs UKF PCMF Points')
#set colors for the boxes and whiskers and caps according to the color of the points

# IQR zoomed plots
lower_pcmf, upper_pcmf = iqr_limits(euclidean_gps_pcmf)
lower_ukf, upper_ukf = iqr_limits(euclidean_gps_ukf)

axs[1, 0].boxplot([error_x_gps_pcmf, error_y_gps_pcmf, euclidean_gps_pcmf], labels=['Error X', 'Error Y', 'Euclidean Error'])
axs[1, 0].set_ylim(-1, upper_pcmf + 1)
axs[1, 0].set_title('IQR-range GPS vs PCMF Points')

axs[1, 1].boxplot([error_x_gps_ukf, error_y_gps_ukf, euclidean_gps_ukf], labels=['Error X', 'Error Y', 'Euclidean Error'])
axs[1, 1].set_ylim(-1, upper_ukf + 1)
axs[1, 1].set_title('IQR-range GPS vs UKF PCMF Points')

for ax in axs.flat:
    ax.set_ylabel('Error (meters)')

# plt.suptitle('Error Boxplots for GPS vs PCMF and GPS vs UKF PCMF Points')
# Adjust layout to prevent overlapping title
plt.tight_layout(rect=[0, 0, 1, 0.97])
plt.show()

# Calculate RMSE for X and Y directions and combined Euclidean distance for GPS vs PCMF and GPS vs UKF
rmse_x_pcmf = np.sqrt(mean_squared_error(data['gps/local_fix_x'], data['path_points_x']))
rmse_y_pcmf = np.sqrt(mean_squared_error(data['gps/local_fix_y'], data['path_points_y']))
rmse_euclidean_pcmf = np.sqrt(mean_squared_error(data['gps/local_fix_x'], data['path_points_x']) + mean_squared_error(data['gps/local_fix_y'], data['path_points_y']))

rmse_x_ukf = np.sqrt(mean_squared_error(data['gps/local_fix_x'], ukf_results[:, 0]))
rmse_y_ukf = np.sqrt(mean_squared_error(data['gps/local_fix_y'], ukf_results[:, 2]))
rmse_euclidean_ukf = np.sqrt(mean_squared_error(data['gps/local_fix_x'], ukf_results[:, 0]) + mean_squared_error(data['gps/local_fix_y'], ukf_results[:, 2]))

print(f"RMSE X GPS vs PCMF: {rmse_x_pcmf:.2f} meters")
print(f"RMSE Y GPS vs PCMF: {rmse_y_pcmf:.2f} meters")
print(f"RMSE Euclidean GPS vs PCMF: {rmse_euclidean_pcmf:.2f} meters\n")

print(f"RMSE X GPS vs UKF: {rmse_x_ukf:.2f} meters")
print(f"RMSE Y GPS vs UKF: {rmse_y_ukf:.2f} meters")
print(f"RMSE Euclidean GPS vs UKF: {rmse_euclidean_ukf:.2f} meters")



In [None]:
gps_path = np.array([data['gps/local_fix_x'], data['gps/local_fix_y']]).T
pcmf_path = np.array([data['path_points_x'], data['path_points_y']]).T
ukf_path = np.array([ukf_results[:, 0], ukf_results[:, 2]]).T
camera_path = np.array([data['camera_trace_x'], data['camera_trace_y']]).T

# shift gps points 200 m left for a while
# gps_path[100:120, 0] = gps_path[100:120, 0] - 50 # brattora test 1
# gps_path[50:60, 0] = gps_path[50:60, 0] - 50 # eval_data

# scatter those points randomly within 10 meters in x and y
# gps_path[100:120, 0] = gps_path[100:120, 0] + np.random.uniform(-10, 10, 20)
# gps_path[100:120, 1] = gps_path[100:120, 1] + np.random.uniform(-10, 10, 20)

def state_transition(x, dt):
    F = np.array([[1, 0, dt, 0],
                  [0, 1, 0, dt],
                  [0, 0, 1, 0],
                  [0, 0, 0, 1]])
    return np.dot(F, x)

def measurement_function(x):
    return x[:2]  # Only position is measured

def innovation_check(innovation, threshold):
    return np.linalg.norm(innovation) > threshold

def fuse_paths(gps_path, kf_path, threshold=30):
    dt = 1  # Time step
    points = MerweScaledSigmaPoints(n=4, alpha=0.1, beta=2., kappa=0)
    ukf = UnscentedKalmanFilter(dim_x=4, dim_z=2, dt=dt, fx=state_transition, hx=measurement_function, points=points)
    ukf.Q = np.eye(4) * 0.1  # Process noise
    ukf.P = np.eye(4) * 10   # Initial state covariance
    ukf.x = np.array([gps_path[0, 0], gps_path[0, 1], 0, 0])  # Initial state from first GPS point

    fused_path = []

    for i, (gps, kf) in enumerate(zip(gps_path, kf_path)):
        # Predict state
        ukf.predict()

        # Update state with GPS measurement
        innovation_gps = gps - ukf.hx(ukf.x)
        if innovation_check(innovation_gps, threshold):
            # If innovation is large, increase the measurement noise covariance
            ukf.R = np.eye(2) * 10000 # Large measurement noise
        else:
            ukf.R = np.eye(2) * 1 # Normal measurement noise
        ukf.update(gps)

        # Predict state again before another measurement update
        ukf.predict()
        
        # Update state with KF measurement
        innovation_kf = kf - ukf.hx(ukf.x)
        if innovation_check(innovation_kf, threshold):
            ukf.R = np.eye(2) * 10000
        else:
            ukf.R = np.eye(2) * 10
        ukf.update(kf)

        fused_path.append(ukf.x[:2].copy())

    return np.array(fused_path)

fused_path = fuse_paths(gps_path, ukf_path)

# Plotting the fused path
fig, ax = plt.subplots(figsize=(10, 10))
# ax.scatter(gps_path[:, 0], gps_path[:, 1], label='GPS Path', color=gps_color, s=5)
ax.plot(gps_path[:, 0], gps_path[:, 1], color=gps_color, label='GPS Path', linewidth=2)
ax.scatter(pcmf_path[:, 0], pcmf_path[:, 1], label='PCMF Path Points', color=pcmf_color, s=10)
ax.plot(ukf_results[:, 0], ukf_results[:, 2], color=ukf_color, label="UKF Filtered Path Points")
ax.plot(fused_path[:, 0], fused_path[:, 1], color='limegreen', label="Fused Path", linewidth=2)
# ax.plot(camera_path[:, 0], camera_path[:, 1], color='red', label="Camera Path", linewidth=2)
# ax.scatter(map_data[:, 0], map_data[:, 1], color='orange', s=map_dot_size, label='Reference Map')
ax.set_xlabel("East [m]")
ax.set_ylabel("North [m]")
ax.set_aspect('equal')
ax.grid(True, linestyle='--' )
ax.set_xlim(gps_path[:, 0].min() - 10, gps_path[:, 0].max() + 10)
ax.set_ylim(gps_path[:, 1].min() - 10, gps_path[:, 1].max() + 10)
# ax.axis('equal')
ax.legend()
plt.show()

# print("elapsed time: ", data['timestamp'].iloc[-1], " seconds")

**Comparison Plots**

In [None]:
import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.feature_selection import mutual_info_regression

def comparison_plots(directory):
    # List all CSV files in the specified directory
    csv_files = [f for f in os.listdir(directory) if f.endswith('.csv')]

    # boxplot errors for each file in the same plot
    fig, ax = plt.subplots(figsize=(12, 12))
    for idx, file in enumerate(csv_files):
        file_path = os.path.join(directory, file)
        temp_data = pd.read_csv(file_path)
        x_data = temp_data[['gps/local_fix_x', 'path_points_x']].dropna()
        y_data = temp_data[['gps/local_fix_y', 'path_points_y']].dropna()
        errors = np.sqrt((x_data['gps/local_fix_x'] - x_data['path_points_x'])**2 + (y_data['gps/local_fix_y'] - y_data['path_points_y'])**2)
        lower, upper = iqr_limits(errors)
        ax.boxplot(errors, patch_artist=True, positions=[idx], widths=0.6, boxprops=dict(facecolor='none'))
        # ax.set_xticks(range(len(csv_files)))
        ax.set_xticklabels("Test " + str(i) for i in range(1, len(csv_files) + 1))
        # ax.set_xticklabels((str(i) for i in csv_files), rotation=45, ha='right')
        # ax.set_title('Euclidean Errors for PCMF Path of All Tests')
        # ax.set_ylim(-5, 110)
        ax.set_ylabel('Euclidean Error [m]')
        # jitter = np.random.uniform(-0.3, 0.3, len(errors))
        # ax.scatter(np.full(len(errors), idx) + jitter, errors, alpha=0.5)
    plt.suptitle('a)', fontsize=20, x=0.5, y=0.05)
    plt.tight_layout(rect=[0, 0.05, 1, 1])
    plt.show()
    
    # run and boxplot UKF errors for each file
    fig, ax = plt.subplots(figsize=(12, 12))

    for idx, file in enumerate(csv_files):
        file_path = os.path.join(directory, file)
        temp_data = pd.read_csv(file_path)
        ukf_results, covariances = UKF(temp_data)
        errors = np.sqrt((temp_data['gps/local_fix_x'] - ukf_results[:, 0])**2 + (temp_data['gps/local_fix_y'] - ukf_results[:, 2])**2)
        lower, upper = iqr_limits(errors)
        ax.boxplot(errors, patch_artist=True, positions=[idx], widths=0.6, boxprops=dict(facecolor='none'))
        ax.set_xticks(range(len(csv_files)))
        ax.set_xticklabels(("Test " + str(i) for i in range(1, len(csv_files) + 1)))
        ax.set_title('Euclidean Errors for UKF Path of All Tests')
        ax.set_ylabel('Euclidean Error [m]')
        # jitter = np.random.uniform(-0.3, 0.3, len(errors))
        # ax.scatter(np.full(len(errors), idx) + jitter, errors, alpha=0.5)

    plt.suptitle('b)', fontsize=20, x=0.5, y=0.05)
    plt.tight_layout(rect=[0, 0.05, 1, 1])
    plt.show()

    # bar plot of RMSE for each test for PCMF and UKF
    fig, ax = plt.subplots(figsize=(8, 4))
    rmse_pcmf = []
    rmse_ukf = []

    for idx, file in enumerate(csv_files):
        file_path = os.path.join(directory, file)
        temp_data = pd.read_csv(file_path)
        ukf_results, covariances = UKF(temp_data)
        errors_pcmf = np.sqrt((temp_data['gps/local_fix_x'] - temp_data['path_points_x'])**2 + (temp_data['gps/local_fix_y'] - temp_data['path_points_y'])**2)
        errors_ukf = np.sqrt((temp_data['gps/local_fix_x'] - ukf_results[:, 0])**2 + (temp_data['gps/local_fix_y'] - ukf_results[:, 2])**2)
        rmse_pcmf.append(np.sqrt(np.mean(errors_pcmf**2)))
        rmse_ukf.append(np.sqrt(np.mean(errors_ukf**2)))
        # print RMSE for each test
        print(f"Test {idx + 1} \n  RMSE PCMF: {rmse_pcmf[-1]:.2f} meters")
        print(f"  RMSE UKF: {rmse_ukf[-1]:.2f} meters")


    ax.bar(np.arange(len(csv_files)) - 0.2, rmse_pcmf, width=0.4, label='PCMF Path', color=pcmf_color)
    ax.bar(np.arange(len(csv_files)) + 0.2, rmse_ukf, width=0.4, label='UKF Path', color=ukf_color)
    ax.set_xticks(range(len(csv_files)))
    ax.set_xticklabels(("Test " + str(i) for i in range(1, len(csv_files) + 1)))
    # ax.set_title('RMSE for PCMF and UKF Paths of All Tests')
    ax.set_ylabel('Euclidean RMSE [m]')
    ax.legend()
    plt.show()

# Specify the directory containing the CSV files, aka this directory
directory = '.'
comparison_plots(directory)


**Sensor Fusion with Fiducial Markers**

In [None]:
# reload to avoid accumulating data
gps_path = np.array([data['gps/local_fix_x'], data['gps/local_fix_y']]).T
pcmf_path = np.array([data['path_points_x'], data['path_points_y']]).T
ukf_path = np.array([ukf_results[:, 0], ukf_results[:, 2]]).T
camera_path = np.array([data['camera_trace_x'], data['camera_trace_y']]).T

# State transition and measurement functions as previously defined
def state_transition(x, dt):
    F = np.array([[1, 0, dt, 0],
                  [0, 1, 0, dt],
                  [0, 0, 1, 0],
                  [0, 0, 0, 1]])
    return np.dot(F, x)

def measurement_function(x):
    return x[:2]

def innovation_check(innovation, threshold):
    return np.linalg.norm(innovation) > threshold

def fuse_paths(gps_path, pcmf_path, camera_path, threshold=30):
    dt = 1  # Time step
    points = MerweScaledSigmaPoints(n=4, alpha=0.1, beta=2., kappa=0)
    ukf = UnscentedKalmanFilter(dim_x=4, dim_z=2, dt=dt, fx=state_transition, hx=measurement_function, points=points)
    ukf.Q = np.eye(4) * 0.1  # Process noise 
    ukf.P = np.eye(4) * 10   # Initial state covariance
    ukf.x = np.array([gps_path[0, 0], gps_path[0, 1], 0, 0])  # Initial state from first GPS point

    fused_path = []

    for i, (gps, kf, cam) in enumerate(zip(gps_path, pcmf_path, camera_path)):
        ukf.predict()  # Predict state

        # Update state with GPS measurement if not NaN
        if not np.isnan(gps).any():
            innovation_gps = gps - ukf.hx(ukf.x)
            if innovation_check(innovation_gps, threshold):
                ukf.R = np.eye(2) * 10000  # Large measurement noise
            else:
                ukf.R = np.eye(2) * 1  # Normal measurement noise
            ukf.update(gps)

        # Update state with KF measurement if not NaN
        if not np.isnan(kf).any():
            ukf.predict()  # Predict state again before another measurement update
            innovation_kf = kf - ukf.hx(ukf.x)
            if innovation_check(innovation_kf, threshold):
                ukf.R = np.eye(2) * 10000
            else:
                ukf.R = np.eye(2) * 10
            ukf.update(kf)

        # Update state with camera measurement if not NaN
        if not np.isnan(cam).any():
            ukf.predict()  # Predict state again before another measurement update
            innovation_cam = cam - ukf.hx(ukf.x)
            if innovation_check(innovation_cam, threshold):
                ukf.R = np.eye(2) * 10000
            else:
                ukf.R = np.eye(2) * 5  # Slightly more trust than KF
            ukf.update(cam)

        fused_path.append(ukf.x[:2].copy())

    return np.array(fused_path)

# jamming the gps path for testing
# gps_path[50:70, 0] = gps_path[50:70, 0] - 50 # eval_data

# remove all ukf_results points after 50 for testing
# ukf_results = ukf_results[:50]

fused_path_cam = fuse_paths(gps_path, pcmf_path, camera_path)

# Plotting the fused path
fig, ax = plt.subplots(figsize=(10, 10))
# ax.scatter(gps_path[:, 0], gps_path[:, 1], label='GPS Path', color=gps_color, s=5)
ax.plot(gps_path[:, 0], gps_path[:, 1], color=gps_color, label='GPS Path', linewidth=2)
ax.scatter(pcmf_path[:, 0], pcmf_path[:, 1], label='PCMF Path Points', color=pcmf_color, s=10)
# scatter plot camera path in red with black border
ax.scatter(camera_path[:, 0], camera_path[:, 1], label='Camera Path Points', color='white', s=15, edgecolors='black')

ax.plot(ukf_results[:, 0], ukf_results[:, 2], color=ukf_color, label="UKF Filtered Path Points")
ax.plot(fused_path_cam[:, 0], fused_path_cam[:, 1], color='lime', label="Fused Path with Camera", linewidth=2)
ax.plot(map_data[:, 0], map_data[:, 1], color=map_color, label='Reference Map')
ax.set_xlabel("East [m]")
ax.set_ylabel("North [m]")
ax.set_aspect('equal')
ax.grid(True, linestyle='--' )
ax.set_xlim(gps_path[:, 0].min() - 10, gps_path[:, 0].max() + 10)
ax.set_ylim(gps_path[:, 1].min() - 10, gps_path[:, 1].max() + 10)
# ax.set_xlim(20,50)
# ax.set_ylim(-180, -160)
# ax.axis('equal')
ax.legend()
plt.show()



In [None]:
# # box plot of errors, full range and zoomed in to IQR above each other, using orange color
# # find iqr range
# q1 = np.percentile(errors, 25)
# q3 = np.percentile(errors, 75)

# box_colors = ['orange']

# fig, ax = plt.subplots(2, 1, figsize=(6,4)) 
# ax[0].boxplot(errors, vert=False, patch_artist=True,
#                 boxprops=dict(facecolor=box_colors[0], alpha=0.7),
#                 medianprops=dict(color="black"),
#                 whiskerprops=dict(color=box_colors[0]),
#                 # capprops=dict(color=box_colors[0]),
#                 flierprops=dict(markeredgecolor=box_colors[0], marker='o'))
# ax[0].set_title('Euclidean Error GNSS vs PCMF path')
# ax[0].set_xlabel('Error')
# # ax[0].grid(True)

# ax[1].boxplot(errors, vert=False, patch_artist=True,
#                 boxprops=dict(facecolor=box_colors[0], alpha=0.7),
#                 medianprops=dict(color="black"),
#                 whiskerprops=dict(color=box_colors[0]),
#                 # capprops=dict(color=box_colors[0]),
#                 flierprops=dict(markeredgecolor=box_colors[0], marker='o'))
# ax[1].set_title('IQR Euclidean Error GNSS vs PCMF path')
# ax[1].set_xlabel('Error')
# # ax[1].grid(True)
# ax[1].set_xlim(q1 - 1.5*(q3-q1), q3 + 1.5*(q3-q1))

# plt.tight_layout(h_pad=1.0, pad=0.0, w_pad=0.0)
# plt.show()



In [None]:
# # histogram of errors
# plt.hist(errors, bins=20, density=True, alpha=0.6, color='g')
# mu, std = norm.fit(errors)
# xmin, xmax = plt.xlim()
# x = np.linspace(xmin, xmax, 100)
# p = norm.pdf(x, mu, std)
# plt.plot(x, p, 'k', linewidth=2)
# plt.title("Histogram of errors")
# plt.show()

In [None]:
# # Q-Q plot
# import scipy.stats as stats
# import matplotlib.pyplot as plt

# stats.probplot(errors, dist="norm", plot=plt)
# plt.title("Normal Q-Q plot")
# plt.show()

**Transformation Plotter**

In [None]:
# # Function to calculate the 80% confidence interval for normal distribution
# def confidence_interval(data, confidence=0.8):
#     mean, sigma = np.mean(data), np.std(data)
#     h = sigma * norm.ppf((1 + confidence) / 2)
#     return mean - h, mean + h

# # Set up the subplots
# fig, ax = plt.subplots(3, 2, figsize=(18, 15))

# # Plots for transformations over time
# ax[0, 0].plot(data['timestamp'].to_numpy(), data['transformation_history_x'].to_numpy(), label='X without EKF', color='orange')
# ax[0, 0].plot(data['timestamp'].to_numpy(), data['EKF_transformation_history_x'].to_numpy(), label='X with EKF', color='green')
# ax[1, 0].plot(data['timestamp'].to_numpy(), data['transformation_history_y'].to_numpy(), label='Y without EKF', color='orange')
# ax[1, 0].plot(data['timestamp'].to_numpy(), data['EKF_transformation_history_y'].to_numpy(), label='Y with EKF', color='green')
# ax[2, 0].plot(data['timestamp'].to_numpy(), data['transformation_history_rotation'].to_numpy(), label='Theta without EKF', color='orange')
# ax[2, 0].plot(data['timestamp'].to_numpy(), data['EKF_transformation_history_rotation'].to_numpy(), label='Theta with EKF', color='green')

# # Set labels and titles for left side plots
# for i in range(3):
#     ax[i, 0].set_xlabel('Timestamp')
#     ax[i, 0].set_ylabel(['X Position', 'Y Position', 'Theta'][i])
#     ax[i, 0].legend()
#     ax[i, 0].grid(True)

# # Plots for transformation distributions
# for i, col in enumerate(['transformation_history_x', 'transformation_history_y', 'transformation_history_rotation']):
#     data_col = data[col].dropna().to_numpy()
#     ekf_col = data[f'EKF_{col}'].dropna().to_numpy()

#     # Calculate normal distribution fit
#     mu, std = norm.fit(data_col)
#     mu_ekf, std_ekf = norm.fit(ekf_col)

#     # Create a dense range of x values for plotting the PDF
#     xmin, xmax = min(data_col.min(), ekf_col.min()), max(data_col.max(), ekf_col.max())
#     x = np.linspace(xmin, xmax, 1000)
#     p = norm.pdf(x, mu, std)
#     p_ekf = norm.pdf(x, mu_ekf, std_ekf)

#     # Manually normalize the PDF so that the integral is exactly 1
#     p /= np.trapz(p, x)
#     p_ekf /= np.trapz(p_ekf, x)

#     # Plot the distributions
#     ax[i, 1].plot(x, p, label=f'{col} Normalized', color='orange')
#     ax[i, 1].plot(x, p_ekf, label=f'EKF_{col} Normalized', color='green')

#     # Plotting the 80% confidence interval shading
#     low, high = confidence_interval(data_col, confidence=0.95)
#     low_ekf, high_ekf = confidence_interval(ekf_col, confidence=0.95)
#     ax[i, 1].fill_between(x, p, where=(x > low) & (x < high), color='orange', alpha=0.2)
#     ax[i, 1].fill_between(x, p_ekf, where=(x > low_ekf) & (x < high_ekf), color='green', alpha=0.2)

#     # Set labels and grid
#     ax[i, 1].set_xlabel(['X Position', 'Y Position', 'Theta'][i])
#     ax[i, 1].set_ylabel('Probability Density')
#     ax[i, 1].legend()
#     ax[i, 1].grid(True)

# fig.suptitle('Transformations with Normal Distribution Approximation', fontsize=16)
# plt.subplots_adjust(top=0.95)

# # Show the plot
# plt.show()


In [None]:
# import numpy as np
# import matplotlib.pyplot as plt
# from scipy.stats import gaussian_kde
# from sklearn.utils import resample

# # Function to calculate the confidence interval using the bootstrap method
# def bootstrap_confidence_interval(data, confidence=0.8, n_iterations=1000):
#     bootstrap_samples = np.array([np.mean(resample(data)) for _ in range(n_iterations)])
#     alpha = 1 - confidence
#     lower_percentile = 100 * alpha / 2
#     upper_percentile = 100 * (1 - alpha / 2)
#     lower_bound = np.percentile(bootstrap_samples, lower_percentile)
#     upper_bound = np.percentile(bootstrap_samples, upper_percentile)
#     return lower_bound, upper_bound

# # Set up the subplots
# fig, ax = plt.subplots(3, 2, figsize=(18, 15))

# # Plots for transformations over time
# ax[0, 0].plot(data['timestamp'].to_numpy(), data['transformation_history_x'].to_numpy(), label='X without EKF', color='orange')
# ax[0, 0].plot(data['timestamp'].to_numpy(), data['EKF_transformation_history_x'].to_numpy(), label='X with EKF', color='green')
# ax[1, 0].plot(data['timestamp'].to_numpy(), data['transformation_history_y'].to_numpy(), label='Y without EKF', color='orange')
# ax[1, 0].plot(data['timestamp'].to_numpy(), data['EKF_transformation_history_y'].to_numpy(), label='Y with EKF', color='green')
# ax[2, 0].plot(data['timestamp'].to_numpy(), data['transformation_history_rotation'].to_numpy(), label='Theta without EKF', color='orange')
# ax[2, 0].plot(data['timestamp'].to_numpy(), data['EKF_transformation_history_rotation'].to_numpy(), label='Theta with EKF', color='green')

# # Set labels and titles for left side plots
# for i in range(3):
#     ax[i, 0].set_xlabel('Timestamp')
#     ax[i, 0].set_ylabel(['X Position', 'Y Position', 'Theta'][i])
#     ax[i, 0].legend()
#     ax[i, 0].grid(True)

# # Plots for transformation distributions
# for i, col in enumerate(['transformation_history_x', 'transformation_history_y', 'transformation_history_rotation']):
#     data_col = data[col].dropna().to_numpy()
#     ekf_col = data[f'EKF_{col}'].dropna().to_numpy()

#     # Create a dense range of x values for plotting the PDF
#     xmin, xmax = min(data_col.min(), ekf_col.min()), max(data_col.max(), ekf_col.max())
#     x = np.linspace(xmin, xmax, 1000)

#     # Use kernel density estimation for a smooth estimate of the PDF
#     kde = gaussian_kde(data_col)
#     kde_ekf = gaussian_kde(ekf_col)
#     p = kde(x)
#     p_ekf = kde_ekf(x)

#     # Plot the distributions
#     ax[i, 1].plot(x, p, label=f'{col} Density Estimate', color='orange')
#     ax[i, 1].plot(x, p_ekf, label=f'EKF_{col} Density Estimate', color='green')

#     # Plotting the confidence interval shading using bootstrap
#     low, high = bootstrap_confidence_interval(data_col, confidence=0.95)
#     low_ekf, high_ekf = bootstrap_confidence_interval(ekf_col, confidence=0.95)
#     ax[i, 1].fill_between(x, p, where=(x > low) & (x < high), color='orange', alpha=0.2)
#     ax[i, 1].fill_between(x, p_ekf, where=(x > low_ekf) & (x < high_ekf), color='green', alpha=0.2)

#     # Set labels and grid
#     ax[i, 1].set_xlabel(['X Position', 'Y Position', 'Theta'][i])
#     ax[i, 1].set_ylabel('Probability Density')
#     ax[i, 1].legend()
#     ax[i, 1].grid(True)

# fig.suptitle('Transformation Density Estimate with Bootstrap Confidence Interval', fontsize=16)
# plt.subplots_adjust(top=0.95)
# plt.show()


In [None]:


# from scipy.stats import gaussian_kde

# # Set up the subplots
# fig, ax = plt.subplots(3, 2, figsize=(18, 15))

# # Plots for transformations over time
# ax[0, 0].plot(data['timestamp'].to_numpy(), data['transformation_history_x'].to_numpy(), label='X without EKF', color='orange')
# ax[0, 0].plot(data['timestamp'].to_numpy(), data['EKF_transformation_history_x'].to_numpy(), label='X with EKF', color='green')
# ax[1, 0].plot(data['timestamp'].to_numpy(), data['transformation_history_y'].to_numpy(), label='Y without EKF', color='orange')
# ax[1, 0].plot(data['timestamp'].to_numpy(), data['EKF_transformation_history_y'].to_numpy(), label='Y with EKF', color='green')
# ax[2, 0].plot(data['timestamp'].to_numpy(), data['transformation_history_rotation'].to_numpy(), label='Theta without EKF', color='orange')
# ax[2, 0].plot(data['timestamp'].to_numpy(), data['EKF_transformation_history_rotation'].to_numpy(), label='Theta with EKF', color='green')

# # Set labels and titles for left side plots
# for i in range(3):
#     ax[i, 0].set_xlabel('Timestamp')
#     ax[i, 0].set_ylabel(['X Position', 'Y Position', 'Theta'][i])
#     ax[i, 0].legend()
#     ax[i, 0].grid(True)

# # Plots for transformation distributions using box plots
# for i, col in enumerate(['transformation_history_x', 'transformation_history_y', 'transformation_history_rotation']):
#     data_col = data[col].dropna().to_numpy()
#     ekf_col = data[f'EKF_{col}'].dropna().to_numpy()

#     # Define box properties for each dataset
#     box_colors = ['orange', 'green']  # Colors for the boxes

#     # Plot the box plots for each dataset separately
#     box1 = ax[i, 1].boxplot(data_col, positions=[1], widths=0.6, patch_artist=True,
#                             boxprops=dict(facecolor=box_colors[0]),
#                             medianprops=dict(color="black"),
#                             whiskerprops=dict(color=box_colors[0]),
#                             capprops=dict(color=box_colors[0]),
#                             flierprops=dict(markeredgecolor=box_colors[0], marker='o'))
    
#     box2 = ax[i, 1].boxplot(ekf_col, positions=[2], widths=0.6, patch_artist=True,
#                             boxprops=dict(facecolor=box_colors[1]),
#                             medianprops=dict(color="black"),
#                             whiskerprops=dict(color=box_colors[1]),
#                             capprops=dict(color=box_colors[1]),
#                             flierprops=dict(markeredgecolor=box_colors[1], marker='o'))

#     # Set labels and titles for right side plots
#     ax[i, 1].set_xticks([1, 2])
#     ax[i, 1].set_xticklabels(['Without EKF', 'With EKF'])
#     ax[i, 1].set_title(f'Distribution of {col}')
#     ax[i, 1].grid(True)

# # Adjust layout and display the plot
# fig.suptitle('Transformation History and Distribution Analysis', fontsize=16)
# plt.subplots_adjust(top=0.95)
# plt.show()
