Test with GPS only 

In [None]:
import numpy as np
import struct 
import os
import plotly.graph_objects as go
import time
from tqdm import tqdm

import symforce
try:
    symforce.set_epsilon_to_symbol()
except symforce.AlreadyUsedEpsilon:
    print("Already set symforce epsilon")
    pass 

import symforce.symbolic as sf

import chimera_fgo.util.general as general
from chimera_fgo.util.io import read_lidar_bin, read_gt
from chimera_fgo.util.kitti import process_kitti_gt, load_sv_positions
from chimera_fgo.util.geometry import euler_to_R

from chimera_fgo.symforce.fgo_gps_only import fgo_gps_only

%load_ext autoreload
%autoreload 2

np.set_printoptions(suppress=True, precision=3)

### Load data

In [None]:
kitti_seq = '0034'
start_idx = 0

gtpath = os.path.join(os.getcwd(), '..', '..', 'data', 'kitti', kitti_seq, 'oxts', 'data')
gt_enu, gt_Rs, gt_attitudes = process_kitti_gt(gtpath, start_idx=start_idx)

In [None]:
# Load satellite positions
sv_path = os.path.join(os.getcwd(), '..', '..', 'data', 'kitti', kitti_seq, 'svs')
#svfile = os.path.join(sv_path, f'{kitti_seq}_saved_sats.csv')
sv_positions = load_sv_positions(gtpath, sv_path, kitti_seq, start_idx=start_idx)

### Parameters

In [None]:
TRAJLEN = 2000

MAX_BIAS = 0  # [m]
ATTACK_START = TRAJLEN // 2

# Sliding window of size 100 which shifts by 10 each iteration
N_SHIFT = 10
N_WINDOW = 100

GPS_RATE = 10  # ratio of odometry to GPS measurements

PR_SIGMA = 6

N_SATS = len(sv_positions[0])

In [None]:
# Spoofed trajectory
spoofed_pos = gt_enu[:TRAJLEN].copy()

gps_spoofing_biases = np.zeros(TRAJLEN)  
gps_spoofing_biases[ATTACK_START:] = np.linspace(0, MAX_BIAS, TRAJLEN-ATTACK_START)  # Ramping attack

spoofed_pos[:,0] -= gps_spoofing_biases

# Spoofed ranges
PR_SIGMA = 6
spoofed_ranges = np.zeros((TRAJLEN, N_SATS))
for i in range(TRAJLEN):
    svs = sv_positions[i]
    for j in range(N_SATS):
        spoofed_ranges[i,j] = np.linalg.norm(spoofed_pos[i] - svs[j]) + np.random.normal(0, PR_SIGMA)

Single window

In [None]:
init_poses = [sf.Pose3.identity()] * N_WINDOW
window = slice(0, N_WINDOW)
ranges = spoofed_ranges[window]
satpos_enu = np.array(sv_positions[window])
satpos = [[sf.V3(satpos_enu[i][j]) for j in range(N_SATS)] for i in range(N_WINDOW)]
result = fgo_gps_only(init_poses, satpos, ranges, PR_SIGMA, GPS_RATE, fix_first_pose=False, debug=False)

In [None]:
window_positions = np.zeros((N_WINDOW//GPS_RATE, 3))
for i in range(N_WINDOW//GPS_RATE):
    pose = result.optimized_values["poses"][i*GPS_RATE]
    window_positions[i] = pose.position().flatten()   
window_positions

In [None]:
np.std(gt_enu[0:100:10] - window_positions, axis=0)

Sliding window

In [None]:
PR_SIGMA = 6

graph_positions = np.zeros((TRAJLEN, 3))
# graph_rotations = np.zeros((TRAJLEN, 3))
graph_attitudes = np.zeros((TRAJLEN, 3))
init_poses = [sf.Pose3.identity()] * N_WINDOW
init_poses[0] = sf.Pose3(sf.Rot3.from_rotation_matrix(gt_Rs[0]), sf.V3(gt_enu[0]))
e_ranges = np.zeros((N_WINDOW//GPS_RATE, N_SATS))
qs = np.zeros(TRAJLEN//GPS_RATE)

for k in (pbar := tqdm(range((TRAJLEN - N_WINDOW) // N_SHIFT))):

    window = slice(N_SHIFT * k, N_SHIFT * k + N_WINDOW)
    ranges = spoofed_ranges[window]
    satpos_enu = np.array(sv_positions[window])
    satpos = [[sf.V3(satpos_enu[i][j]) for j in range(N_SATS)] for i in range(N_WINDOW)]

    result = fgo_gps_only(init_poses, satpos, ranges, PR_SIGMA, GPS_RATE, fix_first_pose=True, debug=False)

    # Extract optimized positions
    window_positions = np.zeros((N_WINDOW, 3))
    window_attitudes = np.zeros((N_WINDOW, 3))
    for i in range(N_WINDOW):
        pose = result.optimized_values["poses"][i]
        window_positions[i] = pose.position().flatten()   
        window_attitudes[i] = pose.rotation().to_yaw_pitch_roll().flatten()   

    # Save trajectory
    graph_positions[N_SHIFT*k:N_SHIFT*(k+1)] = window_positions[:N_SHIFT]
    graph_attitudes[N_SHIFT*k:N_SHIFT*(k+1)] = window_attitudes[:N_SHIFT]

    # Update initial poses
    init_poses[:-N_SHIFT] = result.optimized_values["poses"][N_SHIFT:]

    # Use lidar odometry to initialize latest new poses
    n_pose = result.optimized_values["poses"][-1]
    n_pose = sf.Pose3(sf.Rot3.from_rotation_matrix(n_pose.R.to_rotation_matrix()), sf.V3(n_pose.t))
    for i in range(N_SHIFT):
        lidar_T = sf.Pose3(sf.Rot3.from_rotation_matrix(lidar_Rs[N_SHIFT*k + N_WINDOW + i]), sf.V3(lidar_ts[N_SHIFT*k + N_WINDOW + i]))
        n_pose = lidar_T * n_pose
        init_poses[-N_SHIFT+i] = n_pose

    # Compute test statistic
    for i in range(N_WINDOW//GPS_RATE):
        svs = sv_positions[N_SHIFT * k + i * GPS_RATE]
        for j in range(N_SATS):
            e_ranges[i,j] = np.linalg.norm(window_positions[::GPS_RATE][i] - svs[j])
    q = np.sum((ranges[::GPS_RATE] - e_ranges)**2) / PR_SIGMA**2
    pbar.set_description(f"q = {q:.2f}")
    qs[k] = q

    # Mitigation
    if q > T:
        print("Attack detected at ", N_SHIFT * k)
        PR_SIGMA = 1e5

# # End of trajectory, save all the positions from the current window
# for i in range(N_WINDOW):
#     idx = TRAJLEN - N_WINDOW + i
#     graph_positions[idx] = result.optimized_values["poses"][i].position().flatten()
OPT_TRAJLEN = N_SHIFT*(k+1)

In [None]:
# For plotting section of q
qs_orig = qs
qs = qs_orig[:133].copy()

In [None]:
OPT_TRAJLEN = N_SHIFT*(k+1)
graph_positions = graph_positions[:OPT_TRAJLEN]
gt_enu = gt_enu[:OPT_TRAJLEN]
qs = qs[:OPT_TRAJLEN//N_SHIFT]

In [None]:
# Plot test statistic and threshold
fig = go.Figure()
fig.add_trace(go.Scatter(x=np.arange(len(qs)), y=qs, name='Test statistic'))
fig.add_trace(go.Scatter(x=np.arange(len(qs)), y=T*np.ones(len(qs)), name='Threshold', line=dict(color='red', dash='dash')))
# Add vertical line at start of spoofing attack
fig.add_shape(type="line", x0=ATTACK_START/GPS_RATE, y0=-20, x1=ATTACK_START/GPS_RATE, y1=200, line=dict(color="black", width=2, dash="dash"))
fig.update_layout(width=900, height=500, xaxis_title='Time [s]', yaxis_title='Test statistic')
#fig.update_layout(legend=dict(x=0.75, y=0.98), font=dict(size=18))
fig.show()

In [None]:
fgo_traj = go.Scatter(x=graph_positions[:,0], y=graph_positions[:,1], hovertext=np.arange(TRAJLEN), name='FGO trajectory')
gt_traj = go.Scatter(x=gt_enu[:TRAJLEN,0], y=gt_enu[:TRAJLEN,1], hovertext=np.arange(TRAJLEN), name='Ground-truth')
spoof_traj = go.Scatter(x=spoofed_pos[:OPT_TRAJLEN,0], y=spoofed_pos[:OPT_TRAJLEN,1], hovertext=np.arange(OPT_TRAJLEN), name='Spoofed')
start = go.Scatter(x=[0], y=[0], name='Start', mode='markers', marker=dict(size=10, color='blue'), showlegend=False)
fig = go.Figure(data=[gt_traj, fgo_traj, spoof_traj, start])
fig.update_layout(width=700, height=700, xaxis_title='East [m]', yaxis_title='North [m]')
# Move legend into plot
# fig.update_layout(legend=dict(x=0.75, y=0.98), font=dict(size=18))
fig.update_yaxes(
    scaleanchor = "x",
    scaleratio = 1,
  )
fig.update_xaxes(autorange=True)
fig.show()

In [None]:
# Plot x and y errors over time
fig = go.Figure()
fig.add_trace(go.Scatter(x=np.arange(TRAJLEN), y=graph_positions[:TRAJLEN,0] - gt_enu[:TRAJLEN,0], name='x error'))
fig.add_trace(go.Scatter(x=np.arange(TRAJLEN), y=graph_positions[:TRAJLEN,1] - gt_enu[:TRAJLEN,1], name='y error'))
fig.add_trace(go.Scatter(x=np.arange(TRAJLEN), y=graph_positions[:TRAJLEN,2] - gt_enu[:TRAJLEN,2], name='z error'))
# Plot RMSE
fig.add_trace(go.Scatter(x=np.arange(TRAJLEN), y=np.sqrt(np.mean((graph_positions - gt_enu[:TRAJLEN])**2, axis=1)), name='RMSE'))
fig.update_layout(width=900, height=500, xaxis_title='Time [s]', yaxis_title='Error [m]')
fig.update_layout(legend=dict(x=0.75, y=0.98), font=dict(size=18))
fig.show()


In [None]:
# RMSE error
rmse_xyz = np.sqrt(np.mean((graph_positions[:1800] - gt_enu[:1800])**2, axis=0))
print("RMSE (xyz): ", rmse_xyz)
print("RMSE (overall): ", rmse_xyz.mean())

In [None]:
# mean error
np.mean((graph_positions[:1800] - gt_enu[:1800]), axis=0)