<a href="https://colab.research.google.com/github/dvillasxUdg/ti_proyecto_final/blob/main/Proyecto_Final_Dashboard.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Modules and initial values

In [None]:
!pip install pybaselines

Collecting pybaselines
  Downloading pybaselines-1.0.0-py3-none-any.whl (140 kB)
[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/140.6 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m140.6/140.6 kB[0m [31m5.7 MB/s[0m eta [36m0:00:00[0m
Installing collected packages: pybaselines
Successfully installed pybaselines-1.0.0


In [None]:
import json
import numpy as np
import panel as pn
import plotly.graph_objs as go
from plotly.subplots import make_subplots
from pybaselines import Baseline, utils

import panel as pn
import panel.widgets as pnw
pn.extension('plotly')

from plotly.graph_objects import YBins
from numpy.core.multiarray import asanyarray
from scipy import signal


In [None]:
# Coherence values read from disk; later indexed as
# loaded_coherence_data[participant][word][band].
with open('average_coherence_data.json', 'r') as coherence_file:
    loaded_coherence_data = json.load(coherence_file)

# Signal data read from disk; later indexed as origin_data[participant][word].
with open('process_data_origin.json', 'r') as origin_file:
    origin_data = json.load(origin_file)

In [8]:
# Participant identifiers offered by the participant drop-down.
participants = [
    'ADJRR', 'CESARMDG', 'EMR', 'HMM', 'JAVI',
    'JSDR', 'LESR', 'MEGA', 'VMAP', 'ABN',
    'AGM', 'AJJDT', 'AMCJ', 'AMGP', 'IGNR',
    'MFGM', 'MJLS', 'NYHG', 'SDAMG',
]

# Keys used to index the loaded data sets (word id, then frequency band).
WORD_6, WORD_7 = '6', '7'
BAND = 'delta'

# Channel labels, used as trace names and heat-map axis labels.
CHANNELS = ['F3', 'F4', 'C3', 'C4', 'P3', 'P4', 'Fz', 'Cz', 'Pz']

# Widgets
# Top-level switch between the signal view and the metrics view.
main_selector = pnw.RadioButtonGroup(name='Type', options=['Señales', 'Metricas'])
participants_selector = pnw.Select(name='Participante', options=participants)

# Metrics: which metric plot to show.
metric_selector = pnw.Select(name='Metrica', options=['Coherencia', 'Histograma'])

# Signal: which word's signals to show.
word_selector = pnw.Select(name='Palabra', options=[WORD_6, WORD_7])

# Functions

In [12]:
# Coherence Function
def calculate_coherence(matrix, freq_band_range, fs=250):
    """Build a square matrix of band-averaged coherence between signals.

    For every ordered pair of rows ``(i, j)`` with ``i != j`` the coherence of
    the two mean-removed signals is estimated with ``scipy.signal.coherence``
    and averaged over the spectrum bins whose *index* lies inside
    ``freq_band_range`` (both limits inclusive).  ``nfft`` is set to ``fs``,
    so consecutive bins are 1 Hz apart and a bin index equals its frequency
    in Hz.

    Parameters
    ----------
    matrix : sequence of 1-D array-like
        One signal per row.
    freq_band_range : sequence of two numbers
        ``(low, high)`` limits of the band, compared against bin indices.
    fs : int, optional
        Sampling frequency passed to ``scipy.signal.coherence``; also used as
        the FFT length.

    Returns
    -------
    numpy.ndarray
        Array of shape ``(len(matrix), len(matrix))``.  The diagonal is NaN.
        An entry is also NaN when no bin falls inside the requested band
        (this case used to raise ``ZeroDivisionError``).
    """
    band_low, band_high = freq_band_range[0], freq_band_range[1]
    n_signals = len(matrix)
    cohere_matrix = []

    for i in range(n_signals):
        cohere_row = []
        for j in range(n_signals):
            if i == j:
                # A signal is not compared with itself: the diagonal stays NaN.
                cxy = np.full(len(matrix[i]), np.nan)
            else:
                _, cxy = signal.coherence(matrix[i] - np.mean(matrix[i]), matrix[j] - np.mean(matrix[j]),
                                          fs=fs, window='hann', nperseg=25, noverlap=12, nfft=fs)

            # Select the bins whose index is inside the band (inclusive).
            bin_index = np.arange(len(cxy))
            in_band = (bin_index >= band_low) & (bin_index <= band_high)
            if in_band.any():
                cohere_row.append(np.sum(cxy[in_band]) / np.count_nonzero(in_band))
            else:
                # Empty band: report "no value" instead of dividing by zero.
                cohere_row.append(np.nan)
        cohere_matrix.append(cohere_row)

    return np.asarray(cohere_matrix)

# Plot Coherences values
def plot_coherence(coherence_matrix):
    """Return a 400x400 heat-map figure of a channel-by-channel matrix.

    The rows of ``coherence_matrix`` are reversed and paired with the
    reversed channel list on the y axis; the x axis uses ``CHANNELS`` in its
    original order.  The colour scale is fixed to the range 0..1.
    """
    reversed_channels = CHANNELS[::-1]
    tick_positions = list(range(len(CHANNELS)))

    def axis_config(labels):
        # Both axes share the same look and differ only in their tick labels.
        return dict(showline=True, showgrid=False, tickvals=tick_positions, ticktext=labels)

    heatmap = go.Heatmap(
        z=coherence_matrix[::-1],
        x=CHANNELS,
        y=reversed_channels,
        colorscale='Viridis',
        zmin=0,
        zmax=1,
    )

    return go.Figure(
        data=[heatmap],
        layout=go.Layout(
            xaxis=axis_config(CHANNELS),
            yaxis=axis_config(reversed_channels),
            autosize=False,
            width=400,
            height=400,
        ),
    )

def plot_histo(x):
    """Return a figure with a single histogram of ``x``.

    Bins are 0.1 wide along the x axis and the trace has no legend entry.
    """
    histogram = go.Histogram(x=x, showlegend=False, xbins=dict(size=0.1))
    return go.Figure(data=[histogram])

# Plot Signal
def plot_2d_eeg(matrix,show):
    """Return a figure with one thin line per row of ``matrix``.

    Row ``i`` is named ``CHANNELS[i]``.  Every row is drawn against the same
    x axis of 175 evenly spaced points from 0 to 700 (presumably
    milliseconds, going by the axis title the caller sets - confirm).
    ``show`` decides whether the traces appear in the legend.
    """
    time_axis = np.linspace(0, 700, 175)
    line_style = dict(marker=dict(size=1), line=dict(width=1), mode='lines')

    fig = go.Figure()
    for channel_index, channel_signal in enumerate(matrix):
        fig.add_trace(go.Scatter(
            x=time_axis,
            y=channel_signal,
            name=CHANNELS[channel_index],
            showlegend=show,
            **line_style,
        ))
    return fig

# Average
def make_average(data):
    """Return the mean of ``data`` along its first axis.

    The sum over axis 0 is divided by the number of items actually present
    on that axis.  The divisor used to be hard-coded to 34, which is only
    correct for input with exactly 34 entries.

    Parameters
    ----------
    data : array-like
        Stack of equally shaped items; axis 0 enumerates the items.

    Returns
    -------
    numpy.ndarray
        Array with the shape of a single item.
    """
    data = np.asarray(data)
    return data.sum(axis=0) / data.shape[0]

# Base Line
def make_base_line(matrix_array):
    """Subtract a fitted baseline from every signal of every matrix.

    Each row of each matrix in ``matrix_array`` is passed to
    ``Baseline.fabc``; the first element of the returned value is taken as
    the baseline and subtracted from the row.  The x axis handed to the
    fitter is fixed: 175 evenly spaced points from 0 to 700.

    Parameters
    ----------
    matrix_array : iterable of 2-D array-like
        Matrices whose rows are the signals to correct.

    Returns
    -------
    numpy.ndarray
        One corrected matrix per input matrix, in the same order.  Previously
        only the last matrix was returned, because the append sat outside the
        outer loop; an empty input also raised ``NameError`` and now yields an
        empty array.
    """
    x = np.linspace(0, 700, 175)
    # The fitter depends only on the x axis, so one instance serves every row.
    baseline_fitter = Baseline(x_data=x)
    matrix_basel_line = []

    for matrix in matrix_array:
        word_matrix_base_line = []

        for i in range(len(matrix)):
            base_line = baseline_fitter.fabc(matrix[i])
            array_base_line = np.asarray(base_line[0])
            word_matrix_base_line.append(np.subtract(matrix[i], array_base_line))

        # Must run once per matrix so that every corrected matrix is kept.
        matrix_basel_line.append(word_matrix_base_line)

    return np.asanyarray(matrix_basel_line)

# Show Metrics Panel
def get_metrics_panels(metric,participant_name):
    """Build the two-column metrics figure for one participant.

    The ``BAND`` values of ``WORD_6`` and ``WORD_7`` are read from
    ``loaded_coherence_data``.  When ``metric`` is ``'Coherencia'`` each is
    drawn as a heat map; for any other value the values are flattened and
    drawn as a histogram.  Only the traces of the per-word figures are copied
    into the combined figure (column 1 for ``WORD_6``, column 2 for
    ``WORD_7``).
    """
    participant_data = loaded_coherence_data[participant_name]
    values_by_word = [participant_data[word_key][BAND] for word_key in (WORD_6, WORD_7)]
    show_heatmap = metric == 'Coherencia'

    if show_heatmap:
        word_figures = [plot_coherence(values) for values in values_by_word]
    else:
        flattened = [np.asarray(values).reshape(-1) for values in values_by_word]
        word_figures = [plot_histo(values) for values in flattened]

    fig_combined = make_subplots(
        rows=1,
        cols=2,
        subplot_titles=[f'Palabra Congruente #{WORD_6}', f'Palabra Incongruente #{WORD_7}'],
    )
    for column, word_figure in enumerate(word_figures, start=1):
        for trace in word_figure.data:
            fig_combined.add_trace(trace, row=1, col=column)

    if not show_heatmap:
        # NOTE(review): these titles are set on layout.xaxis / layout.yaxis
        # only - confirm whether the second subplot should be labelled too.
        fig_combined.update_layout(
            xaxis_title_text='Valor',
            yaxis_title_text='Cuenta',
        )
    return fig_combined

# Show Signal Panels
def get_signal_panels(participant_name, word):
    """Build the two-column signal figure for one participant and word.

    Column 1 ('Origen') shows the average of the stored signals; column 2
    ('Linea Base') shows the average after ``make_base_line`` has been
    applied.  Legend entries are enabled for the first column only.
    """
    recordings = np.asanyarray(origin_data[participant_name][word])

    fig_signal = plot_2d_eeg(make_average(recordings), True)
    fig_base = plot_2d_eeg(make_average(make_base_line(recordings)), False)

    fig_combined = make_subplots(rows=1, cols=2, subplot_titles=['Origen', 'Linea Base'])
    for column, source_fig in ((1, fig_signal), (2, fig_base)):
        for trace in source_fig.data:
            fig_combined.add_trace(trace, row=1, col=column)

    axis_titles = dict(
        xaxis_title_text='Tiempo (Milisegundos)',
        yaxis_title_text='Microvoltios',
    )
    fig_combined.update_layout(**axis_titles)
    return fig_combined


# Update panels
def update_panels(event):
    """Widget callback: rebuild the figure from the current widget values.

    ``event`` is accepted because the watcher passes it, but it is not used;
    the state is read from the widgets directly.  In 'Metricas' mode the
    metric selector is enabled and the word selector disabled; in any other
    mode it is the other way round.  The new figure is assigned to the
    module-level ``panel`` pane, which must exist before the callback fires.
    """
    # Widgets values
    metric = metric_selector.value
    participant_name = participants_selector.value
    word = word_selector.value
    show_metrics = main_selector.value == 'Metricas'

    # Only the selector that applies to the active view stays enabled.
    metric_selector.disabled = not show_metrics
    word_selector.disabled = show_metrics

    if show_metrics:
        fig_combined = get_metrics_panels(metric, participant_name)
    else:
        fig_combined = get_signal_panels(participant_name, word)

    panel.object = fig_combined



# Run code

In [13]:
# Watcher: any change of a widget value triggers a redraw.
for selector in (main_selector, participants_selector, metric_selector, word_selector):
    selector.param.watch(update_panels, 'value')

# Init values: the signal figure for the current selection is shown first,
# so the metric selector starts out disabled.
participant_name = participants_selector.value
word = word_selector.value
metric_selector.disabled = True

initial_fig_combined = get_signal_panels(participant_name, word)

# Pane that update_panels() writes new figures into.
panel = pn.pane.Plotly(initial_fig_combined)

# Set dashboard: controls in the first column, the figure in the second.
controls = pn.Column(
    main_selector,
    participants_selector,
    word_selector,
    metric_selector,
)
layout = pn.Row(controls, pn.Column(panel))

layout.servable()
