In [1]:
import time
import json
import pickle
import numpy as np
from classes_raki import *
# import kaleido
import plotly.express as px
from datetime import datetime
import plotly.graph_objects as go

In [2]:
# import plotly.io as pio

# pio.get_chrome()

In [3]:
# Load the label regions and the interpolation functions from disk.
# NOTE: pickle.load can execute arbitrary code while deserialising, so these
# files must come from a trusted source.
with open("sensor_labels_raki.pkl", "rb") as labels_file:
    labels = pickle.load(labels_file)

with open("interpolation_functions_raki.pkl", "rb") as interp_file:
    interp_funcs = pickle.load(interp_file)

In [4]:
# Show the loaded label regions: a dict keyed by "mat_<n>", each holding a list
# of region dicts with "start", "end", "label" and "target" entries.
labels

{'mat_0': [{'start': np.int64(2972766),
   'end': np.int64(4172628),
   'label': 5,
   'target': 15},
  {'start': np.int64(5972287),
   'end': np.int64(7172081),
   'label': 4,
   'target': 10},
  {'start': np.int64(8972024),
   'end': np.int64(10171736),
   'label': 3,
   'target': 7},
  {'start': np.int64(11971687),
   'end': np.int64(13171400),
   'label': 2,
   'target': 3},
  {'start': np.int64(14971105),
   'end': np.int64(16171005),
   'label': 1,
   'target': 0}],
 'mat_1': [{'start': np.int64(2914212),
   'end': np.int64(4114274),
   'label': 0,
   'target': None},
  {'start': np.int64(5913610),
   'end': np.int64(7113418),
   'label': 0,
   'target': None},
  {'start': np.int64(8913546),
   'end': np.int64(10113441),
   'label': 0,
   'target': None},
  {'start': np.int64(11912852),
   'end': np.int64(13112752),
   'label': 0,
   'target': None},
  {'start': np.int64(14912864),
   'end': np.int64(16112480),
   'label': 0,
   'target': None}]}

In [5]:
class Sensor:
    """Log-transformed samples of one sensor's interpolation functions.

    Parameters
    ----------
    matrix : int
        Matrix index; selects ``interp_funcs[f"mat_{matrix}"]`` and
        ``labels[f"mat_{matrix}"]``.
    sensor_idx : int
        Index of the sensor inside the selected matrix entry of ``interp_funcs``.
    interp_funcs : dict
        Maps ``"mat_<n>"`` to a sequence (one entry per sensor) of sequences of
        callables. Each callable is evaluated on an array of time values.
    labels : dict
        Maps ``"mat_<n>"`` to a list of region dicts with the keys ``"start"``,
        ``"end"``, ``"label"`` and ``"target"``.
    num_s : int, optional
        Number of samples drawn per labelled region (default 100).
    num_hp : int, optional
        Number of callables (indexed ``0 .. num_hp - 1``) evaluated per sensor
        (default 10, the value that used to be hard-coded).
    """

    def __init__(self, matrix, sensor_idx, interp_funcs, labels, num_s=100, num_hp=10):
        self.interp_funcs = interp_funcs[f"mat_{matrix}"][sensor_idx]
        self.labels = labels[f"mat_{matrix}"]
        self.num_s = num_s
        self.num_hp = num_hp

    def _log_signals(self, t):
        """Evaluate every callable on ``t`` and return the natural log, shape (num_hp, len(t))."""
        return np.array([np.log(self.interp_funcs[hp](t)) for hp in range(self.num_hp)])

    def get_data_for_region(self, region_idx):
        """Sample one labelled region at ``num_s`` evenly spaced time points.

        Returns
        -------
        dict
            ``"y"``: log signals with shape (num_hp, num_s); ``"label"``: the
            region's classification label; ``"reg"``: the region's regression
            target; ``"t"``: the sampled time points.
        """
        region = self.labels[region_idx]
        t = np.linspace(region["start"], region["end"], self.num_s)
        return {
            "y": self._log_signals(t),  # sensor data
            "label": region["label"],  # clf label
            "reg": region["target"],  # regression target
            "t": t,  # time
        }

    def get_ml_data_for_region(self, region_idx):
        """Return ``(X, y, r, t)`` for a single region.

        ``X`` has shape (num_s, num_hp); ``y`` repeats the region label as int32;
        ``r`` repeats the regression target; ``t`` holds the sample times.
        """
        region_data = self.get_data_for_region(region_idx)
        X = region_data["y"]
        y = np.array([region_data["label"]] * self.num_s, dtype=np.int32)
        r = np.array([region_data["reg"]] * self.num_s)
        t = region_data["t"]
        return X.T, y, r, t

    def get_ml_data_list(self):
        """Return ``(X, y, r, t)`` with all labelled regions concatenated in order.

        ``X`` has shape (num_regions * num_s, num_hp). With no regions the
        result is empty: ``X`` has shape (0, num_hp) and the other arrays have
        length 0.
        """
        regions = [self.get_data_for_region(i) for i in range(len(self.labels))]

        # Concatenate once instead of growing the arrays with np.append inside
        # a loop. The empty seed arrays stay in the concatenation so the result
        # dtypes follow the same promotion as before and the zero-region case
        # still yields well-formed empty arrays.
        X = np.concatenate(
            [np.empty((self.num_hp, 0))] + [data["y"] for data in regions], axis=1
        )
        y = np.concatenate(
            [np.array([], dtype=np.int32)]
            + [np.array([data["label"]] * self.num_s) for data in regions]
        )
        r = np.concatenate(
            [np.array([])] + [np.array([data["reg"]] * self.num_s) for data in regions]
        )
        t = np.concatenate([np.array([])] + [data["t"] for data in regions])
        return X.T, y, r, t  # sensor data, clf label, reg target, time

    def get_data_between_times(self, start, end, num_samples):
        """Sample the log signals at ``num_samples`` evenly spaced points in [start, end].

        Returns
        -------
        tuple
            ``(Y, t)`` where ``Y`` has shape (num_hp, num_samples) and ``t`` is
            the array of sample times.
        """
        t = np.linspace(start, end, num_samples)
        return self._log_signals(t), t


def get_sensor_tuple_data(matrix, s_l_idx, s_r_idx, interp_funcs, labels):
    """Combine the ML data of two sensors of one matrix over all labelled regions.

    Each sensor is wrapped in a ``Sensor`` and sampled with
    ``get_ml_data_list``. The two feature matrices are joined along axis 1
    (left sensor's columns first).

    Returns
    -------
    tuple
        ``(X_tuple, y, r, t)``: joined sensor data, classification labels,
        regression targets and time array (the latter three taken from the
        left sensor).

    Raises
    ------
    Exception
        If the labels, regression targets or time arrays of the two sensors
        are not element-wise equal.
    """
    features_l, clf_l, reg_l, time_l = Sensor(
        matrix, s_l_idx, interp_funcs, labels
    ).get_ml_data_list()
    features_r, clf_r, reg_r, time_r = Sensor(
        matrix, s_r_idx, interp_funcs, labels
    ).get_ml_data_list()

    # Checked in this order so the first mismatch decides which error is raised.
    consistency_checks = (
        (clf_l, clf_r, f"Classes are not the same!"),
        (reg_l, reg_r, f"Reg targets are not the same!"),
        (time_l, time_r, f"Time data are not the same!"),
    )
    for from_left, from_right, message in consistency_checks:
        if not (from_left == from_right).all():
            raise Exception(message)

    joined = np.concatenate((features_l, features_r), axis=1)
    return joined, clf_l, reg_l, time_l  # Sensor data, Clf labels, Reg data, Time arr


def get_sensor_tuple_data_for_region(matrix, s_l_idx, s_r_idx, interp_funcs, labels, region_idx):
    """Combine the ML data of two sensors of one matrix for a single labelled region.

    Each sensor is wrapped in a ``Sensor`` and sampled with
    ``get_ml_data_for_region(region_idx)``. The two feature matrices are
    joined along axis 1 (left sensor's columns first).

    Returns
    -------
    tuple
        ``(X_tuple, y, r, t)``: joined sensor data, classification labels,
        regression targets and time array (the latter three taken from the
        left sensor).

    Raises
    ------
    Exception
        If the labels, regression targets or time arrays of the two sensors
        are not element-wise equal.
    """
    features_l, clf_l, reg_l, time_l = Sensor(
        matrix, s_l_idx, interp_funcs, labels
    ).get_ml_data_for_region(region_idx)
    features_r, clf_r, reg_r, time_r = Sensor(
        matrix, s_r_idx, interp_funcs, labels
    ).get_ml_data_for_region(region_idx)

    # Checked in this order so the first mismatch decides which error is raised.
    consistency_checks = (
        (clf_l, clf_r, f"Classes are not the same!"),
        (reg_l, reg_r, f"Reg targets are not the same!"),
        (time_l, time_r, f"Time data are not the same!"),
    )
    for from_left, from_right, message in consistency_checks:
        if not (from_left == from_right).all():
            raise Exception(message)

    joined = np.concatenate((features_l, features_r), axis=1)
    return joined, clf_l, reg_l, time_l  # Sensor data, Clf labels, Reg data, Time arr


In [6]:
def plot_data_arr(mat, y, t, labels, title, save_as=None):
    """Plot each row of ``y`` against ``t`` and shade the labelled regions of a matrix.

    Parameters
    ----------
    mat : int
        Matrix index; selects the regions in ``labels[f"mat_{mat}"]``.
    y : sequence of sequences
        One row per line trace (e.g. a 2-D array). Every row is plotted.
    t : sequence
        x values shared by all traces.
    labels : dict
        Maps ``"mat_<n>"`` to a list of region dicts; the ``"start"``,
        ``"end"`` and ``"label"`` entries are used here.
    title : str
        Figure title.
    save_as : str, optional
        File name; when given, the figure is written to ``figures/<save_as>``
        before it is shown.

    Notes
    -----
    Uses the module-level ``classes`` and ``colors`` lookups, indexed by the
    region label (presumably provided by ``from classes_raki import *`` --
    confirm).
    """
    import os  # local import: only needed to create the output directory

    fig = go.Figure()

    # Iterate over the rows actually present instead of assuming exactly ten.
    for i, row in enumerate(y):
        fig.add_trace(go.Scatter(x=t,
                                 y=row,
                                 mode="lines",
                                 showlegend=False,
                                 name=f"H.S. {i}"))

    # One shaded, annotated rectangle per labelled region.
    for label in labels[f"mat_{mat}"]:
        fig.add_vrect(x0=label["start"],
                      x1=label["end"],
                      annotation_text=classes[label["label"]],
                      annotation_position="top left",
                      fillcolor=colors[label["label"]],
                      opacity=0.4,
                      line_width=0)

    # NOTE(review): the callers in this notebook pass np.log-transformed values;
    # confirm that the "Resistance (Ohms)" axis title is what is intended.
    fig.update_yaxes(title_text="Resistance (Ohms)")
    fig.update_xaxes(title_text="Time since power on (ms)", tickformat=".0e")
    fig.update_layout(height=300, width=625,
                      title_x=0.5,
                      font_family="Times New Roman",
                      font_color="black",
                      title_font_family="Times New Roman",
                      title=dict(text=title, pad=dict(t=0, r=0, b=0, l=0)),
                      margin=dict(t=50, r=10, b=0, l=0))

    if save_as is not None:
        out_path = f"figures/{save_as}"
        # Create the target directory first so the export also works on a
        # fresh checkout where "figures/" does not exist yet.
        os.makedirs(os.path.dirname(out_path), exist_ok=True)
        fig.write_image(out_path)

    fig.show()

In [7]:
# Sensor to inspect and number of samples drawn across the time window.
s_idx = 7
ns = 1200

# Time window shared by both matrices (presumably in the same units as the
# label "start"/"end" values -- confirm).
t_window = (2372627, 17071312)

# Sample the same sensor index on matrix 0 and matrix 1 over that window.
s_l = Sensor(0, s_idx, interp_funcs, labels)
y_l, t_l = s_l.get_data_between_times(*t_window, ns)
s_r = Sensor(1, s_idx, interp_funcs, labels)
y_r, t_r = s_r.get_data_between_times(*t_window, ns)

# One figure per matrix, then the difference (matrix 0 minus matrix 1) drawn
# with matrix 0's label regions.
plot_data_arr(
    0, y_l, t_l, labels,
    f"Matrix 0, Sensor {s_idx}",
    save_as=f"rki_mat_0_s_{s_idx}_w_labels.svg",
)
plot_data_arr(
    1, y_r, t_r, labels,
    f"Matrix 1, Sensor {s_idx}",
    save_as=f"rki_mat_1_s_{s_idx}_w_labels.svg",
)
plot_data_arr(
    0, y_l - y_r, t_l, labels,
    f"Sensor {s_idx} Net",
    save_as=f"rki_mat_0_min_1_s_{s_idx}_w_labels.svg",
)