# __Load raw data files__

### Setup

In [None]:
%load_ext watermark
%load_ext autoreload

In [None]:
%autoreload 2
%watermark -v -n -m -p numpy

In [None]:
import sys
sys.path.append('../' * 3)
from pathlib import Path
from eeg_web_assistant import settings

In [None]:
# Path of the sample EDF recording that the later cells load, built from the
# samples directory configured in the project's settings.
SEIZURE_SAMPLE = settings.DatasetConfig.SAMPLES_DIR / 'seizure_edf_1.edf'

# Echo the resolved path as the cell output.
SEIZURE_SAMPLE

## Read EDF file with MNE library

In [None]:
import mne

# Channel labels that are left out while the EDF file is read.
excluded_channels = [
    "BURSTS",
    "ECG EKG-REF",
    "EMG-REF",
    "IBI",
    "PHOTIC-REF",
    "PULSE RATE",
    "RESP ABDOMEN-RE",
    "SUPPR",
]

# preload=True asks MNE to load the signal data while reading the file.
seizure_raw = mne.io.read_raw_edf(
    str(SEIZURE_SAMPLE),
    exclude=excluded_channels,
    preload=True,
)

In [None]:
# NOTE(review): seizure_raw keeps being used by the cells below; presumably
# that works because it was read with preload=True — confirm.
seizure_raw.close()

### Resample data

In [None]:
# Resample a copy, so seizure_raw itself is not modified by the resampling.
# The target rate 100 is presumably in Hz — confirm against the MNE docs.
raw_resampled_100 = seizure_raw.copy().resample(100, npad='auto')

In [None]:
raw_resampled_100.get_data()

In [None]:
seizure_raw.n_times

In [None]:
seizure_raw.times

In [None]:
seizure_raw.info['sfreq']

In [None]:
type(seizure_raw.info['meas_date'])

In [None]:
subject = seizure_raw.info['subject_info']
type(subject)

In [None]:
# Replace the recording's 'subject_info' entry with a hand-written record.
# origin_info is bound to seizure_raw.info, so (assuming `info` hands back the
# same object rather than a copy) the update is visible on seizure_raw too.
origin_info = seizure_raw.info
# NOTE(review): the record below hard-codes a person's name and birthday —
# confirm it is synthetic before this notebook is shared.
origin_info.update({'subject_info': {
                            'id': 1,
                            'his_id': 'P0002',
                            'last_name': 'Chlebus',
                            'first_name': 'Joanna',
                            'birthday': (1998, 1, 6),
                            'sex': 2,
                            'hand': 1}})

# Show the 'device_info' entry (None if the key is absent) as the cell output.
origin_info.get('device_info')

In [None]:
# NOTE(review): origin_info was obtained from seizure_raw.info in the previous
# cell, so this re-assignment looks redundant — confirm before removing.
seizure_raw.info = origin_info

In [None]:
seizure_raw.info

In [None]:
seizure_raw.n_times

In [None]:
seizure_raw.first_samp

In [None]:
seizure_raw.last_samp

In [None]:
types = seizure_raw.get_channel_types()
types

In [None]:
seizure_raw.ch_names

In [None]:
seizure_raw.n_times

In [None]:
from matplotlib import pyplot as plt

# NOTE(review): this figure is created but never handed to seizure_raw.plot();
# presumably MNE opens its own figure and this one stays empty — confirm.
plt.figure(figsize=(12,6))

# Open MNE's browser-style plot of the recording; block=True is passed through
# to MNE's plot call.
seizure_raw.plot(block=True)

## Read EDF file with pyEDFlib library

In [None]:
import pyedflib

# Open the same sample file with pyEDFlib's reader.
# NOTE(review): no cell visible here ever calls seizure.close() — consider
# closing the reader once the inspection below is done.
seizure = pyedflib.EdfReader(str(SEIZURE_SAMPLE))

In [None]:
seizure.file_info()

In [None]:
seizure.file_info_long()

In [None]:
seizure.getNSamples()

In [None]:
seizure.getSignalHeaders()

In [None]:
seizure.getSignalLabels()

In [None]:
# Read the samples of signal number 26.
# NOTE(review): the index is hard-coded while the variable name suggests "the
# last channel" — confirm 26 against seizure.getSignalLabels().
last_chn_signal = seizure.readSignal(chn=26)

In [None]:
len(last_chn_signal)

In [None]:
last_chn_signal

## Visualize EEG with plotly

In [None]:
import plotly.graph_objects as go
import plotly.express as px

import numpy as np

In [None]:
# Rebind raw_resampled_100 to the original (not resampled) recording; the copy
# resampled to 100 earlier is no longer referenced by this name. The cells
# below use 256 as the sampling rate when building time axes.
raw_resampled_100 = seizure_raw

In [None]:
ch_names = raw_resampled_100.ch_names
n_channels = len(ch_names)
n_times = raw_resampled_100.n_times

In [None]:
# Pull the signal samples out as a numpy array and shrink them to half
# precision.
data_array = raw_resampled_100.get_data()
# NOTE(review): if get_data() returns volts (values around 1e-5), they fall in
# float16's subnormal range with an absolute step of about 6e-8 — confirm this
# precision is acceptable for plotting.
data_array = data_array.astype(np.float16)
# Echo the array shape as the cell output.
data_array.shape

In [None]:
# Number of samples to keep: 16 * 256 (presumably 16 s at 256 Hz — confirm).
n_times = 16 * 256

# Keep only the first n_times samples of every row.
data_array = data_array[:, :n_times]
print(data_array.shape)

In [None]:
type(data_array[0,0])

In [None]:
# Draw every channel of data_array as its own trace, each on a dedicated
# y-axis that owns an equal horizontal band of the figure.
step = 1. / n_channels

# One x value per sample that is actually present in data_array
# (sample number / 256), so x and y always have the same length.
x_axis = [sample_nr / 256 for sample_nr in range(data_array.shape[1])]

data = []
layout = go.Layout(showlegend=False)

for chan_nr in range(n_channels):
    axis_nr = chan_nr + 1  # the axis ids used here start at 1: y1, y2, ...
    band = [chan_nr * step, (chan_nr + 1) * step]

    # Hide ticks, zero line and grid of the per-channel axis; only the band
    # position matters.
    layout.update({
        f'yaxis{axis_nr}': dict(domain=band, showticklabels=False,
                                zeroline=False, showgrid=False),
    })

    data.append(go.Scattergl(x=x_axis, y=data_array[chan_nr, :],
                             yaxis=f'y{axis_nr}'))

# Add channel names: one label per channel, placed left of the plot area
# (x in paper coordinates) at y=0 of that channel's own axis. A plain list is
# used instead of the deprecated go.Annotations wrapper.
annotations = [
    go.layout.Annotation(x=-0.105, y=0, xref='paper', yref=f'y{ii + 1}',
                         text=ch_name, font=go.layout.annotation.Font(size=9),
                         showarrow=False)
    for ii, ch_name in enumerate(ch_names)
]
layout.update(annotations=annotations)

layout.update(autosize=False, width=800, height=600)
fig = go.Figure(data=data, layout=layout)
fig.update_layout(margin=dict(l=90, r=20, t=20, b=20))

fig.update_layout(showlegend=False)
# Initial view shows x in [0, 8]; the range slider spans [0, 16].
fig.update_layout(xaxis=dict(title='times [s]', rangeslider=dict(visible=True, range=[0, 16]), range=[0, 8], showgrid=False))
fig.update_xaxes(rangeslider_thickness=0.05)

# fig.write_html("file.html")
# fig.show()

In [None]:
from typing import List

def _get_plot_img(plot_array: np.ndarray, start_time: int, sfreq: int, ch_names: List[str]):
    """Render a multi-channel signal segment as a stacked-trace PNG image.

    Every row of ``plot_array`` is drawn as its own trace on a dedicated
    y-axis; the y-axes split the figure height into equal bands.

    Args:
        plot_array: 2-D array with one row per channel and one column per
            sample.
        start_time: Offset added to every x value, i.e. the x position of the
            first sample.
        sfreq: Divisor applied to the sample number to obtain the x value.
        ch_names: Labels; the i-th label is attached to the i-th y-axis.

    Returns:
        The result of ``fig.to_image(format='png', engine="kaleido")``.
    """
    channel_amount, n_times = plot_array.shape
    step = 1. / channel_amount

    # Exactly one x value per sample, so x and y have equal length.
    x_axis = [(sample_nr / sfreq) + start_time for sample_nr in range(n_times)]

    layout = go.Layout(showlegend=False)

    plot_traces = []
    for chan_nr in range(channel_amount):
        axis_nr = chan_nr + 1
        band = [chan_nr * step, (chan_nr + 1) * step]

        # Per-channel axis: only its band position matters, so ticks, zero
        # line and grid are hidden.
        layout.update({
            f'yaxis{axis_nr}': dict(domain=band, showticklabels=False,
                                    zeroline=False, showgrid=False),
        })

        plot_traces.append(
            go.Scattergl(x=x_axis, y=plot_array[chan_nr, :], yaxis=f'y{axis_nr}')
        )

    # Add channel names left of the plot area. A plain list is used instead
    # of the deprecated go.Annotations wrapper.
    annotations = [
        go.layout.Annotation(
            x=-0.105, y=0, xref='paper', yref=f'y{ii + 1}',
            text=ch_name, font=go.layout.annotation.Font(size=9),
            showarrow=False
        ) for ii, ch_name in enumerate(ch_names)
    ]

    layout.update(annotations=annotations)
    layout.update(autosize=False, width=800, height=600)

    fig = go.Figure(data=plot_traces, layout=layout)
    fig.update_layout(margin=dict(l=90, r=20, t=20, b=20))

    return fig.to_image(format='png', engine="kaleido")

In [None]:
# Render the sliced segment to a PNG and display it inline.
# NOTE(review): data_array was sliced starting at sample 0, while start_time=16
# makes the x axis begin at 16 — confirm the offset is intended.
img_bytes = _get_plot_img(plot_array=data_array, start_time=16, sfreq=256, ch_names=ch_names)

from IPython.display import Image
Image(img_bytes)

### Plotly express

In [None]:
import plotly.express as px

# Convert the recording to a DataFrame, discard its 'time' column and expose
# the row index as an explicit 'index' column.
raw_resampled_df = (
    raw_resampled_100.to_data_frame()
    .drop(columns=['time'])
    .reset_index()
)

In [None]:
raw_resampled_df

In [None]:
# Number of rows per animation frame (presumably 1 min * 60 s * 256 Hz).
frame_times = 1 * 60 * 256

# Split every row number into (frame number, offset inside the frame) with
# vectorized integer division / modulo instead of per-element Python loops.
sample_idx = raw_resampled_df['index'].to_numpy()
frame_nr_col = sample_idx // frame_times
time_col = sample_idx % frame_times

# Both columns are inserted at position 1, so 'x_time' ends up in front of
# 'frame_nr'. DataFrame.insert raises if the column already exists, so this
# cell cannot be re-run on the same frame.
raw_resampled_df.insert(loc=1, column='frame_nr', value=frame_nr_col)
raw_resampled_df.insert(loc=1, column='x_time', value=time_col)

In [None]:
raw_resampled_df

In [None]:
# Animated line plot of a single channel: one animation frame per distinct
# 'frame_nr' value, with 'x_time' on the x axis.
fig = px.line(raw_resampled_df, x='x_time', y="EEG FP1-REF", animation_frame="frame_nr", render_mode='webgl')


fig["layout"].pop("updatemenus") # optional, drop animation buttons
# NOTE(review): the axis title says seconds, but 'x_time' was computed as the
# row number modulo frame_times (a sample offset) — confirm the unit.
fig.update_layout(xaxis=dict(title='times [s]', rangeslider=dict(visible=True)))

fig.show()


In [None]:
import pandas as pd

def create_df(df: pd.DataFrame):
    """Reshape a wide per-channel frame into a long (one row per sample) frame.

    Every column of ``df`` other than 'index', 'x_time' and 'frame_nr' is
    treated as a channel. The channels are stacked one after another, in the
    order in which they appear in ``df``.

    Args:
        df: Wide frame; must contain the columns 'x_time' and 'frame_nr'.

    Returns:
        A DataFrame with the columns 'n_times' (row position inside ``df``),
        'x_times', 'frame_nr', 'signal' (the channel value) and 'ch' (the
        channel's column name), holding ``len(df)`` rows per channel.

    Raises:
        KeyError: If 'x_time' or 'frame_nr' is missing from ``df``.
    """
    import numpy as np  # local import keeps the function self-contained

    id_columns = ('index', 'x_time', 'frame_nr')
    # A list (not a set) keeps the channel order deterministic between runs.
    chan_cols = [col for col in df.columns if col not in id_columns]

    n_rows = len(df)
    n_chan = len(chan_cols)

    return pd.DataFrame(
        {
            # The id columns are repeated once per channel.
            'n_times': np.tile(np.arange(n_rows), n_chan),
            'x_times': np.tile(df['x_time'].to_numpy(), n_chan),
            'frame_nr': np.tile(df['frame_nr'].to_numpy(), n_chan),
            # Column-major flattening yields channel after channel.
            'signal': df[chan_cols].to_numpy().ravel(order='F'),
            'ch': np.repeat(np.asarray(chan_cols, dtype=object), n_rows),
        },
        columns=['n_times', 'x_times', 'frame_nr', 'signal', 'ch'],
    )
    

In [None]:
signal_df = create_df(raw_resampled_df)

signal_df

In [None]:
signal_df.info()

In [None]:
signal_df['signal'] = signal_df['signal'].astype(np.float16)

In [None]:
signal_df.describe()

In [None]:
# Animated line plot of the long-format frame: one facet row per value of
# 'ch', one animation frame per distinct 'frame_nr' value.
fig = px.line(signal_df, x="x_times", y="signal", facet_row="ch", animation_frame='frame_nr', facet_row_spacing=0.01, height=1500, render_mode='webgl')

fig["layout"].pop("updatemenus") # optional, drop animation buttons
# NOTE(review): the axis title says seconds, but 'x_times' is copied from the
# 'x_time' sample offset — confirm the unit.
fig.update_layout(xaxis=dict(title='times [s]', rangeslider=dict(visible=True)))
fig.update_xaxes(rangeslider_thickness = 0.02)

# Write the figure to a standalone HTML file in the working directory.
fig.write_html("file_all.html")
# fig.show()

In [None]:
# NOTE(review): `seizure_df` is not defined in any cell visible in this
# notebook; presumably it came from a removed cell — confirm before running.
seizure_channel_1 = list(seizure_df['EEG FP1-REF'].values)
seizure_channel_2 = list(seizure_df['EEG FP2-REF'].values)

# Sample numbers for channel 1 followed by sample numbers for channel 2,
# derived from the actual channel lengths instead of a hard-coded 494080.
n_times = list(range(len(seizure_channel_1))) + list(range(len(seizure_channel_2)))


# Channel label for every value of the two channels, in the same order.
seizure_chan_names = ['EEG FP1-REF'] * len(seizure_channel_1) + ['EEG FP2-REF'] * len(seizure_channel_2)
len(seizure_chan_names)

In [None]:
seizure_channel_1.extend(seizure_channel_2)

In [None]:
import pandas as pd

# One (sample number, value, channel label) record per row; the previous row
# index is then exposed as an explicit 'index' column.
two_chan_rows = list(zip(n_times, seizure_channel_1, seizure_chan_names))
two_chan_df = pd.DataFrame(two_chan_rows, columns=['n_times', 'Signal', 'Channel'])
two_chan_df = two_chan_df.reset_index()

In [None]:
two_chan_df