In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import statistics
import matplotlib.ticker as ticker
import matplotlib.dates as mdates
import os
import sys
import json
from Packages.steps_analysis import *
from mplcursors import cursor

In [None]:
def csv_to_df(filename, index_col):
    df = pd.read_csv(filename, index_col=index_col, parse_dates=True)
    return df

In [None]:
def get_idxs_of_low_variance(a, var_ths, window_size):
    # a.shape=[n_cols,T], var_ths.shape=[n_cols] or float, window_size=[n_cols, window]
    a_windows = np.lib.stride_tricks.sliding_window_view(a,window_shape=window_size)
    vars_are_small = (np.where(a_windows.var(-1)<var_ths,1,0))[0]
    vars_are_small = np.where(vars_are_small.all(1)==1,1,0) # merge the column conditions
    d = np.diff(vars_are_small)
    starts = np.argwhere(d==1)[:,0]+1
    ends = np.argwhere(d==-1)[:,0]+window_size[-1]-1
    if starts[0]>ends[0]: starts = np.concatenate([np.array([0]),starts])
    if starts[-1]>ends[-1]: ends = np.concatenate([ends,np.array([a.shape[-1]-1])])
    step_idxs = [np.arange(s,e+1) for s,e in zip(starts,ends)]
    return starts, ends, step_idxs

In [None]:
def plot_f_with_selections(df, cols, selections, idxs_to_plot=None, x_is_time=False, invert_y=False, to_use_cursor=True):
    if idxs_to_plot is None: idxs_to_plot= np.arange(len(df))
    fig, ax = plt.subplots(figsize=(10,4))
    if x_is_time:
        x = df.index[idxs_to_plot]
        ax.xaxis.set_major_formatter(mdates.DateFormatter('%H:%M, %d/%m'))
        plt.setp(ax.xaxis.get_majorticklabels(), rotation=-45, ha="left")
    else:
        x = np.arange(idxs_to_plot.shape[0])[idxs_to_plot]
    for col in cols:
        ax.plot(x, df[col].to_numpy()[idxs_to_plot])
    if invert_y: plt.gca().invert_yaxis()
    for i,selection in enumerate(selections):
        ax.axvspan(x[selection[0]], x[selection[1]], alpha=0.2, color='green')
        ax.text(x[selection[0]],ax.get_ylim()[1]*0.8+ax.get_ylim()[0]*0.2, f"{i}",fontsize=10, rotation=0)
    if to_use_cursor: cursor(hover=True)
    plt.show()

In [None]:
def suggest_steps_and_get_df_from_path(filename, step_cols, ths, window_size, invert_y=False, step_size=None, index_col='timestamp', idxs_to_plot=None):
    df = csv_to_df(filename, index_col=index_col, parse_dates=True)
    starts, ends, step_idxs = get_idxs_of_low_variance(df[step_cols].to_numpy().T, ths, window_size)
    starts = starts if step_size is None else [end-step_size for end in ends]
    print("suggested indices for steps:")
    selections = [[s,e] for s,e in zip(starts, ends)]
    for selection in selections:
        print(f"{selection},")
    plot_f_with_selections(df, step_cols, selections, idxs_to_plot=idxs_to_plot, x_is_time=False, invert_y=invert_y, to_use_cursor=True)
    return df