In [4]:
!pip install mcap-ros2-support mcap

zsh:1: /usr/local/bin/pip: bad interpreter: /usr/local/opt/python/bin/python3.6: no such file or directory


In [2]:
import math

In [3]:
from mcap_ros2.decoder import DecoderFactory
from mcap.reader import make_reader

magnetometer_data = []
printer_data = []
with open(
    "../data/rosbag2_2024_01_11-11_38_41/rosbag2_2024_01_11-11_38_41_0.mcap",
    "rb",
) as fd:
    reader = make_reader(fd, decoder_factories=[DecoderFactory()])
    for schema, channel, message, ros_msg in reader.iter_decoded_messages(
    ):
        # print(f"{channel.topic} {schema.name} [{message.log_time}]: ")
        if channel.topic == "/magnetometer_reading":
            for reading in ros_msg.magnetic_field_array:
                timestamp = reading.header.stamp.sec + reading.header.stamp.nanosec * 1e-9
                magnetic_field_x = reading.magnetic_field.x
                magnetic_field_y = reading.magnetic_field.y
                magnetic_field_z = reading.magnetic_field.z
                magnetometer_data.append([timestamp, magnetic_field_x, magnetic_field_y, magnetic_field_z])

        elif channel.topic == "/tf":
            for transform in ros_msg.transforms:
                if transform.header.frame_id == "printer_head_link":
                    timestamp = transform.header.stamp.sec + transform.header.stamp.nanosec * 1e-9
                    x = transform.transform.translation.x
                    y = transform.transform.translation.y
                    z = transform.transform.translation.z
                    printer_data.append([timestamp, x, y, z])

ModuleNotFoundError: No module named 'mcap_ros2'

In [None]:
from matplotlib import pyplot as plt

plt.plot([x[0] for x in magnetometer_data], [x[1] for x in magnetometer_data], label="x")
plt.plot([x[0] for x in magnetometer_data], [x[2] for x in magnetometer_data], label="y")
plt.plot([x[0] for x in magnetometer_data], [x[3] for x in magnetometer_data], label="z")
plt.set_xlabel("timestamp [s]")
plt.set_ylabel("Magnetic Sensor Reading")
plt.legend()
plt.grid()
plt.show()
plt.savefig("../reports/ros_integration/figures/raw_magnetometer.png")

: 

In [None]:
plt.plot([x[0] for x in printer_data], [x[1] for x in printer_data], label="x")
plt.plot([x[0] for x in printer_data], [x[2] for x in printer_data], label="y")
plt.plot([x[0] for x in printer_data], [x[3] for x in printer_data], label="z")
plt.set_xlabel("timestamp [s]")
plt.set_ylabel("mm")
plt.legend()
plt.grid()
plt.show()
plt.savefig("../reports/ros_integration/figures/3d_printer_raw.png")


: 

In [None]:
import numpy as np
from scipy.signal import butter,filtfilt

# Filter requirements.
T = 5.0         # Sample Period
fs = 850       # sample rate, Hz
cutoff = 2     # desired cutoff frequency of the filter, Hz ,      slightly higher than actual 1.2 Hz
nyq = 0.5 * fs  # Nyquist Frequency
order = 2       # sin wave can be approx represented as quadratic
# n = len(y_data) # total number of samples

def butter_lowpass_filter(data, cutoff, fs, order):
    nyq = 0.5 * fs
    normal_cutoff = cutoff / nyq
    # Get the filter coefficients 
    b, a = butter(order, normal_cutoff, btype='low', analog=False)
    # print(b.dtype, a.dtype)
    y = filtfilt(b, a, data)
    return y

: 

In [None]:
t_data = [x[0] for x in magnetometer_data]
x_data = [x[1] for x in magnetometer_data]
x_filtered = butter_lowpass_filter(x_data, cutoff, fs, order)

plt.plot(t_data, x_data, label="x_raw")
plt.plot(t_data, x_filtered, label="x_filtered")
plt.set_xlabel("timestamp [s]")
plt.set_ylabel("Magnetic Sensor Reading")
plt.show()
plt.savefig("../reports/ros_integration/figures/filtered_magnetometer.png")

: 

In [None]:
# get gradient of sensor data
import numpy as np

y_data = [float(x[2]) for x in magnetometer_data]
for i in range(len(y_data)):
    if y_data[i] > 55000:
        y_data[i] = y_data[i] - 2**16
y_filtered = butter_lowpass_filter(y_data, cutoff, fs, order)
y_grad = np.gradient(y_filtered)

plt.plot([x[0] for x in magnetometer_data], y_grad, label="y_grad")
plt.legend()
plt.grid()
plt.show()

: 

In [None]:
episodes = []
open_episode = True
episode_start = 0

t_data = [x[0] for x in magnetometer_data]

print(len(y_grad), len(t_data))
for idx, pt in enumerate(y_grad):
    if open_episode:
        if pt < -10:
            # found start of episode
            open_episode = False
            episode_start = t_data[idx]
        else:
            continue
    else:
        if pt > 10:
            # found end of episode
            open_episode = True
            episodes.append((episode_start, t_data[idx]))
        else:
            continue

: 

In [None]:
shaft_timestamp = [x[0] for x in printer_data]

BEAM_EDGE_Y_RELATIVE_TO_NOZZLE = 56.75
WHISKER_TIP_Y = 35  # from the base of the printer bed


def distance_on_shaft(y):
    return y - BEAM_EDGE_Y_RELATIVE_TO_NOZZLE - WHISKER_TIP_Y


shaft_data = [distance_on_shaft(x[2]) for x in printer_data]


shaft_data_zipped = list(zip(shaft_timestamp, shaft_data))
shaft_data_per_episode = []

episodes_data = []
episodes_grad_data = []
for episode_start, episode_end in episodes:
    episodes_data.append(
        [y for t, y in zip(t_data, y_data) if t >= episode_start and t <= episode_end]
    )
    shaft_data_per_episode.append(
        [
            x[1]
            for x in shaft_data_zipped
            if x[0] >= episode_start and x[0] <= episode_end
        ]
    )
    episodes_grad_data.append(
        [y for t, y in zip(t_data, y_grad) if t >= episode_start and t <= episode_end]
    )

shaft_distance_per_episode = [float(np.median(x)) for x in shaft_data_per_episode]

print(len(episodes_data))
fig, axes = plt.subplots(ncols=5, nrows=math.ceil(len(episodes) / 5))
for idx, data in enumerate(episodes_data):
    ax = axes[idx // 5, idx % 5]
    ax.plot(data)
    ax.title.set_text(f"D: {shaft_distance_per_episode[idx]:.2f}")


fig, axes = plt.subplots(ncols=5, nrows=math.ceil(len(episodes) / 5))
for idx, data in enumerate(episodes_grad_data):
    ax = axes[idx // 5, idx % 5]
    ax.plot(data)
    ax.title.set_text(f"D: {shaft_distance_per_episode[idx]:.2f}")


max_grad_per_episode = [np.min(x) for x in episodes_grad_data]
print(shaft_distance_per_episode)
print(max_grad_per_episode)

: 

In [None]:
print(len(shaft_distance_per_episode), len(max_grad_per_episode))

plt.plot([x for x in max_grad_per_episode], shaft_distance_per_episode, "x")
plt.grid()

: 

In [None]:
# curve-fit() function imported from scipy
from scipy.optimize import curve_fit

def whisker_model(x, a, b, c, d):
    return a*x*x*x + b*x*x + c*x + d
 
param, param_cov = curve_fit(whisker_model, max_grad_per_episode, shaft_distance_per_episode)
 
plt.plot([x for x in max_grad_per_episode], shaft_distance_per_episode, "x")
plt.grid()

ans = [whisker_model(x, param[0], param[1], param[2], param[3]) for x in sorted(max_grad_per_episode)]

plt.plot(sorted(max_grad_per_episode), ans, '--', color ='blue', label ="Polynomial Fit")
plt.set(xlabel='max_grad_per_episode', ylabel='shaft_distance_per_episode')
plt.legend()
plt.show()

: 

In [None]:
import pickle

with open("whisker_model.pkl", "wb") as fd:
    pickle.dump(param, fd)

: 

In [None]:
with open("whisker_model.pkl", "rb") as fd:
    print(pickle.load(fd))

: 

In [None]:
for idx, data in enumerate(episodes_data):
    print(len(data))

: 

: 