# MAGYC Example - Simulated Data Calibration & Self Evaluation

This notebook demonstrates how to use the MAGYC algorithms to calibrate a magnetometer and gyroscope using simulated data and provides a comparison with benchmark methods for calibration. Then, the results are self evaluated.

The calibration dataset corresponds to the simulated data used in the paper: "Full Magnetometer and Gyroscope Bias Estimation Using Angular Rates: Theory and Experimental Evaluation of a Factor Graph-Based Approach" by S. Rodríguez-Martínez and G. Troni, 2024. The dataset is available on Google Drive.

## Simulated dataset description

1. Movement levels:
    * **WAM**: Wide angular movement dataset (roll: $\pm 5^\circ$, pitch: $\pm 45^\circ$ and heading $\pm 360^\circ$).
    * **MAM**: Mid angular movement dataset (roll: $\pm 5^\circ$, pitch: $\pm 5^\circ$ and heading $\pm 360^\circ$).
    * **LAM**: Low angular movement dataset (roll: $\pm 5^\circ$, pitch: $\pm 45^\circ$ and heading $\pm 90^\circ$).
    * **TAM**: Tiny angular movement dataset (roll: $\pm 5^\circ$, pitch: $\pm 5^\circ$ and heading $\pm 90^\circ$).

2. Simulations number: 1,000 per movement level.
3. Samples per simulation: 6,000 (600 seconds at 10 Hz).
4. Magnetometer noise: 10.0 mG.
5. Gyroscope noise: 10 mrad/s.
6. The true magnetic field vector: $\mathbf{m_0} = [227,\, 52, \,412]^T$ mG.
7. Soft-iron upper triangular terms are given by: $\mathbf{a} = [1.10,\, 0.10,\, 0.04,\, 0.88,\, 0.02,\, 1.22]^T$.
8. Hard-iron bias is: $\mathbf{m_b} = [20,\, 120,\, 90]^T$ mG.
9. Gyroscope bias is: $\mathbf{w_b} = [4,\, -5,\, 2]^T$ mrad/s.

> For further details, refer to section _V. Numerical Simulation Evaluation_ in the paper.

## Parameters

In [None]:
# Dataset options for calibratio: wam, mam, lam, tam
dataset = "lam"

# Number of cpus for multiprocessing. If 0, it will use all available cpus
processors = 0

# Number of simulations to use for calibration. If all, set to -1
idx = 3

## Set dependencies

In [None]:
import os
import pickle as pkl
import sys
from multiprocessing import Pool, cpu_count
from warnings import warn

import gdown
import matplotlib.pyplot as plt
import navlib.math as nm
import numpy as np
import pandas as pd
from scipy.signal import savgol_filter
from tqdm import tqdm

sys.path.append(os.path.abspath(os.path.join(os.getcwd(), os.pardir, "magyc")))

from magyc import hsi_calibration_validation, magfield_data_plot

## Import Data from Google Drive

In [None]:
# Paths
cwd_path = os.path.abspath(os.getcwd())
sim_data_path = os.path.join(os.path.abspath(os.getcwd()), "sim_magcal_20241107_2049.pkl")

if not os.path.isfile(sim_data_path):
    # Google Drive file id
    file_id = "1c5Y1y3PU0pYVrRuZwQGtYYlmj3X2twRm"
    url = f"https://drive.google.com/uc?id={file_id}"

    # Download the file
    gdown.download(url, sim_data_path)

## Load Dataset

In [None]:
# Read pickle
with open(sim_data_path, 'rb') as f:
    data = pkl.load(f)

dataset_map = {"wam":  "high", "mam": "mid", "lam": "low", "tam": "tiny", "cross": "cross"}

data = data[dataset_map[dataset]]

## Dataset Calibration

In [None]:
output_path_hi = f"{cwd_path}/calibration_hi_{dataset}.pkl"
output_path_si = f"{cwd_path}/calibration_si_{dataset}.pkl"
output_path_wb = f"{cwd_path}/calibration_wb_{dataset}.pkl"

si_dict = dict()
hi_dict = dict()
wb_dict = dict()

## Get Calibration Data

In [None]:
# Extract data
# Time
time = data["t"][:idx]

# Magnetic field
magnetic_field = data["mmt"][:idx]
magnetic_field_f = magnetic_field
local_magnetic_field = data["m0"][0].flatten()
for i in range(magnetic_field.shape[0]):
    magnetic_field_f[i] = savgol_filter(magnetic_field[i], 50, 2, axis=0)

# Angular rates
angular_rates = data["wmt"][:idx]
angular_rates_f = angular_rates
for i in range(angular_rates.shape[0]):
    angular_rates_f[i] = savgol_filter(angular_rates[i], 50, 2, axis=0)

# Attitude
rph = data["rph"][:idx]

# Frequency
frequency = int(np.round(nm.mean(1 / np.diff(time[0].squeeze())), 0))

In [None]:
# Plot magnetic field components and magnitude
_, ax = plt.subplots(4, 1)

ax[0].set_title("Magnetic field X")
ax[0].plot(time[0], magnetic_field[0, :, 0], label="raw")
ax[0].plot(time[0], magnetic_field_f[0, :, 0], label="filtered")
ax[0].legend()

ax[1].set_title("Magnetic field Y")
ax[1].plot(time[0], magnetic_field[0, :, 1], label="raw")
ax[1].plot(time[0], magnetic_field_f[0, :, 1], label="filtered")
ax[1].legend()

ax[2].set_title("Magnetic field Z")
ax[2].plot(time[0], magnetic_field[0, :, 2], label="raw")
ax[2].plot(time[0], magnetic_field_f[0, :, 2], label="filtered")
ax[2].legend()

magnetic_field_magnitude = nm.norm(magnetic_field[0])
magnetic_field_f_magnitude = nm.norm(magnetic_field_f[0])

ax[3].set_title("Magnetic field magnitude")
ax[3].plot(time[0], magnetic_field_magnitude, label="raw")
ax[3].plot(time[0], magnetic_field_f_magnitude, label="filtered")
ax[3].legend()

plt.show()

In [None]:
_, ax = plt.subplots(3, 1)

ax[0].set_title("Heading")
ax[0].plot(time[0], np.rad2deg(rph[0, :, 2]))
ax[0].set_ylabel("Degrees")

ax[1].set_title("Roll")
ax[1].plot(time[0], np.rad2deg(rph[0, :, 0]))
ax[1].set_ylabel("Degrees")

ax[2].set_title("Pitch")
ax[2].plot(time[0], np.rad2deg(rph[0, :, 1]))
ax[2].set_ylabel("Degrees")

plt.show()

In [None]:
magfield_data_plot(data["si"][0], data["hi"][0], magnetic_field[0], data["m0"][0])

### TWOSTEP

In [None]:
from magyc import twostep_hsi as twostep

si_temp, hi_temp, count = np.zeros((3, 3)), np.zeros((1, 3)), 0

# Generate random values for magnetic field std and local magnetic field based on a seed for reproducibility
np.random.seed(22)
magnetic_field_std = np.abs(np.random.normal(loc=0.01, scale=0.01, size=magnetic_field.shape[0]))
local_magnetic_field_twostep = [local_magnetic_field * j for j in np.abs(np.random.normal(loc=1, scale=0.05, size=(magnetic_field.shape[0])))]

def process_iteration(i):
    try:
        hi_twostep, si_twostep, _ = twostep(magnetic_field[i], local_magnetic_field_twostep[i], measurement_noise_std=magnetic_field_std[i])
        if (not np.any(np.isnan(si_twostep))) and (not np.any(np.isnan(hi_twostep))):
            return hi_twostep, si_twostep
    except:
        return None

# Run calibration
cpus = cpu_count() if processors == 0 else processors
with Pool(cpus) as pool:
    results = list(tqdm(pool.imap(process_iteration, range(magnetic_field.shape[0])), total=magnetic_field.shape[0]))

# Process results
for i, result in enumerate(results):
    if result is not None and hsi_calibration_validation(result[1], result[0]):
        hi_temp += result[0]
        si_temp += result[1]
        count += 1

hi_twostep = hi_temp / count
si_twostep = si_temp / count

if count < magnetic_field.shape[0]:
    warn(f"{magnetic_field.shape[0] - count} iterations failed")

# Print results
print("Hard Iron: ", hi_twostep)
print("Soft Iron: ", si_twostep)

# Save results
if hsi_calibration_validation(si_twostep, hi_twostep):
    hi_dict["twostep"] = hi_twostep
    si_dict["twostep"] = si_twostep
    wb_dict["twostep"] = None
else:
    hi_dict["twostep"] = None
    si_dict["twostep"] = None
    wb_dict["twostep"] = None
    print("!!CALIBRATION NON VALID!!")

### Ellipsoid Fit

In [None]:
from magyc import ellipsoid_fit

si_temp, hi_temp, count = np.zeros((3, 3)), np.zeros((3, )), 0

def process_iteration(i):
    hi_ef, si_ef, _ = ellipsoid_fit(magnetic_field[i])
    return hi_ef, si_ef

# Run calibration
cpus = cpu_count() if processors == 0 else processors
with Pool(cpus) as pool:
    results = list(tqdm(pool.imap(process_iteration, range(magnetic_field.shape[0])), total=magnetic_field.shape[0]))

# Process results
for i, result in enumerate(results):
    if result[0] is not None and hsi_calibration_validation(result[1], result[0]):
        hi_temp += result[0]
        si_temp += result[1]
        count += 1

hi_ef = hi_temp / count
si_ef = si_temp / count

if count < magnetic_field.shape[0]:
    warn(f"{magnetic_field.shape[0] - count} iterations failed")

print("Hard Iron: ", hi_ef)
print("Soft Iron: ", si_ef)

# Save results
if hsi_calibration_validation(si_ef, hi_ef):
    hi_dict["ellipsoid_fit"] = hi_ef
    si_dict["ellipsoid_fit"] = si_ef
    wb_dict["ellipsoid_fit"] = None
else:
    hi_dict["ellipsoid_fit"] = None
    si_dict["ellipsoid_fit"] = None
    wb_dict["ellipsoid_fit"] = None
    print("!!CALIBRATION NON VALID!!")

### MagFactor3

In [None]:
from magyc import magfactor3

local_magnetic_field_magfactor3 = [local_magnetic_field * j for j in np.abs(np.random.normal(loc=1, scale=0.05, size=(magnetic_field.shape[0])))]
rph_magfactor3 = [rphj * j for rphj, j in zip(rph, np.abs(np.random.normal(loc=1, scale=0.05, size=(magnetic_field.shape[0]))))]
magnetic_declination_magfactor3 = [12.72 * j for j in np.abs(np.random.normal(loc=1, scale=0.05, size=(magnetic_field.shape[0])))]

si_temp, hi_temp, count = np.zeros((3, 3)), np.zeros((3, )), 0

def process_iteration(i):
    hi_mf3, si_mf3, _, _ = magfactor3(magnetic_field[i], rph_magfactor3[i], magnetic_declination_magfactor3[i], reference_magnetic_field=local_magnetic_field_magfactor3[i])
    return hi_mf3, si_mf3

# Run calibration
cpus = cpu_count() if processors == 0 else processors
with Pool(cpus) as pool:
    results = list(tqdm(pool.imap(process_iteration, range(magnetic_field.shape[0])), total=magnetic_field.shape[0]))

# Process results
for i, result in enumerate(results):
    if result[0] is not None and hsi_calibration_validation(result[1], result[0]):
        hi_temp += result[0]
        si_temp += result[1]
        count += 1

hi_mf3 = hi_temp / count
si_mf3 = si_temp / count

if count < magnetic_field.shape[0]:
    warn(f"{magnetic_field.shape[0] - count} iterations failed")

print("Hard Iron: ", hi_mf3)
print("Soft Iron: ", si_mf3)

# Save results
if hsi_calibration_validation(si_mf3, hi_mf3):
    hi_dict["magfactor3"] = hi_mf3
    si_dict["magfactor3"] = si_mf3
    wb_dict["magfactor3"] = None
else:
    hi_dict["magfactor3"] = None
    si_dict["magfactor3"] = None
    wb_dict["magfactor3"] = None
    print("!!CALIBRATION NON VALID!!")

### MAGYC-BFG

In [None]:
from magyc import magyc_bfg

si_temp, hi_temp, wb_temp, count = np.zeros((3, 3)), np.zeros((3, )), np.zeros((3, )), 0

def process_iteration(i):
    hi_bfg, si_bfg, wb_bfg, _, _, _ = magyc_bfg(magnetic_field[i], angular_rates[i], time[i], frequency)
    return hi_bfg, si_bfg, wb_bfg

# Run calibration
cpus = cpu_count() if processors == 0 else processors
with Pool(cpus) as pool:
    results = list(tqdm(pool.imap(process_iteration, range(magnetic_field.shape[0])), total=magnetic_field.shape[0]))

# Process results
for i, result in enumerate(results):
    if result[0] is not None and hsi_calibration_validation(result[1], result[0]):
        hi_temp += result[0]
        si_temp += result[1]
        wb_temp += result[2]
        count += 1

hi_bfg = hi_temp / count
si_bfg = si_temp / count
wb_bfg = wb_temp / count

if count < magnetic_field.shape[0]:
    warn(f"{magnetic_field.shape[0] - count} iterations failed")

print("Hard Iron: ", hi_bfg)
print("Soft Iron: ", si_bfg)
print("Gyro bias: ", wb_bfg)

# Save results
if hsi_calibration_validation(si_bfg, hi_bfg):
    hi_dict["magyc_bfg"] = hi_bfg
    si_dict["magyc_bfg"] = si_bfg
    wb_dict["magyc_bfg"] = wb_bfg
else:
    hi_dict["magyc_bfg"] = None
    si_dict["magyc_bfg"] = None
    wb_dict["magyc_bfg"] = None
    print("!!CALIBRATION NON VALID!!")

### MAGYC-IFG

In [None]:
from magyc import magyc_ifg

si_temp, hi_temp, wb_temp, count = np.zeros((3, 3)), np.zeros((3, )), np.zeros((3, )), 0

def process_iteration(i):
    hi_ifg, si_ifg, wb_ifg, _, _, _ = magyc_ifg(magnetic_field[i], angular_rates[i], time[i], frequency)

    return hi_ifg, si_ifg, wb_ifg

# Run calibration
cpus = cpu_count() if processors == 0 else processors
with Pool(cpus) as pool:
    results = list(tqdm(pool.imap(process_iteration, range(magnetic_field.shape[0])), total=magnetic_field.shape[0]))

# Process results
for i, result in enumerate(results):
    if result[0] is not None and hsi_calibration_validation(result[1], result[0]):
        hi_temp += result[0]
        si_temp += result[1]
        wb_temp += result[2]
        count += 1

hi_ifg = hi_temp / count
si_ifg = si_temp / count
wb_ifg = wb_temp / count

if count < magnetic_field.shape[0]:
    warn(f"{magnetic_field.shape[0] - count} iterations failed")

print("Hard Iron: ", hi_ifg)
print("Soft Iron: ", si_ifg)
print("Gyro bias: ", wb_ifg)

# Save results
if hsi_calibration_validation(si_ifg, hi_ifg):
    hi_dict["magyc_ifg"] = hi_ifg
    si_dict["magyc_ifg"] = si_ifg
    wb_dict["magyc_ifg"] = wb_ifg
else:
    hi_dict["magyc_ifg"] = None
    si_dict["magyc_ifg"] = None
    wb_dict["magyc_ifg"] = None
    print("!!CALIBRATION NON VALID!!")


## Calibration Results

In [None]:
# Save a csv with the results
data_hsi = dict()
data_hsi["Method"] = ["A00", "A01", "A02", "A10", "A11", "A12", "A20", "A21", "A22", "B0", "B1", "B2", "W0", "W1", "W2"]

for k in hi_dict.keys():
    if si_dict[k] is None or np.any(np.isnan(si_dict[k])):
        data_hsi[k] = np.zeros((9, )).tolist()
    else:
        data_hsi[k] = np.round(si_dict[k], 6).flatten().tolist()
    if hi_dict[k] is None or np.any(np.isnan(hi_dict[k])):
        data_hsi[k] += np.zeros((3, )).tolist()
    else:
        data_hsi[k] += np.round(hi_dict[k], 6).flatten().tolist()
    if wb_dict[k] is None or np.any(np.isnan(wb_dict[k])):
        data_hsi[k] += np.zeros((3, )).tolist()
    else:
        data_hsi[k] += np.round(wb_dict[k], 6).flatten().tolist()

df_hsi = pd.DataFrame(data_hsi)
print(df_hsi)

## Self-Evaluation

In [None]:
mf_raw_magnitude = nm.norm(magnetic_field[0])
mf_bfg_magnitude = nm.norm((np.linalg.inv(si_bfg) @ (magnetic_field[0] - hi_bfg).T).T)
mf_ifg_magnitude = nm.norm((np.linalg.inv(si_ifg) @ (magnetic_field[0] - hi_ifg).T).T)
mf_mf3_magnitude = nm.norm((np.linalg.inv(si_mf3) @ (magnetic_field[0] - hi_mf3).T).T)
mf_ef_magnitude = nm.norm((np.linalg.inv(si_ef) @ (magnetic_field[0] - hi_ef).T).T)
mf_twostep_magnitude = nm.norm((np.linalg.inv(si_twostep) @ (magnetic_field[0] - hi_twostep).T).T)
corrected_mf = [mf_raw_magnitude, mf_twostep_magnitude, mf_ef_magnitude, mf_mf3_magnitude, mf_bfg_magnitude, mf_ifg_magnitude]
methods_names = ["RAW", "TWOSTEP", "Ellipsoid Fit", "MagFactor3", "MAGYC-BFG", "MAGYC-IFG"]

# Print magnetic field metrics summary
data = dict()
data["Metrics"] = [
    "Mean (mG)",
    "Std (mG)",
    "Max (mG)",
    "Min (mG)",
    "RMSE (mG)",
]

for i, method in enumerate(corrected_mf):
    metrics = [
        np.mean(method),
        np.std(method),
        np.max(method),
        np.min(method),
        np.sqrt(np.mean((method - np.mean(method))**2)),
    ]

    metrics = [round(m * 1e3, 4) for m in metrics]
    data[methods_names[i]] = metrics

df = pd.DataFrame(data)
pd.set_option('display.max_columns', None)
pd.set_option('display.expand_frame_repr', False)
print(df)

## Save Results

In [None]:
# Write results
dict_pairs = [(hi_dict, output_path_hi), (si_dict, output_path_si), (wb_dict, output_path_wb)]

for d, p in dict_pairs:
    with open(p, 'wb') as f:
        pkl.dump(d, f)