<a href="https://colab.research.google.com/github/123shwetarohokale/563-ShwetaR/blob/main/Mckaelin_P2.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [2]:
import numpy as np
import matplotlib.pyplot as plt
import socket
from collections import deque
from scipy.spatial.transform import Rotation as R

# Quaternion integration using matrix exponential approach
def integrate_quat(q, w, dt):
    norm_w = np.linalg.norm(w)
    if norm_w == 0:
        return q

    theta = 0.5 * norm_w * dt
    Ohm = omega_matrix(w)
    Om_exp = np.cos(theta) * np.eye(4) + np.sin(theta) * Ohm / norm_w
    q_next = Om_exp @ q
    return q_next / np.linalg.norm(q_next)

def omega_matrix(w):
    wx, wy, wz = w
    return np.array([
        [0, -wx, -wy, -wz],
        [wx, 0, wz, -wy],
        [wy, -wz, 0, wx],
        [wz, wy, -wx, 0]
    ])

def skew(v):
    x, y, z = v
    return np.array([
        [0, -z, y],
        [z, 0, -x],
        [-y, x, 0]
    ])

# Part 1: Quaternion Integration ---------------------------------------------------------------------------------------------------------------------------------------
q_hat = np.array([1, 0, 0, 0], dtype=float)
dt = 0.01
steps_per_segment = int(0.25 / dt)

w_seq = np.array([
    [np.pi, 0, 0],
    [0, 0, np.pi],
    [-np.pi, 0, 0],
    [0, 0, -np.pi]
])

for seg in range(4):
    w = w_seq[seg]
    for _ in range(steps_per_segment):
        q_hat = integrate_quat(q_hat, w, dt)

print("Final quaternion q(1):\n")
print(q_hat)
print("Norm of difference from identity quaternion:")
print(np.linalg.norm(q_hat - np.array([1, 0, 0, 0])))

Final quaternion q(1):

[ 0.95710678  0.10355339 -0.25       -0.10355339]
Norm of difference from identity quaternion:
0.2928932188134523


In [3]:
###############################################################################################
# Part 2: Mahony Filter Implementation -----------------------------------------------------------------------------------------------------------------------
def custom_mahony(input_data):
    t = input_data[:, 0]
    mag = input_data[:, 1:4]
    gyro = input_data[:, 4:7]
    accel = input_data[:, 7:10]

    dt = 0.01
    Ka, Kp, Km, Ki = 1, 1, 1, 0.3

    q_hat = np.array([1, 0, 0, 0], dtype=float)
    b_hat = np.zeros(3)
    Va_hat = np.array([0, 0, 1])
    #m0 = np.array([4463.4, 19704.2, 0])
    #Vm_hat = mo / np.linalg.norm(m0)
    Vm_hat = np.array([1, 0, 0])
    g_inertial = np.array([0, 0, 1])

    N = input_data.shape[0]
    theta_step_all = np.zeros(N)
    theta = np.zeros(N)

    for k in range(N - 1):
        omega_y = gyro[k]
        a = accel[k]
        m = mag[k]

        Va = a / np.linalg.norm(a)
        rot = R.from_quat(q_hat[[1, 2, 3, 0]]).as_matrix()  # w,x,y,z to x,y,z,w
        g_body = rot @ g_inertial

        Vm = m - np.dot(m, g_body) * g_body
        Vm = Vm / np.linalg.norm(Vm)

        omega_mes = Ka * np.cross(Va, Va_hat) + Km * np.cross(Vm, Vm_hat)
        u = omega_y - b_hat + Kp * omega_mes

        q_hat = integrate_quat(q_hat, u, dt)

        u_norm = np.linalg.norm(u)
        if u_norm > 1e-6:
            ux = skew(u / u_norm)
            Rexp = np.eye(3) + np.sin(-u_norm * dt) * ux + (1 - np.cos(-u_norm * dt)) * (ux @ ux)
        else:
            Rexp = np.eye(3)

        Va_hat = Rexp @ Va_hat
        Vm_hat = Rexp @ Vm_hat

        b_hat = b_hat - Ki * omega_mes * dt

        theta_step = 2 * np.arccos(min(1, abs(q_hat[0])))
        theta_step_all[k + 1] = theta_step
        theta[k + 1] = theta[k] + theta_step

    return t, theta_step_all, theta

# Load and run Mahony filter
input_data = np.loadtxt("P2_input", delimiter=',')
t, theta_step_all, theta = custom_mahony(input_data)

print('\n\n')
print(f'Total rotation: {theta[-1]:.3f} radians\n')
plt.plot(t, theta_step_all, linewidth=1.5)
plt.xlabel('Time [s]')
plt.ylabel(r'$\theta_{step}(t)$ [rad]')
plt.title('Instantaneous Rotation Angle per Step')
plt.grid(True)
plt.show()

print(f'Total rotation: {theta[-1]:.3f} radians')

FileNotFoundError: P2_input not found.

In [4]:

######################################################################################################
# # Problem 3: Live IMU Reading (Graph made after rotating) --------------------------------------
import socket
import csv
import time
import numpy as np
import matplotlib.pyplot as plt
from scipy.spatial.transform import Rotation as R

# Functions
def skew(w):
    return np.array([
        [0, -w[2], w[1]],
        [w[2], 0, -w[0]],
        [-w[1], w[0], 0]
    ])

def integrate_quat(q, omega, dt):
    omega_norm = np.linalg.norm(omega)
    if omega_norm > 1e-6:
        theta = omega_norm * dt
        axis = omega / omega_norm
        dq = R.from_rotvec(axis * theta).as_quat()  # xyzw
    else:
        dq = np.array([0, 0, 0, 1])  # identity quaternion

    r1 = R.from_quat(q[[1, 2, 3, 0]])  # wxyz → xyzw
    r2 = R.from_quat(dq)
    q_new = r1 * r2
    return np.roll(q_new.as_quat(), 1)  # back to wxyz

# Socket setup
HOST = '192.168.0.149'
PORT = 1996

server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.bind((HOST, PORT))
server_socket.listen(1)

print(f"Waiting for connection on port {PORT}...")
client_socket, client_address = server_socket.accept()
print(f"Connected to {client_address}")

buffer = ""
headers = []

# Logging
t_log, rpy_log, mahony_error_log, b_hat_log = [], [], [], []
start_time = None

# Initialization
q_hat = np.array([1, 0, 0, 0], dtype=float)
b_hat = np.zeros(3)
Va_hat = np.array([0, 0, 1])
Vm_hat = np.array([1, 0, 0])
g_inertial = np.array([0, 0, 1])
Ka, Kp, Km, Ki = 1, 1, 1, 0.3
dt = 0.01

try:
    while True:
        data = client_socket.recv(4096).decode("utf-8")
        buffer += data
        while '\n' in buffer:
            line, buffer = buffer.split('\n', 1)
            if not headers:
                headers = next(csv.reader([line], delimiter=','))
                print("CSV Headers:", headers)
                continue

            row = next(csv.reader([line], delimiter=','))
            data_dict = dict(zip(headers, row))

            acc = np.array([float(data_dict.get("motionUserAccelerationX(G)", 0)),
                            float(data_dict.get("motionUserAccelerationY(G)", 0)),
                            float(data_dict.get("motionUserAccelerationZ(G)", 0))])
            gyro = np.array([float(data_dict.get("motionRotationRateX(rad/s)", 0)),
                             float(data_dict.get("motionRotationRateY(rad/s)", 0)),
                             float(data_dict.get("motionRotationRateZ(rad/s)", 0))])
            mag = np.array([float(data_dict.get("magnetometerX(µT)", 0)),
                            float(data_dict.get("magnetometerY(µT)", 0)),
                            float(data_dict.get("magnetometerZ(µT)", 0))])

            if start_time is None:
                start_time = time.time()
            t_now = time.time() - start_time
            t_log.append(t_now)

            Va = acc / np.linalg.norm(acc)
            rot = R.from_quat(q_hat[[1, 2, 3, 0]]).as_matrix()
            g_body = rot @ g_inertial

            Vm = mag - np.dot(mag, g_body) * g_body
            Vm = Vm / np.linalg.norm(Vm)

            omega_mes = Ka * np.cross(Va, Va_hat) + Km * np.cross(Vm, Vm_hat)
            u = gyro - b_hat + Kp * omega_mes

            q_hat = integrate_quat(q_hat, u, dt)

            u_norm = np.linalg.norm(u)
            if u_norm > 1e-6:
                ux = skew(u / u_norm)
                Rexp = np.eye(3) + np.sin(-u_norm * dt) * ux + (1 - np.cos(-u_norm * dt)) * (ux @ ux)
            else:
                Rexp = np.eye(3)

            Va_hat = Rexp @ Va_hat
            Vm_hat = Rexp @ Vm_hat
            b_hat = b_hat - Ki * omega_mes * dt

            rpy = R.from_quat(q_hat[[1, 2, 3, 0]]).as_euler('xyz')
            rpy[0] *= -1  # Invert roll for your system
            rpy_log.append(rpy)

            mahony_error = (1 - np.dot(Va, Va_hat)) + (1 - np.dot(Vm, Vm_hat))
            mahony_error_log.append(mahony_error)
            b_hat_log.append(b_hat.copy())

            print(f"RPY = {rpy}")

except KeyboardInterrupt:
    print("Shutting down due to keyboard interrupt...")

except Exception as e:
    print(f"Error occurred: {e}")

finally:
    client_socket.close()
    server_socket.close()

    if rpy_log:
        rpy_log = np.array(rpy_log)
        mahony_error_log = np.array(mahony_error_log)
        b_hat_log = np.array(b_hat_log)

        # Plot RPY
        plt.figure(figsize=(10, 6))
        plt.plot(t_log, rpy_log[:, 0], label='Roll [rad]', color='green')
        plt.plot(t_log, rpy_log[:, 1], label='Pitch [rad]', color='red')
        plt.plot(t_log, rpy_log[:, 2], label='Yaw [rad]', color='blue')
        plt.xlabel('Time [s]')
        plt.ylabel('Angle [rad]')
        plt.title('Orientation Estimate (RPY) Over Time')
        plt.legend()
        plt.grid(True)
        plt.tight_layout()
        plt.show()

        # Plot Mahony error (your custom equation)
        plt.figure(figsize=(10, 4))
        plt.plot(t_log, mahony_error_log, label='Error_est', color='red')
        plt.xlabel('Time [s]')
        plt.ylabel('Custom Mahony Error')
        plt.title('Mahony Filter Error Over Time')
        plt.legend()
        plt.grid(True)
        plt.tight_layout()
        plt.show()

        # Plot Gyro Bias
        plt.figure(figsize=(10, 4))
        plt.plot(t_log, b_hat_log[:, 0], label='Bias x', color='red')
        plt.plot(t_log, b_hat_log[:, 1], label='Bias y', color='green')
        plt.plot(t_log, b_hat_log[:, 2], label='Bias z', color='blue')
        plt.xlabel('Time [s]')
        plt.ylabel('Gyro Bias Estimate [rad/s]')
        plt.title('Gyroscope Bias Estimate (b_hat) Over Time')
        plt.legend()
        plt.grid(True)
        plt.tight_layout()
        plt.show()
    else:
        print("No RPY data to plot.")

OSError: [Errno 99] Cannot assign requested address

In [5]:
##################################################################################################
# Problem 4 -----------------------------------------------------------------
import socket
import csv
import time
import numpy as np
import matplotlib.pyplot as plt

# Quaternion integration helpers
def omega_matrix(w):
    wx, wy, wz = w
    return np.array([
        [0, -wx, -wy, -wz],
        [wx,  0,  wz, -wy],
        [wy, -wz,  0,  wx],
        [wz,  wy, -wx,  0]
    ])

def integrate_quat(q, w, dt):
    norm_w = np.linalg.norm(w)
    if norm_w == 0:
        return q
    theta = 0.5 * norm_w * dt
    Ohm = omega_matrix(w)
    Om_exp = np.cos(theta) * np.eye(4) + np.sin(theta) * Ohm / norm_w
    q_next = Om_exp @ q
    return q_next / np.linalg.norm(q_next)

# TCP settings
HOST = '192.168.0.149'
PORT = 1996

server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.bind((HOST, PORT))
server_socket.listen(1)

print(f"Waiting for connection on port {PORT}...")
client_socket, client_address = server_socket.accept()
print(f"Connected to {client_address}")

buffer = ""
headers = []
start_time = None
dt = 0.01  # seconds

# Initial orientation estimate: identity quaternion [w, x, y, z]
q = np.array([1.0, 0.0, 0.0, 0.0])

# Logs
t_log = []
q_log = []

try:
    while True:
        data = client_socket.recv(4096).decode("utf-8")
        buffer += data
        while '\n' in buffer:
            line, buffer = buffer.split('\n', 1)
            if not headers:
                headers = next(csv.reader([line], delimiter=','))
                print("CSV Headers:", headers)
                continue

            row = next(csv.reader([line], delimiter=','))
            data_dict = dict(zip(headers, row))

            # Read gyroscope data (rad/s)
            gyro = np.array([
                float(data_dict.get("motionRotationRateX(rad/s)", 0)),
                float(data_dict.get("motionRotationRateY(rad/s)", 0)),
                float(data_dict.get("motionRotationRateZ(rad/s)", 0))
            ])

            # Timestamp for plotting
            if start_time is None:
                start_time = time.time()
            elapsed_time = time.time() - start_time
            t_log.append(elapsed_time)

            # Integrate gyro to update quaternion
            q = integrate_quat(q, gyro, dt)
            q_log.append(q.copy())

            # Print quaternion [w, x, y, z]
            # print(f"Quaternion = {q}")

except KeyboardInterrupt:
    print("Shutting down...")

except Exception as e:
    print(f"Error: {e}")

finally:
    client_socket.close()
    server_socket.close()

    if q_log:
        q_log = np.array(q_log)
        t_log = np.array(t_log)

        # Plot quaternion components
        plt.figure(figsize=(10, 6))
        plt.plot(t_log, q_log[:, 0], label='w', color='black')
        plt.plot(t_log, q_log[:, 1], label='x', color='red')
        plt.plot(t_log, q_log[:, 2], label='y', color='green')
        plt.plot(t_log, q_log[:, 3], label='z', color='blue')
        plt.xlabel('Time [s]')
        plt.ylabel('Quaternion Component')
        plt.title('Quaternion Estimate Over Time')
        plt.legend()
        plt.grid(True)
        plt.tight_layout()
        plt.show()
    else:
        print("No quaternion data to plot.")


OSError: [Errno 99] Cannot assign requested address