In [1]:
from IPython.display import display
import ipywidgets as widgets
import numpy as np
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
import tensorflow as tf
from sklearn.metrics import r2_score, mean_squared_error

In [2]:
# load ds
train_ts_ds = np.load('data/train_ts_ds.npy',)
val_ts_ds = np.load('data/val_ts_ds.npy', )
test_ts_ds = np.load('data/test_ts_ds.npy', )
train_gaf_ds = np.load('data/train_gaf_ds.npy', )
val_gaf_ds = np.load('data/val_gaf_ds.npy', )
test_gaf_ds = np.load('data/test_gaf_ds.npy', )
train_label = np.load('data/train_label.npy', )
val_label = np.load('data/val_label.npy', )
test_label = np.load('data/test_label.npy', )
# load decoded ds
beta = 1
train_dec_gaf_ds = np.load(f'data/dec_imgs_train_beta_{beta}.npy', )
val_dec_gaf_ds = np.load(f'data/dec_imgs_val_beta_{beta}.npy', )
test_dec_gaf_ds = np.load(f'data/dec_imgs_test_beta_{beta}.npy', )


In [3]:
## dec imgs to ts igasf does not work yet
# class IGASF(tf.keras.layers.Layer):
#     def __init__(self, gasf_post_scale = True, min_scale = 0, max_scale = 1):
#         super(IGASF, self).__init__()
#         self.gasf_post_scale = gasf_post_scale
#         self.min_scale = min_scale
#         self.max_scale = max_scale

#     def call(self, inputs):
#         diag = tf.linalg.diag_part(inputs)
#         if self.gasf_post_scale:
#             ts = tf.sqrt(diag) 
#         else:
#             ts = tf.sqrt((diag + 1)/2) 
#         #ts = ts #* (self.max_scale - self.min_scale) + self.min_scale
#         return ts

# inputs = tf.keras.Input(shape=(28,28,1),batch_size=None,dtype=tf.float32)
# outputs = IGASF()(inputs)
# postprocessor_model = tf.keras.models.Model(inputs = inputs, outputs = outputs)
# train_pred_ts = np.squeeze(postprocessor_model.predict(tf.data.Dataset.from_tensor_slices(train_dec_gaf_ds).batch(128)))
# val_pred_ts = np.squeeze(postprocessor_model.predict(tf.data.Dataset.from_tensor_slices(val_dec_gaf_ds).batch(128)))
# test_pred_ts = np.squeeze(postprocessor_model.predict(tf.data.Dataset.from_tensor_slices(test_dec_gaf_ds).batch(128)))

# use numpy functions instead of igasf layer
train_pred_ts = []
for _, val in enumerate(np.squeeze(train_dec_gaf_ds)):
    train_pred_ts.append(np.sqrt(np.diag(val)))
train_pred_ts = np.array(train_pred_ts)    

val_pred_ts = []
for _, val in enumerate(np.squeeze(val_dec_gaf_ds)):
    val_pred_ts.append(np.sqrt(np.diag(val)))
val_pred_ts = np.array(val_pred_ts)  

test_pred_ts = []
for _, val in enumerate(np.squeeze(test_dec_gaf_ds)):
    test_pred_ts.append(np.sqrt(np.diag(val)))
test_pred_ts = np.array(test_pred_ts)  

In [4]:
# calculate ts losses
train_ts_loss = np.zeros((train_ts_ds.shape[0],2))
val_ts_loss = np.zeros((val_ts_ds.shape[0],2))
test_ts_loss = np.zeros((test_ts_ds.shape[0],2))
for i, (true, pred) in enumerate(zip(train_ts_ds, train_pred_ts)):
    train_ts_loss[i,0] = r2_score(true, pred)
    train_ts_loss[i,1] = mean_squared_error(true, pred)
for i, (true, pred) in enumerate(zip(val_ts_ds, val_pred_ts)):
    val_ts_loss[i,0] = r2_score(true, pred)
    val_ts_loss[i,1] = mean_squared_error(true, pred)
for i, (true, pred) in enumerate(zip(test_ts_ds, test_pred_ts)):
    test_ts_loss[i,0] = r2_score(true, pred)
    test_ts_loss[i,1] = mean_squared_error(true, pred)

In [5]:
# init figures
ts_len = train_ts_ds.shape[1]
line_fig = px.line(y=[None] * ts_len, range_y = [0,1])
line_fig.add_scatter(x=np.arange(ts_len), y = [None] * ts_len, mode = 'lines')
line_fig.update(layout_showlegend=False)
line_fig.update_layout(title = 'True TS (Blue) Vs Predicted TS (Red)')
ts_widget = go.FigureWidget(line_fig)
gaf_widget = go.FigureWidget(px.imshow(np.zeros((ts_len,ts_len)), zmin = 0, zmax= 1, color_continuous_scale = 'gray'))

In [6]:
# init dataframe with features and losses
label_df = pd.DataFrame(columns = ['step_start', 'step_end', 'step_width', 'step_amp', 'mse', 'r2_score'])

In [7]:
# dropdown menu for ds selection
ds_split_dd = widgets.Dropdown(
    options = [('Training', 0), ('Validation', 1), ('Test', 2)],
    value = 0,
    description='Split:'
)

In [8]:
# init df out for df visualization
df_out = widgets.Output()
with df_out:
    display(label_df)

In [9]:
# slider for ds indices
data_slider = widgets.IntSlider(
    value = 0,
    min = 0,
    max = train_label.shape[1] - 1,
    step = 1,
    description = 'Sample:'
)

In [10]:
# slider over ds indices. changes data within figures
def data_slider_change(change):
        with df_out:
            display(label_df[data_slider.value:data_slider.value +1], clear=True)
            
        ts_widget.data[0].y = ts_data[data_slider.value, :]
        ts_widget.data[1].y = pred_ts_data[data_slider.value, :]
        gaf_widget.data[0].z = gaf_data[data_slider.value,:,:,0]

In [11]:
data_slider.observe(data_slider_change)

In [12]:
# drop down for data set selection
def ds_split_dd_change(change):
    global ts_data, gaf_data, label_data, label_df, df_out, pred_ts_data
    if ds_split_dd.value == 0:
        ts_data = train_ts_ds
        pred_ts_data = train_pred_ts
        gaf_data = train_dec_gaf_ds
        label_data = np.concatenate((train_label.T, train_ts_loss),axis = -1)
    if ds_split_dd.value == 1:
        ts_data = val_ts_ds
        pred_ts_data = val_pred_ts
        gaf_data = val_dec_gaf_ds
        label_data = np.concatenate((val_label.T, val_ts_loss),axis = -1)
    if ds_split_dd.value == 2:
        ts_data = test_ts_ds
        pred_ts_data = test_pred_ts
        gaf_data = test_dec_gaf_ds
        label_data = np.concatenate((test_label.T, test_ts_loss),axis = -1)
        
    label_df = label_df[0:0]
    label_df = label_df.append(pd.DataFrame(label_data, columns = label_df.columns))
    data_slider.value = 0
    data_slider.max = len(label_df) -1
    with df_out:
        display(label_df, clear=True)
        

In [13]:
# init default data via drop down call
ds_split_dd_change(None)


The frame.append method is deprecated and will be removed from pandas in a future version. Use pandas.concat instead.



In [14]:
# init figure data via data slider call
data_slider_change(None)

In [15]:
ds_split_dd.observe(ds_split_dd_change)

In [16]:
fig_box = widgets.HBox()

In [17]:
# radio buttons to choose between ts, gaf or ts and gaf figure presentation
fig_box_childs_rb = widgets.RadioButtons(
    options = [('TS', 0), ('GAF', 1), ('TS & GAF', 2)],
    value = 0,
    description='Figure:',
)

In [18]:
# set child of fig box with respect to chosen radiobutton
def fig_box_childs_rb_change(change):
    global fig_box
    if fig_box_childs_rb.value == 0:
        fig_box.children = [ts_widget]
    if fig_box_childs_rb.value == 1:
        fig_box.children = [gaf_widget]
    if fig_box_childs_rb.value == 2:
        fig_box.children = [ts_widget, gaf_widget]

In [19]:
fig_box_childs_rb_change(None)

In [20]:
fig_box_childs_rb.observe(fig_box_childs_rb_change)

In [21]:
widgets.VBox([widgets.HBox([widgets.VBox([ds_split_dd, data_slider]), fig_box_childs_rb]), widgets.HBox([fig_box, df_out])])

VBox(children=(HBox(children=(VBox(children=(Dropdown(description='Split:', options=(('Training', 0), ('Valida…

In [22]:
# table fixen (und nur eine zeile bei ausgewähltem sample anzeigen)!!!!!!! sehr wichtig vor korrelation bestimmen
# in eigene notebooks:
# visualization of predicting time series  (test sample vis -> prediction decoder vis) (ipywidgets) + label daneben anzeigen (eine zeile aus der tabelle)
# visualisierung latent space (siehe colab) und andere vis-colab notebooks(ipywidgets)
# korrelation visualisieren
# r2 score auf zeitreihe 
# notebooks dokumentation (colab)
# danach:
# cyclic beta -> statt callback einfach modell öfter trainieren mit unterschiedlichen beta
#  ggf reconstruction error erstmal vernachlässigen: hier die frage -> kann das modell gut clustern (gutes disentanglement)