# Separação de janelas de tempo

In [35]:
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Spark session shared by the pipelines below (an existing one is reused).
spark = SparkSession.builder.appName('Doutorado').getOrCreate()

# Root folder of the PN00 data; each sub-folder is read as a Parquet dataset
# by the pipelines below.
path_data = '/media/davi/6A81-05CF/physionet.org/files/siena-scalp-eeg/1.0.0/PN00/'
# JSON file with one metadata record per recording.
infos_path = '/home/davi/Documentos/doutorado_ppgee_v2/data/siena_infos.json'

# The metadata is small, so it is kept as a pandas DataFrame on the driver.
# The code below reads the columns 'name', 'sfreq', 'seizure_start_sec',
# 'seizure_end_sec' and 'total_seizure' from it.
infos = pd.read_json(infos_path)


## Separação ECG-EEG

In [5]:
from os import walk
from os.path import join


def _list_subdirs(root: str) -> list:
    """Return the paths of the immediate sub-directories of ``root``.

    Only the first level is listed. The pipelines in this notebook write
    their results (``ICTAL/...``, ``N_ICTAL/...``) inside each recording
    folder, so a recursive listing would pick those output folders up as
    inputs the next time this cell is run.

    Args:
        root: Directory whose direct children are listed.

    Returns:
        One path per sub-directory, in the order reported by ``os.walk``.
        An empty list is returned when ``root`` does not exist or cannot be
        read, matching the silent behaviour of ``os.walk``.
    """
    # The first tuple yielded by walk() describes ``root`` itself; the default
    # covers the case where walk() yields nothing at all.
    _, subdirs, _ = next(walk(root), (root, [], []))
    return [join(root, name) for name in subdirs]


# One entry per recording folder directly under ``path_data``.
files = _list_subdirs(path_data)


### Separando período Ictal

In [7]:
def pipeline_ictal(file: str):
    """Save the rows labelled as ictal, split into EKG and EEG channel sets.

    The Parquet dataset at ``file`` is read, the rows whose ``label`` column
    equals ``'I'`` are kept, and two datasets are written (overwriting any
    previous result): ``{file}/ICTAL/EKG`` and ``{file}/ICTAL/EEG``. Each
    output holds the columns whose name contains the channel marker plus
    ``label`` and ``__index_level_0__``.

    NOTE(review): the outputs are written inside the directory that is being
    read; confirm that re-reading ``file`` after a run does not pick them up.

    Args:
        file: Path of the Parquet dataset of one recording.
    """
    recording = spark.read.parquet(file)
    column_names = recording.schema.names

    # Written in this order: EKG first, then EEG.
    targets = {
        'EKG': f"{file}/ICTAL/EKG",
        'EEG': f"{file}/ICTAL/EEG",
    }

    for marker, destination in targets.items():
        selected = [name for name in column_names if marker in name]
        selected.append('label')
        selected.append('__index_level_0__')

        ictal_rows = recording.select(selected).filter(col("label") == 'I')
        ictal_rows.write.mode("overwrite").parquet(destination)


In [8]:
# Extract the ictal segments of every recording folder found above.
for file in files:
    pipeline_ictal(file=file)


22/03/01 13:29:19 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

### Separando período Não-Ictal

In [51]:
def pipeline_N_ictal(file: str, info: dict, gap_sec: float = 20):
    """Save a pre-seizure (non-ictal) window, split into EKG and EEG channels.

    The window ends ``gap_sec`` seconds before the seizure start and spans
    ``info['total_seizure']`` seconds backwards from there. Rows are selected
    by their ``__index_level_0__`` value (presumably the sample index of the
    recording; confirm against the code that produced the Parquet files).

    Two datasets are written, overwriting any previous result:
    ``{file}/N_ICTAL/EKG`` and ``{file}/N_ICTAL/EEG``. Each holds the columns
    whose name contains the channel marker plus ``label`` and
    ``__index_level_0__``.

    NOTE(review): ``between`` is inclusive on both ends, so the window holds
    one more index value than ``total_seizure * sfreq``; confirm this is
    intended. If the seizure starts too early in the recording the lower
    bound becomes negative and the window is simply shorter.

    NOTE(review): the outputs are written inside the directory that is being
    read; confirm that re-reading ``file`` after a run does not pick them up.

    Args:
        file: Path of the Parquet dataset of one recording.
        info: Metadata of the recording. The keys ``'sfreq'``,
            ``'seizure_start_sec'`` and ``'total_seizure'`` are read.
        gap_sec: Guard interval, in seconds, left between the end of the
            window and the seizure start. Defaults to 20.
    """
    freq = info['sfreq']
    start = info['seizure_start_sec'] * freq
    total = info['total_seizure'] * freq

    # Window bounds expressed in the same unit as ``__index_level_0__``.
    fim = int(start - (gap_sec * freq))
    inicio = int(fim - total)

    df = spark.read.parquet(file)
    in_window = col('__index_level_0__').between(inicio, fim)
    shared_columns = ['label', '__index_level_0__']

    # Same selection for both channel groups; EKG is written first.
    for marker in ('EKG', 'EEG'):
        channels = [name for name in df.schema.names if marker in name]
        (df.select(channels + shared_columns)
           .filter(in_window)
           .write
           .mode("overwrite")
           .parquet(f"{file}/N_ICTAL/{marker}"))


In [56]:
# Extract the non-ictal window of every recording listed in the metadata
# table. The recording folder is named after the EDF file without '.edf'.
for index, row in infos.iterrows():
    info = dict(row)
    file = path_data + info['name'].replace('.edf', '')
    pipeline_N_ictal(file, info)

                                                                                