In [3]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import os
import io
from collections import Counter
from itertools import groupby
import ipywidgets as widgets
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import pairwise_distances
from nolitsa import delay, dimension, noise, utils

gait_data = pd.read_csv("gait.csv")
lag_slider = widgets.IntSlider(value=50, min=10, max=100, step=1, description='Max Lag:')
dim_slider = widgets.IntSlider(value=10, min=1, max=20, step=1, description='Max Dim:')
rp_threshold_slider = widgets.FloatSlider(value=0.25, min=0.1, max=1.0, step=0.01, description='RP Threshold:')
fnn_threshold_slider = widgets.FloatSlider(value=0.10, min=0.01, max=0.2, step=0.01, description='FNN Threshold:')
subject_dropdown = widgets.Dropdown(options=[], description='Subject:')
leg_dropdown = widgets.Dropdown(options=[], description='Leg:')
joint_dropdown = widgets.Dropdown(options=[], description='Joint:')
condition_dropdown = widgets.Dropdown(options=[], description='Condition:')

def create_dropdown_widgets():
    subjects = sorted(gait_data['subject'].unique())
    legs = sorted(gait_data['leg'].unique())
    joints = sorted(gait_data['joint'].unique())
    conditions = sorted(gait_data['condition'].unique())
    
    leg_desc = {1: 'Left', 2: 'Right'}
    joint_desc = {1: 'Ankle', 2: 'Knee', 3: 'Hip'}
    condition_desc = {1: 'Unbraced', 2: 'Knee Brace', 3: 'Ankle Brace'}

    leg_options = [(leg_desc[leg], leg) for leg in legs]
    joint_options = [(joint_desc[joint], joint) for joint in joints]
    condition_options = [(condition_desc[condition], condition) for condition in conditions]

    subject_dropdown.options = subjects
    leg_dropdown.options = leg_options
    joint_dropdown.options = joint_options
    condition_dropdown.options = condition_options
    
    display(subject_dropdown, leg_dropdown, joint_dropdown, condition_dropdown)

def load_and_preprocess_data(gait_data, subject, leg, joint, condition):
    
    data_filtered = gait_data[
        (gait_data['subject'] == subject) & 
        (gait_data['leg'] == leg) &
        (gait_data['joint'] == joint) &
        (gait_data['condition'] == condition)]
    
    data_filtered = data_filtered[['angle']].dropna()
    scaler = MinMaxScaler()
    data_normalized = pd.DataFrame(scaler.fit_transform(data_filtered), columns=['angle'])
    
    return data_normalized

def localmin(variable):
    return (np.diff(np.sign(np.diff(variable))) > 0).nonzero()[0] + 1

def binscalc(data):
    return int(np.ceil(np.sqrt(len(data))))

def calculate_mi(data, max_lag):
    bin_size = binscalc(data)
    mi_values = delay.dmi(data, maxtau=max_lag, bins=bin_size)
    return noise.sma(mi_values, hwin=1)

def find_first_local_min(smoothed_mi):
    local_minima_indices = localmin(smoothed_mi)
    return (local_minima_indices + 1)[0] if local_minima_indices.size > 0 else None

def estimate_optimal_ac_delay(data):
    return np.argmax(delay.acorr(data) < 1.0 / np.e)

def estimate_optimal_delay(data, max_lag):
    smoothed_mi = calculate_mi(data, max_lag)
    optimal_mi_delay = find_first_local_min(smoothed_mi)
    optimal_ac_delay = estimate_optimal_ac_delay(data)
    return optimal_mi_delay, optimal_ac_delay, smoothed_mi

def estimate_embedding_dimension(data, optimal_mi_delay, optimal_ac_delay, max_dim, fnn_threshold):
    dimensions = np.arange(1, max_dim + 1)
    F1, F2, F3 = dimension.fnn(data, tau=optimal_mi_delay, dim=dimensions, window=optimal_ac_delay, metric='euclidean')
    embedding_dimension = next((d for d, f in zip(dimensions, F1) if f < fnn_threshold), None)
    return embedding_dimension, dimensions, F1

def phase_space_reconstruction(data, embedding_dimension, optimal_mi_delay):
    reconstructed_vector = utils.reconstruct(data[:, 0], embedding_dimension, int(optimal_mi_delay))
    return reconstructed_vector

def plot_mi(mi_values, results_folder):
    fig, ax = plt.subplots(figsize=(10, 5))
    ax.plot(mi_values, label='MI')
    ax.set_title('Mutual Information')
    ax.set_xlabel('Lag')
    ax.set_ylabel('MI')
    ax.legend()
    save_plot(results_folder, 'mutual_information_plot.png')
    plt.show()

def plot_fnn(fnn_values, dimensions, results_folder):
    fig, ax = plt.subplots(figsize=(10, 5))
    ax.plot(dimensions, fnn_values, label='FNN')
    ax.set_title('False Nearest Neighbors')
    ax.set_xlabel('Embedding Dimension')
    ax.set_ylabel('FNN')
    ax.legend()
    save_plot(results_folder, 'fnn_plot.png')
    plt.show()

def plot_phase_space(reconstructed, embedding_dimension, results_folder):
    fig = plt.figure(figsize=(20, 10))

    ax1 = fig.add_subplot(121)
    ax1.plot(reconstructed[:, 0], reconstructed[:, 1], color='blue')
    ax1.set_title('2D Phase Space Reconstruction')
    ax1.set_xlabel('x(t)')
    ax1.set_ylabel('x(t + τ)')
    
    if embedding_dimension >= 3:
        ax2 = fig.add_subplot(122, projection='3d')
        ax2.plot(reconstructed[:, 0], reconstructed[:, 1], reconstructed[:, 2], color='red')
        ax2.set_title('3D Phase Space Reconstruction')
        ax2.set_xlabel('x(t)')
        ax2.set_ylabel('x(t + τ)')
        ax2.set_zlabel('x(t + 2τ)')

    save_plot(results_folder, 'phase_space_plot.png')
    plt.show()

def plot_recurrence_plot(rp_matrix, results_folder):
    plt.figure(figsize=(10, 10))
    plt.imshow(rp_matrix, cmap='binary', origin='lower')
    plt.title('Recurrence Plot')
    plt.xlabel('Time')
    plt.ylabel('Time')
    save_plot(results_folder, 'recurrence_plot.png')
    plt.show()

def save_plot(folder, filename):
    if not os.path.exists(folder):
        os.makedirs(folder)
    plt.savefig(os.path.join(folder, filename))

def compute_recurrence_plot(data, rp_threshold):
    distances = pairwise_distances(data)
    return distances < rp_threshold

def compute_histograms(M, n):
    diag_hist = np.zeros(n+1)
    vert_hist = np.zeros(n+1)

    for i in range(n):
        diag_counter = 0
        vert_counter = 0
        for j in range(n-i):
            if M[i+j][j] == 1:
                diag_counter += 1
            else:
                if diag_counter > 0:
                    diag_hist[diag_counter] += 2 
                    diag_counter = 0

            if M[j][i] == 1:
                vert_counter += 1
            else:
                if vert_counter > 0:
                    vert_hist[vert_counter] += 1
                    vert_counter = 0

        if diag_counter > 0:
            diag_hist[diag_counter] += 2
        if vert_counter > 0:
            vert_hist[vert_counter] += 1

    diag_hist[n] /= 2

    return diag_hist, vert_hist

def compute_average(hst, min_line_length, n):
    numerator = sum(i * hst[i] for i in range(min_line_length, n+1))
    denominator = sum(hst[i] for i in range(min_line_length, n+1))
    return numerator / (denominator if denominator > 0 else 1)

def compute_entropy(hst, min_line_length, n):
    total = sum(hst[min_line_length:n+1])
    if total > 0:
        return -sum((hst[i]/total) * np.log(hst[i]/total) for i in range(min_line_length, n+1) if hst[i] != 0)
    else:
        return 0

def compute_max_line_length(hst, n):
    return max((i for i in range(1, n+1) if hst[i] != 0), default=1)

def compute_rqa_measures(rp_matrix, min_line_length=2):
    n = rp_matrix.shape[0]

    diag_hist, vert_hist = compute_histograms(rp_matrix, n)

    rec = np.sum(rp_matrix) / n**2 * 100
    mean_l = compute_average(diag_hist, min_line_length, n)
    entr_l = compute_entropy(diag_hist, min_line_length, n)
    mean_v = compute_average(vert_hist, min_line_length, n)
    max_v = compute_max_line_length(vert_hist, n)
    entr_v = compute_entropy(vert_hist, min_line_length, n)

    rqa_measures =  {
        'Length of Time Series': n,
        'Percentage of Recurrent Points (%REC)': rec,
        'Avarage Diagonal Line Length (MeanL)': mean_l,
        'Diagonal Line Entropy (EntrL)': entr_l,
        'Avarage Vertical Line Length (MeanV)': mean_v,
        'Max Vertical Line Length (MaxV)': max_v,
        'Vertical Line Entropy (EntrV)': entr_v,
    }

    return rqa_measures

def execute_analysis(btn):
    MAX_LAG = lag_slider.value
    MAX_DIM = dim_slider.value
    RP_THRESHOLD = rp_threshold_slider.value
    FNN_THRESHOLD = fnn_threshold_slider.value

    print(f"Max Lag: {MAX_LAG}")
    print(f"Max Dim: {MAX_DIM}")
    print(f"RP Threshold: {RP_THRESHOLD}")
    print(f"FNN Threshold: {FNN_THRESHOLD}")

    folder_name = f"subject_{subject_dropdown.value}_leg_{leg_dropdown.value}_joint_{joint_dropdown.value}_condition_{condition_dropdown.value}"
    results_folder = os.path.join('gait_results', folder_name)
    data_normalized = load_and_preprocess_data(gait_data, subject_dropdown.value, leg_dropdown.value, joint_dropdown.value, condition_dropdown.value)
    optimal_mi_delay, optimal_ac_delay, mi_values = estimate_optimal_delay(data_normalized.values.flatten(), MAX_LAG)

    embedding_dimension, _, fnn_values = estimate_embedding_dimension(
        data_normalized.values.flatten(), optimal_mi_delay, optimal_ac_delay, MAX_DIM, FNN_THRESHOLD
    )
    print(f"Optimal Delay: {optimal_mi_delay}")
    print(f"Embedding Dimension: {embedding_dimension}")

    reconstructed = phase_space_reconstruction(data_normalized.values, embedding_dimension, optimal_mi_delay)

    rp_matrix = compute_recurrence_plot(reconstructed, RP_THRESHOLD)
    rqa_measures = compute_rqa_measures(rp_matrix)
    
    print("\nRQA measures")
    for key, value in rqa_measures.items():
        if '(%REC)' in key:
            formatted_value = f"{value:.2f}%"
        else:
            formatted_value = round(value, 2)
        print(f"{key}: {formatted_value}")

    plot_mi(mi_values, results_folder)
    plot_fnn(fnn_values, np.arange(1, MAX_DIM + 1), results_folder)
    plot_phase_space(reconstructed, embedding_dimension, results_folder)
    plot_recurrence_plot(rp_matrix, results_folder)

    if not os.path.exists(results_folder):
        os.makedirs(results_folder)
    measures_df = pd.DataFrame.from_dict(rqa_measures, orient='index', columns=['Value'])
    measures_df.to_csv(os.path.join(results_folder, 'measures.csv'))
    
run_analysis_button = widgets.Button(description="Run Analysis")
run_analysis_button.on_click(execute_analysis)

create_dropdown_widgets()
display_widgets = [
    lag_slider,
    dim_slider,
    rp_threshold_slider,
    fnn_threshold_slider,
    run_analysis_button
]


for widget in display_widgets:
    display(widget)

Dropdown(description='Subject:', options=(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), value=None)

Dropdown(description='Leg:', options=(('Left', 1), ('Right', 2)), value=None)

Dropdown(description='Joint:', options=(('Ankle', 1), ('Knee', 2), ('Hip', 3)), value=None)

Dropdown(description='Condition:', options=(('Unbraced', 1), ('Knee Brace', 2), ('Ankle Brace', 3)), value=Non…

IntSlider(value=50, description='Max Lag:', min=10)

IntSlider(value=10, description='Max Dim:', max=20, min=1)

FloatSlider(value=0.25, description='RP Threshold:', max=1.0, min=0.1, step=0.01)

FloatSlider(value=0.1, description='FNN Threshold:', max=0.2, min=0.01, step=0.01)

Button(description='Run Analysis', style=ButtonStyle())