In [None]:
%matplotlib qt
%load_ext autoreload
%autoreload 2
from local_plot import *
from utils import *
#Plot the IMU data
from unicodedata import name
from scipy.fft import fft, fftfreq
import matplotlib
matplotlib.rcParams.update({'font.size': 30})


In [None]:
import rosbag
def read_imu(bagpath, start=0.0):
    ts = []
    acc = []
    gyro = []
    t0 = None
    for topic, msg, t in rosbag.Bag(bagpath).read_messages():
        if msg._type == "sensor_msgs/Imu":
            if t0 is None:
                t0 = msg.header.stamp.to_sec()
            if msg.header.stamp.to_sec() - t0 < start:
                continue
            ts.append(msg.header.stamp.to_sec() - t0)
            acc.append([msg.linear_acceleration.x, msg.linear_acceleration.y, msg.linear_acceleration.z])
            gyro.append([msg.angular_velocity.x, msg.angular_velocity.y, msg.angular_velocity.z])
    imu = {}
    imu["t"], imu["acc"], imu["gyro"] = np.array(ts), np.array(acc), np.array(gyro)
    return imu

# imu = read_imu("/home/xuhao/data/d2slam/manual_vicon_quad_2022_10_25_small/manual_small_1-sync.bag", 20)
# bagname = "/home/xuhao/data/d2slam/manual_vicon_quad_2022_10_25_small/manual_data_calib-sync.bag"
bagname = "/home/xuhao/data/d2slam/manual_vicon_quad_2022_10_25_small/manual_small_6-sync.bag"
imu = read_imu(bagname)



In [None]:
def fft_data(data, fs):
    N = len(data)
    T = 1.0 / fs
    yf = fft(data)
    xf = fftfreq(N, T)[:N//2]
    return xf, 2.0/N * np.abs(yf[0:N//2])
def draw_fft_data(data, fs, label, ax, axis=""):
    xf, yf = fft_data(data, fs)
    ax.semilogy(xf, yf, label=label)
    ax.set_ylabel(f"Amp. {axis}")
    # ax.set_ylim(0, 1.2*np.max(yf))
    ax.legend()
    ax.grid()
def draw_fft(imu, name, item="acc", axs=None, figsize=(20,10)):
    if axs is None:
        plt.figure(f"{name}_fft_{item}", figsize=figsize)
        plt.clf()
        fig, axs = plt.subplots(3,1, sharex=True, figsize=figsize, num=f"{name}_fft_{item}")
    draw_fft_data(imu[item][:,0], 220.0, name, axs[0], "x")
    draw_fft_data(imu[item][:,1], 220.0, name, axs[1], "y")
    draw_fft_data(imu[item][:,2], 220.0, name, axs[2], "z")
    axs[2].set_xlabel("Frequency (Hz)")
    plt.tight_layout()
    return axs
    
def draw_imu_data(imu, suffix="", axs=None, axs_gyro=None, figsize = (20,10)):
    if axs is None:
        plt.figure(f"acc", figsize=figsize)
        plt.clf()
        fig, axs = plt.subplots(3,1, sharex=True, figsize=figsize, num=f"acc")

        plt.figure(f"gyro", figsize=figsize)
        plt.clf()
        fig, axs_gyro = plt.subplots(3,1, sharex=True, figsize=figsize, num=f"gyro")

    axs[0].plot(imu["t"], imu[f"acc{suffix}"][:,0] - np.mean(imu[f"acc{suffix}"][:,0]), label=f"a_x {suffix}")
    plt.legend()
    axs[1].plot(imu["t"], imu[f"acc{suffix}"][:,1] - np.mean(imu[f"acc{suffix}"][:,1]), label=f"a_y {suffix}")
    plt.legend()
    axs[2].plot(imu["t"], imu[f"acc{suffix}"][:,2], label=f"a_z {suffix}")
    plt.legend()
    axs_gyro[0].plot(imu["t"], imu[f"gyro{suffix}"][:,0], label="w_x")
    plt.legend()
    axs_gyro[1].plot(imu["t"], imu[f"gyro{suffix}"][:,1], label="w_y")
    plt.legend()
    axs_gyro[2].plot(imu["t"], imu[f"gyro{suffix}"][:,2], label="w_z")
    return axs, axs_gyro


In [None]:
#Appy 2nd order butterworth filter
from matplotlib.lines import lineStyles
from scipy.signal import butter, lfilter, iirnotch, filtfilt
def butter_lowpass_filter(data, cutoff, fs, order=5):
    nyq = 0.5 * fs
    normal_cutoff = cutoff / nyq
    b, a = butter(order, normal_cutoff, btype='low', analog=False)
    y = lfilter(b, a, data)
    return y
#Apply a notch filter
def notch_filter(data, cut_freq, fs, Q = 30.0):
    b, a = iirnotch(cut_freq, Q, fs)
    y = filtfilt(b, a, data)
    return y
#Apply 2nd order butterworth filter on the data
acc = imu["acc"]
gyro = imu["gyro"]
fs = 220.0
cutoff = 20.0
order = 2
acc_filtered = np.zeros(acc.shape)
gyro_filtered = np.zeros(gyro.shape)
for i in range(3):
    acc_filtered[:,i] = butter_lowpass_filter(acc[:,i], cutoff, fs, order)
    gyro_filtered[:,i] = butter_lowpass_filter(gyro[:,i], cutoff, fs, order)
imu["acc_lowpass"] = acc_filtered
imu["gyro_lowpass"] = gyro_filtered
#Plot the filtered data with raw
cut_freq = 65
Q = 3
acc_filtered = np.zeros(acc.shape)
gyro_filtered = np.zeros(gyro.shape)
for i in range(3):
    acc_filtered[:,i] = notch_filter(acc[:,i], cut_freq, fs, Q)
    gyro_filtered[:,i] = notch_filter(gyro[:,i], cut_freq, fs, Q)
imu["acc_notch"] = acc_filtered
imu["gyro_notch"] = gyro_filtered

acc_filtered = np.zeros(acc.shape)
gyro_filtered = np.zeros(gyro.shape)
for i in range(3):
    acc_filtered[:,i] = butter_lowpass_filter(imu["acc_notch"][:,i], cutoff, fs, order)
    gyro_filtered[:,i] = butter_lowpass_filter(imu["gyro_notch"][:,i], cutoff, fs, order)
imu["acc_notch_lowpass"] = acc_filtered
imu["gyro_notch_lowpass"] = gyro_filtered

figsize = (20,15)
axs, axs_gyro = draw_imu_data(imu, suffix="", figsize=figsize)
# axs = draw_imu_data(imu, suffix="_butter", axs=axs)
axs, axs_gyro = draw_imu_data(imu, suffix="_notch", axs=axs, axs_gyro=axs_gyro)
axs, axs_gyro = draw_imu_data(imu, suffix="_lowpass", axs=axs, axs_gyro=axs_gyro)
axs, axs_gyro = draw_imu_data(imu, suffix="_notch_lowpass", axs=axs, axs_gyro=axs_gyro)

axs = draw_fft(imu, "raw", "acc", figsize=figsize)
axs = draw_fft(imu, "notch", item="acc_notch", axs=axs)
axs = draw_fft(imu, "notch_lp", item="acc_notch_lowpass", axs=axs)

axs_gyro = draw_fft(imu, "raw", "gyro", figsize=figsize)
axs_gyro = draw_fft(imu, "notch", item="gyro_notch", axs=axs_gyro)
axs_gyro = draw_fft(imu, "lowpass", item="gyro_lowpass", axs=axs_gyro)
axs_gyro = draw_fft(imu, "notch_lp", item="gyro_notch_lowpass", axs=axs_gyro)

In [None]:
def write_imu_filtered(baginput, imu, start=0.0, acc_suffix="_notch", gyro_suffix="_lowpass"):
    ts = []
    acc = []
    gyro = []
    t0 = None
    #baginput is string, convert a output name
    bagoutput = baginput.replace(".bag", f"_filtered.bag")
    c = 0
    with rosbag.Bag(bagoutput, 'w') as outbag:
        for topic, msg, t in rosbag.Bag(baginput).read_messages():
            if msg._type == "sensor_msgs/Imu":
                if t0 is None:
                    t0 = msg.header.stamp.to_sec()
                if msg.header.stamp.to_sec() - t0 < start:
                    continue
                #Use filtered imu data to replace
                msg.linear_acceleration.x = imu["acc"+acc_suffix][c,0]
                msg.linear_acceleration.y = imu["acc"+acc_suffix][c,1]
                msg.linear_acceleration.z = imu["acc"+acc_suffix][c,2]
                msg.angular_velocity.x = imu["gyro"+gyro_suffix][c,0]
                msg.angular_velocity.y = imu["gyro"+gyro_suffix][c,1]
                msg.angular_velocity.z = imu["gyro"+gyro_suffix][c,2]
                c += 1
                outbag.write(topic, msg, t)
            else:
                outbag.write(topic, msg, t)
write_imu_filtered(bagname, imu, start=0.0, acc_suffix="_notch_lowpass", gyro_suffix="_notch_lowpass")