In [1]:
import pickle
import numpy as np
import plotly.express as px
import plotly.graph_objects as go
from scipy.signal import butter, filtfilt

In [2]:
CUTOFF = 5
FS = 50
ORDER = 1


def apply_lowpass_filter(data, cutoff, fs, order=5):
    b, a = butter(order, cutoff, fs=fs, btype="low", analog=False)
    y = filtfilt(b, a, data)
    return y

In [3]:
DAY = "day_9"
with open(f"data/{DAY}/interp.pkl", "rb") as f:
    sensor_data = pickle.load(f)

print(sensor_data.keys())
print(sensor_data["air"][0].keys())
print(type(sensor_data["air"][0]["t"]))
print(np.array_equal(sensor_data["air"][0]["t"],
                     sensor_data["air"][7]["t"]))
print(np.array_equal(sensor_data["sample"][0]["t"],
                     sensor_data["air"][7]["t"]))

dict_keys(['air', 'sample'])
dict_keys(['t', 'hps'])
<class 'numpy.ndarray'>
True
True


In [4]:
def get_pair_net(sensor_data, sensor_l, sensor_r, as_log=False):
    """ 
    sensor_l: 2, sensor_r: 3
    """
    air_l = np.array(sensor_data["air"][sensor_l]["hps"])
    air_r = np.array(sensor_data["air"][sensor_r]["hps"])
    samp_l = np.array(sensor_data["sample"][sensor_l]["hps"])
    samp_r = np.array(sensor_data["sample"][sensor_r]["hps"])

    air_l = apply_lowpass_filter(air_l, CUTOFF, FS, ORDER)
    air_r = apply_lowpass_filter(air_r, CUTOFF, FS, ORDER)
    samp_l = apply_lowpass_filter(samp_l, CUTOFF, FS, ORDER)
    samp_r = apply_lowpass_filter(samp_r, CUTOFF, FS, ORDER)

    if as_log:
        air_l = np.log(air_l)
        air_r = np.log(air_r)
        samp_l = np.log(samp_l)
        samp_r = np.log(samp_r)

    diff_l = samp_l - air_l
    diff_r = samp_r - air_r
    net = np.concatenate((diff_l, diff_r), axis=0)
    t = sensor_data["air"][sensor_l]["t"]
    return t, net

In [5]:
def get_daily_net_pair(days, sensor_l, sensor_r):
    ts = None
    nets = None
    for i, day in enumerate(days):
        DAY = f"day_{day}"
        with open(f"data/{DAY}/interp.pkl", "rb") as f:
            sensor_data = pickle.load(f)

        t, net = get_pair_net(sensor_data, sensor_l, sensor_r, as_log=True)
        t = t = np.linspace(1, 100, 100) + (i * 400)
        if ts is None:
            ts = t.copy()
            nets = net.copy()
        else:
            ts = np.concatenate((ts, t))
            nets = np.concatenate((nets, net), axis=1)

    return ts, nets

In [6]:
def plot_days_net(t, ys, sl, sr, save=False):
    title = f"Net Sensor {sl}, {sr} Readings"
    fig = go.Figure()

    for y in ys:
        fig.add_trace(go.Scatter(x=t,
                                 y=y,
                                 mode="markers",
                                 showlegend=False))

    fig.update_yaxes(title_text="Resistance (Ohms)")
    fig.update_xaxes(title_text="Timestamp")
    fig.update_layout(height=500, width=800,
                      title_x=0.5,
                      font_family="Times New Roman",
                      title_font_family="Times New Roman",
                      title=dict(text=title, pad=dict(t=0, r=0, b=0, l=0)),
                      margin=dict(t=50, r=10, b=0, l=0))
    fig.show()
    if save:
        fig.write_image(f"Net_{sl}_{sr}.pdf")

In [7]:
for s in range(4):
    sl, sr = s * 2, s * 2 + 1
    ts, nets = get_daily_net_pair([0, 1, 2, 3, 4, 5, 7, 9], sl, sr)
    plot_days_net(ts, nets, sl, sr, save=False)

In [8]:
def get_daily_net_pair_box(days, sensor_l, sensor_r):
    result = {"t": [], "y": []}
    for day in days:
        DAY = f"day_{day}"
        with open(f"data/{DAY}/interp.pkl", "rb") as f:
            sensor_data = pickle.load(f)

        t, net = get_pair_net(sensor_data, sensor_l, sensor_r, as_log=True)
        net = net.flatten()
        t = [f"Day {day}"] * len(net)
        result["t"].extend(t)
        result["y"].extend(list(net))
    return result

In [9]:
def plot_boxes(days, sl, sr, save=False):
    r = get_daily_net_pair_box(days, sl, sr)
    title = f"Net Sensor {sl}, {sr} Readings"
    fig = px.box(r, x="t", y="y", points="all")
    fig.update_yaxes(title_text="Resistance (Ohms)")
    fig.update_xaxes(title_text="Days")
    fig.update_layout(height=500, width=800,
                      title_x=0.5,
                      font_family="Times New Roman",
                      title_font_family="Times New Roman",
                      title=dict(text=title, pad=dict(t=0, r=0, b=0, l=0)),
                      margin=dict(t=50, r=10, b=0, l=0))
    fig.show()
    if save:
        fig.write_image(f"Box_{sl}_{sr}.pdf")

In [10]:
for s in range(4):
    sl, sr = s * 2, s * 2 + 1
    plot_boxes([0, 1, 2, 3, 4, 5, 7, 9], sl, sr, save=False)