In [None]:
import numpy as np
from scipy.linalg import expm
import pygame
import matplotlib.pyplot as plt
import quaternion

In [None]:
""" Cube Visualisation: Setup and function definitions """

pygame.init()

WIDTH, HEIGHT = 1280, 720

WHITE = (255, 255, 255)
L_RED = (255, 200, 150)
L_GREEN = (150, 255, 150)
L_BLUE = (150, 200, 255)
RED   = (255, 0, 0)
GREEN = (0, 255, 0)
CYAN  = (0, 100, 255)
BLACK = (40, 40, 40)

font = pygame.font.Font('freesansbold.ttf', 32)
label_true = font.render('True Attitude', True, L_RED, BLACK)
label_hat = font.render('MEKF Estimate', True, L_GREEN, BLACK)
labelRect_true = label_true.get_rect()
labelRect_hat = label_hat.get_rect()
labelRect_true.center = (.25*WIDTH, .1*HEIGHT)
labelRect_hat.center = (.75*WIDTH, .1*HEIGHT)

c_cube_true = [.25*WIDTH, 3*HEIGHT/5]  # x, y
c_cube_hat = [.75*WIDTH, 3*HEIGHT/5]  # x, y

points = []

# all the cube vertices
points.append(np.matrix([-1, -1, .4]))
points.append(np.matrix([1, -1, .4]))
points.append(np.matrix([1,  1, .4]))
points.append(np.matrix([-1, 1, .4]))
points.append(np.matrix([-1, -1, -.4]))
points.append(np.matrix([1, -1, -.4]))
points.append(np.matrix([1, 1, -.4]))
points.append(np.matrix([-1, 1, -.4]))

proj_points_true = [[n, n] for n in range(len(points))]
proj_points_hat = [[n, n] for n in range(len(points))]

In [None]:
def connect_points(screen, i, j, points, color):
    pygame.draw.line(screen, color, (points[i][0], points[i][1]), (points[j][0], points[j][1]), 3)

def get_proj_xy(DCM, point, center, scale, distance):
    rotated2d  = np.dot(DCM, point.reshape((3, 1)))
    x = float(rotated2d[0][0].item())
    y = float(rotated2d[1][0].item())
    z = float(rotated2d[2][0].item())
    
    rotated2d[0][0] = y
    rotated2d[1][0] = -z
    rotated2d[2][0] = -x
    
    z = 1/(distance - float(rotated2d[2][0].item()))
    projection_matrix = np.matrix([[z, 0, 0], [0, z, 0]])
    projected2d = np.dot(projection_matrix, rotated2d)

    x = int(projected2d[0][0].item() * scale * distance) + center[0]
    y = int(projected2d[1][0].item() * scale * distance) + center[1]

    return [x, y]

def skew_symmetric(v):
    """Create skew symmetric matrix from vector"""
    return np.array([[0, -v[2], v[1]],
                     [v[2], 0, -v[0]],
                     [-v[1], v[0], 0]])

def rodrigues_formula(phi):
    """Rodrigues formula for rotation matrix from rotation vector"""
    angle = np.linalg.norm(phi)
    if angle < 1e-8:
        return np.eye(3) + skew_symmetric(phi)
    else:
        axis = phi / angle
        K = skew_symmetric(axis)
        return np.eye(3) + np.sin(angle) * K + (1 - np.cos(angle)) * np.dot(K, K)

class MEKF:
    def __init__(self, R0, Q, Ra, Rm=None):
        """
        Initialize MEKF
        R0: Initial attitude matrix
        Q: Process noise covariance (3x3)
        Ra: Accelerometer measurement noise covariance (3x3)  
        Rm: Magnetometer measurement noise covariance (3x3)
        """
        self.R_hat = R0.copy()
        self.P = np.eye(3) * 0.1  # Initial error covariance
        self.Q = Q  # Process noise covariance
        self.Ra = Ra  # Accelerometer noise covariance
        self.Rm = Rm  # Magnetometer noise covariance
        
        # Reference vectors
        self.a_ref = np.array([0, 0, 1])  # Gravity in NED
        self.m_ref = np.array([0.877, 0, 0.4796])  # Magnetic field in NED
        
    def predict(self, omega, dt):
        """Prediction step"""
        # Propagate attitude
        Omega_skew = skew_symmetric(omega)
        self.R_hat = np.dot(self.R_hat, expm(Omega_skew * dt))
        
        # Propagate error covariance
        # F = I - [omega]_x * dt (first order approximation)
        F = np.eye(3) - skew_symmetric(omega) * dt
        self.P = np.dot(np.dot(F, self.P), F.T) + self.Q * dt
        
    def update_accel(self, accel_meas):
        """Update with accelerometer measurement"""
        # Expected measurement in body frame
        a_pred = np.dot(self.R_hat.T, self.a_ref)
        
        # Measurement residual
        residual = accel_meas - a_pred
        
        # Measurement matrix (H)
        H = skew_symmetric(a_pred)
        
        # Innovation covariance
        S = np.dot(np.dot(H, self.P), H.T) + self.Ra
        
        # Kalman gain
        K = np.dot(np.dot(self.P, H.T), np.linalg.inv(S))
        
        # Error state update
        delta_theta = np.dot(K, residual)
        
        # Attitude correction using multiplicative update
        delta_R = rodrigues_formula(delta_theta)
        self.R_hat = np.dot(self.R_hat, delta_R)
        
        # Covariance update
        I_KH = np.eye(3) - np.dot(K, H)
        self.P = np.dot(I_KH, self.P)
        
    def update_mag(self, mag_meas):
        """Update with magnetometer measurement"""
        if self.Rm is None:
            return
            
        # Expected measurement in body frame
        m_pred = np.dot(self.R_hat.T, self.m_ref)
        
        # Measurement residual
        residual = mag_meas - m_pred
        
        # Measurement matrix (H)
        H = skew_symmetric(m_pred)
        
        # Innovation covariance
        S = np.dot(np.dot(H, self.P), H.T) + self.Rm
        
        # Kalman gain
        K = np.dot(np.dot(self.P, H.T), np.linalg.inv(S))
        
        # Error state update
        delta_theta = np.dot(K, residual)
        
        # Attitude correction using multiplicative update
        delta_R = rodrigues_formula(delta_theta)
        self.R_hat = np.dot(self.R_hat, delta_R)
        
        # Covariance update
        I_KH = np.eye(3) - np.dot(K, H)
        self.P = np.dot(I_KH, self.P)

pygame 2.6.1 (SDL 2.28.4, Python 3.13.6)
Hello from the pygame community. https://www.pygame.org/contribute.html


error: video system not initialized

: 

In [None]:
""" Simulation Setup """

### Simulation duration and time step
duration = 10.0         # seconds
dt = 0.01               # time step in seconds 100FPS=.01sec/frame
N = int(duration / dt)  # 1000 steps

### Motion Setup - More interesting motion
R = np.array([[1, 0, 0], [0, 1, 0], [0, 0, 1]], dtype=float)
omega_base = np.array([0.5, 0.3, 0.2])  # Base angular velocity

### Sensor Setup
a_ref = np.array([0, 0, 1])         # NED gravity
m_ref = np.array([0.877, 0, 0.4796])  # NED magnetic field (Mumbai)

##### Sensor characteristics - Realistic IMU noise
sigma_g = 0.1  # Gyro noise std (rad/s)
sigma_a = 0.5  # Accelerometer noise std (m/s^2)  
sigma_m = 0.1  # Magnetometer noise std

mu_g = np.array([0, 0, 0])
cov_g = np.eye(3) * sigma_g**2
mu_a = np.array([0, 0, 0])
cov_a = np.eye(3) * sigma_a**2
mu_m = np.array([0, 0, 0])
cov_m = np.eye(3) * sigma_m**2

### MEKF Setup
R0_hat = np.array([[0, -1, 0], [1, 0, 0], [0, 0, 1]], dtype=float)  # Initial estimate (with error)

# Process and measurement noise covariances
Q = np.eye(3) * 0.01  # Process noise
Ra = cov_a  # Accelerometer noise
Rm = cov_m  # Magnetometer noise

# Create MEKF instances
mekf_accel_only = MEKF(R0_hat.copy(), Q, Ra)  # Only accelerometer
mekf_full = MEKF(R0_hat.copy(), Q, Ra, Rm)    # Accelerometer + magnetometer

# Data collection arrays
time_array = np.linspace(0, duration, N)
err_accel_only = []
err_full = []
gyro_bias = []
P_trace = []

# True states for comparison
R_true_history = []
R_hat_accel_history = []
R_hat_full_history = []

""" Run: Simulation, Estimation, Cube-Visualisation and Collect Data """

pygame.display.set_caption("MEKF Attitude Estimation")
screen = pygame.display.set_mode((WIDTH, HEIGHT))
clock = pygame.time.Clock()

for n in range(N):
    
    clock.tick(100)
    for event in pygame.event.get():
        if event.type == pygame.QUIT:
            pygame.quit()
            exit()
        if event.type == pygame.KEYDOWN:
            if event.key == pygame.K_ESCAPE:
                pygame.quit()
                exit()

    # Time-varying angular velocity for more interesting motion
    t = n * dt
    omega = omega_base * (1 + 0.5 * np.sin(2 * np.pi * t / 3))
    Omega_skew = skew_symmetric(omega)
    
    # Simulate true motion
    R = np.dot(R, expm(Omega_skew * dt))

    # Generate sensor measurements with noise
    gyro_meas = omega + np.random.multivariate_normal(mu_g, cov_g)
    acc_meas = np.dot(R.T, a_ref) + np.random.multivariate_normal(mu_a, cov_a)
    mag_meas = np.dot(R.T, m_ref) + np.random.multivariate_normal(mu_m, cov_m)

    # MEKF Prediction step
    mekf_accel_only.predict(gyro_meas, dt)
    mekf_full.predict(gyro_meas, dt)
    
    # MEKF Update step
    mekf_accel_only.update_accel(acc_meas)
    
    mekf_full.update_accel(acc_meas)
    mekf_full.update_mag(mag_meas)

    # Record data for analysis
    err_accel_only.append(3 - np.trace(np.dot(R, mekf_accel_only.R_hat.T)))
    err_full.append(3 - np.trace(np.dot(R, mekf_full.R_hat.T)))
    P_trace.append(np.trace(mekf_full.P))
    
    # Store histories
    R_true_history.append(R.copy())
    R_hat_accel_history.append(mekf_accel_only.R_hat.copy())
    R_hat_full_history.append(mekf_full.R_hat.copy())

    # Visualize every 10th frame for better performance
    if n % 10 == 0:
        screen.fill(BLACK)
        
        for i, point in enumerate(points):
            proj_points_true[i] = get_proj_xy(R, point, c_cube_true, 150, 50)
            proj_points_hat[i] = get_proj_xy(mekf_full.R_hat, point, c_cube_hat, 150, 50)
            
            if float(point.reshape((3, 1))[0].item()) < 0:
                pygame.draw.circle(screen, RED, (proj_points_true[i][0], proj_points_true[i][1]), 5)
                pygame.draw.circle(screen, RED, (proj_points_hat[i][0], proj_points_hat[i][1]), 5)
            else:
                pygame.draw.circle(screen, GREEN, (proj_points_true[i][0], proj_points_true[i][1]), 5)
                pygame.draw.circle(screen, GREEN, (proj_points_hat[i][0], proj_points_hat[i][1]), 5)

        # Draw cube edges
        for p in range(4):
            connect_points(screen, p, (p+1) % 4, proj_points_true, L_RED)
            connect_points(screen, p+4, ((p+1) % 4) + 4, proj_points_true, L_RED)
            connect_points(screen, p, (p+4), proj_points_true, L_RED)
            
            connect_points(screen, p, (p+1) % 4, proj_points_hat, L_GREEN)
            connect_points(screen, p+4, ((p+1) % 4) + 4, proj_points_hat, L_GREEN)
            connect_points(screen, p, (p+4), proj_points_hat, L_GREEN)
        
        # Labels
        screen.blit(label_true, labelRect_true)
        screen.blit(label_hat, labelRect_hat)
        
        pygame.display.update()

pygame.quit()

In [None]:
""" Plot the Results """

# Convert to numpy arrays for easier manipulation
err_accel_only = np.array(err_accel_only)
err_full = np.array(err_full)
P_trace = np.array(P_trace)

# Create multiple plots
fig, axes = plt.subplots(2, 2, figsize=(15, 10))

# Plot 1: Rotation error comparison
axes[0, 0].plot(time_array, err_accel_only, 'b-', label='Accel Only', linewidth=2)
axes[0, 0].plot(time_array, err_full, 'r-', label='Accel + Mag', linewidth=2)
axes[0, 0].set_xlabel('Time (s)')
axes[0, 0].set_ylabel('Rotation Error')
axes[0, 0].set_title('MEKF Attitude Error Comparison')
axes[0, 0].legend()
axes[0, 0].grid(True)

# Plot 2: Error covariance trace
axes[0, 1].plot(time_array, P_trace, 'g-', linewidth=2)
axes[0, 1].set_xlabel('Time (s)')
axes[0, 1].set_ylabel('Trace(P)')
axes[0, 1].set_title('Error Covariance Trace')
axes[0, 1].grid(True)

# Plot 3: Euler angles comparison (using final estimates)
def rotation_to_euler(R):
    """Convert rotation matrix to Euler angles (roll, pitch, yaw)"""
    roll = np.arctan2(R[2, 1], R[2, 2])
    pitch = np.arctan2(-R[2, 0], np.sqrt(R[2, 1]**2 + R[2, 2]**2))
    yaw = np.arctan2(R[1, 0], R[0, 0])
    return np.array([roll, pitch, yaw]) * 180 / np.pi

# Compute Euler angles for the last 100 samples
euler_true = []
euler_est = []
for i in range(max(0, N-100), N):
    euler_true.append(rotation_to_euler(R_true_history[i]))
    euler_est.append(rotation_to_euler(R_hat_full_history[i]))

euler_true = np.array(euler_true)
euler_est = np.array(euler_est)
time_euler = time_array[max(0, N-100):N]

axes[1, 0].plot(time_euler, euler_true[:, 0], 'b-', label='True Roll', linewidth=2)
axes[1, 0].plot(time_euler, euler_est[:, 0], 'r--', label='Est Roll', linewidth=2)
axes[1, 0].set_xlabel('Time (s)')
axes[1, 0].set_ylabel('Roll (deg)')
axes[1, 0].set_title('Roll Angle Comparison (Last 1s)')
axes[1, 0].legend()
axes[1, 0].grid(True)

# Plot 4: Error magnitude over time
error_mag_accel = np.sqrt(err_accel_only)
error_mag_full = np.sqrt(err_full)

axes[1, 1].semilogy(time_array, error_mag_accel, 'b-', label='Accel Only', linewidth=2)
axes[1, 1].semilogy(time_array, error_mag_full, 'r-', label='Accel + Mag', linewidth=2)
axes[1, 1].set_xlabel('Time (s)')
axes[1, 1].set_ylabel('Error Magnitude (log scale)')
axes[1, 1].set_title('Attitude Error Magnitude')
axes[1, 1].legend()
axes[1, 1].grid(True)

plt.tight_layout()
plt.show()

print("MEKF Implementation Results:")
print(f"Final error (Accel only): {err_accel_only[-1]:.4f}")
print(f"Final error (Accel + Mag): {err_full[-1]:.4f}")
print(f"Mean error (Accel only): {np.mean(err_accel_only):.4f}")
print(f"Mean error (Accel + Mag): {np.mean(err_full):.4f}")
print(f"Error reduction with magnetometer: {(1 - np.mean(err_full)/np.mean(err_accel_only))*100:.1f}%")