In [None]:
""""
This code import the pre-processed data in a dictionary, excluding Z1 channel as explained in the report. 
Then, using first the direct and then the inverse Fourier transform, it computes the time-domain signal corresponding to each 
frequency interval. Finally, six different signal properties are computed and saved as output.
""""

In [16]:
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd
import numpy as np
import math
import csv
from scipy.fft import fft, fftfreq
import os
from scipy import signal
from scipy.signal import coherence
from scipy.stats import entropy
from scipy.stats import skew
from scipy.stats import kurtosis
import antropy as ant
from tqdm.notebook import tqdm

In [17]:
# Functions that interact with .csv files (reading, saving, etc)
def read_csv_files(folder_path):
    """
    Reads all the .csv files inside a folder and stores them in a dictionary. Each key corresponds to the channel names.
    
    Args:
        folder_path : path to the folder in which data are stored
        
    Returns: 
        data_dict : dictionary that contains the data
    """
    data_dict = {}  # Initializing an empty dictionary

    # read all csv file in folder_path
    filenames = sorted([f for f in os.listdir(folder_path) if f.endswith('.csv') and f != 'Z1.csv'],
    key=lambda x: int(x.split('_')[0]) if x.split('_')[0].isdigit() else float('inf'))
    
    for filename in filenames:
        # extract the initial string from file name
        initial_string = filename.split('_')[0]

        # build the complete path to file
        file_path = os.path.join(folder_path, filename)

        # read .csv file
        df = pd.read_csv(file_path, dtype=float)

        data_dict[initial_string.replace('.csv', '')] = df.values

    return data_dict

def save_dict_to_csv(dictionary, filename):
    """
    Saves a dictionary as a .csv file

    Args:
        dictionary (dict): dictionary you want to save.
        filename (str): name of the .csv file you want to create.

    Returns:
        None
    """
    try:
        with open(filename + ".csv", 'w', newline='') as csvfile:
            writer = csv.writer(csvfile)
            writer.writerow(['Key', 'Value'])  # Scrive la riga dell'intestazione
            for key, value in dictionary.items():
                writer.writerow([key, value])
        print(f"Il dizionario è stato salvato con successo in {filename}.csv")
    except Exception as e:
        print(f"Errore nel salvataggio del dizionario in {filename}.csv: {e}")

In [37]:
def spectral_component(dictionary, freq_max, freq_min, Fs):
    """
    Filter the signal in order to get the component corresponding to the desired frequency interval
    
    Args:
        dictionary : broad band signal
        freq_max : upper limit of the frequency interval
        freq_min : lower limit of the frequency interval
        Fs : sampling frequency of the signal
        
    Returns: 
        dict_components : filtered signal, corresponding to the input frequecy interval, for each key (channel)
    """
    dict_components = {}
    for chiave in dictionary.keys():
        seg = dictionary[chiave].transpose()
        # Appling the hann window in order to reduce spectral leakage
        windowed_seg = seg * np.hanning(len(seg))
        # Computing the fft 
        fft_matrix = np.fft.fft(windowed_seg)
        fft_res = fft_matrix.reshape(fft_matrix.shape[1])
        # Computing the correspondant frequencies
        freq = np.fft.fftfreq(len(fft_res), 1.0/Fs)
        fft_res[np.logical_or(freq < freq_min, freq >= freq_max)] = 0 #lower value excluded, higher value included
        # Compute the inverse fft transform to get the signal in the time-domain
        dict_components[chiave] = np.fft.ifft(fft_res)
    return dict_components


def ampiezza_media_assoluta(signal):
    # Compute and return the mean of the absolute value of the signal

    return np.mean(np.abs(signal))

""""
def rms(signal):
    return np.sqrt(np.mean(signal**2))
""""

def varianza(signal):
    # Compute and returns the variance of the signal
    return np.var(signal)

def calcola_bins(data, metodo='sturges'):
    """
    Compute the appropriate number of bins according to the selected method
    
    Args:
        data : signal 
        metodo: method for thecomputation of the number of bins. Can be sturges, rice, scott, freedman-diaconis
        
    Returns: 
        bins: number of bins
    """
    n = len(data)
    if metodo == 'sturges':
        bins = int(np.ceil(np.log2(n) + 1))
    elif metodo == 'rice':
        bins = int(np.ceil(2 * n**(1/3)))
    elif metodo == 'scott':
        bins = int(np.ceil((np.max(data) - np.min(data)) / (3.5 * np.std(data) * n**(-1/3))))
    elif metodo == 'freedman-diaconis':
        iqr = np.percentile(data.real, 75) - np.percentile(data.real, 25)
        bins = int(np.ceil((np.max(data) - np.min(data)) / (2 * iqr * n**(-1/3))))
    else:
        raise ValueError("Metodo non riconosciuto")
    return bins

def calcola_entropia(signal):

    #Compute and returns the entropy of the signal

    # Compute the histogram of the signal using the previous function
    bins = calcola_bins(signal, metodo='sturges')
    hist, bin_edges = np.histogram(signal, bins=bins, density=True)
    #actual entropy computation
    return entropy(hist)


def hjorth_par(signal):
    # compute and returns mobility and complexity of the signal
    mobility, complexity = ant.hjorth_params(signal)
    return mobility, complexity


In [11]:
def compute_and_save_features(dictionary,filename):
    """
    Compute some signal features using the previously defined function and saves them in .csv file
    
    Args:
    dictionary: data of each channel
    filename : name for the file containing the features
    
    Returns: none
    """
    features_temp = {}

    for chiave in tqdm(dictionary.keys()):
        signal = np.array(dictionary[chiave].real)
        mobility, complexity = hjorth_par(signal)
        features_temp[chiave] = []  # Initializing an empty list for each key
        features_temp[chiave].append(ampiezza_media_assoluta(signal))
        features_temp[chiave].append(rms(signal))
        features_temp[chiave].append(varianza(signal))
        features_temp[chiave].append(mobility)
        features_temp[chiave].append(complexity)
        features_temp[chiave].append(calcola_entropia(signal))  
        features_temp[chiave].append(ant.num_zerocross(signal)) 

    try:
        # writing the dictionary with the features in a .csv file
        with open(filename, 'w', newline='') as csvfile:
            fieldnames = ['key','amplitude','rms', 'variance', 'mobility', 'complexity', 'entropy','num_zerocross']
            writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
            writer.writeheader()
            for key, values in features_temp.items():
                writer.writerow({
                    'key': key,
                    'amplitude': values[0],
                    'rms' : values[1],
                    'variance' : values[2],
                    'mobility' : values[3],
                    'complexity': values[4],
                    'entropy': values[5],
                    'num_zerocross': values[6],
                })

        print(f"Il dizionario è stato salvato con successo in {filename}")
    except Exception as e:
        print(f"Errore nel salvataggio del dizionario in {filename}: {e}")


In [14]:
# Read .csv file and creates EPI dictionary
dizionario_EPI = read_csv_files('dati_finali_EPI')
#check the dictionary
print(dizionario_EPI.keys())
print(len(dizionario_EPI.keys()))

dict_keys(['J1', 'J2', 'J3', 'J4', 'J5', 'L1', 'L10', 'L11', 'L12', 'L13', 'L14', 'L15', 'L2', 'L3', 'L4', 'L5', 'L6', 'L7', 'L8', 'L9', 'M1', 'M2', 'M3', 'M4', 'M5', 'N1', 'N2', 'N3', 'N4', 'N5', 'N6', 'O10', 'O11', 'O13', 'O5', 'O6', 'O7', 'O8', 'O9', 'R1', 'R2', 'R3', 'X1', 'X2', 'X3', 'X4', 'Z1', 'Z2', 'Z3', 'Z4', 'Z5', 'Z6'])
52


In [9]:
# Read .csv file and creates NON EPI dictionary
dizionario_NONEPI = read_csv_files('dati_finali_NON_EPI')
#check the dictionary
print(dizionario_NONEPI.keys())
print(len(dizionario_NONEPI.keys()))

dict_keys(['A1', 'A10', 'A11', 'A12', 'A13', 'A14', 'A2', 'A3', 'A4', 'A5', 'A6', 'A7', 'A8', 'A9', 'F1', 'F10', 'F11', 'F12', 'F2', 'F3', 'F6', 'F7', 'F8', 'F9', 'G1', 'G10', 'G11', 'G12', 'G13', 'G14', 'G2', 'G3', 'G4', 'H1', 'H10', 'H11', 'H12', 'H2', 'H3', 'H4', 'H7', 'H8', 'H9', 'I1', 'I10', 'I2', 'I3', 'I7', 'I8', 'I9', 'J14', 'J15', 'J16', 'J17', 'J18', 'M15', 'M16', 'M17', 'N10', 'N11', 'N12', 'N13', 'N14', 'N15', 'N16', 'N17', 'N18', 'O1', 'O12', 'O2', 'O3', 'O4', 'Q1', 'Q2', 'Q3', 'Q4', 'Q5', 'Q6', 'Q7', 'Q8', 'Q9', 'R10', 'R11', 'R12', 'R13', 'R14', 'R15', 'R16', 'R7', 'R8', 'R9', 'T1', 'T2', 'T3', 'T4', 'X11', 'X12', 'X13', 'X14', 'X15', 'X16', 'X5', 'X6', 'Y1', 'Y10', 'Y11', 'Y2', 'Y5', 'Y6', 'Y7', 'Y8', 'Y9', 'Z10', 'Z11', 'Z12', 'Z13', 'Z14', 'Z15', 'Z16'])
119


In [28]:
# DELTA WAVES 
# filtering the siganl
dizionario_delta_EPI = spectral_component(dizionario_EPI,4,0.4,500)
dizionario_delta_NONEPI = spectral_component(dizionario_NONEPI,4, 0.4,500)
#Computing and saving the features
compute_and_save_features(dizionario_delta_EPI,'features_delta_EPI_testdata.csv')
compute_and_save_features(dizionario_delta_NONEPI,'features_delta_testdata.csv')

  0%|          | 0/51 [00:00<?, ?it/s]

Il dizionario è stato salvato con successo in features_delta_EPI_testdata.csv


  0%|          | 0/119 [00:00<?, ?it/s]

Il dizionario è stato salvato con successo in features_delta_testdata.csv


In [29]:
# THETA WAVES
# filtering the signal
dizionario_theta_EPI = spectral_component(dizionario_EPI,8,4,500)
dizionario_theta_NONEPI = spectral_component(dizionario_NONEPI,8, 4,500)
#Computing and saving the features
compute_and_save_features(dizionario_theta_EPI,'features_theta_EPI_testdata.csv')
compute_and_save_features(dizionario_theta_NONEPI,'features_theta_NONEPI_testdata.csv')

  0%|          | 0/51 [00:00<?, ?it/s]

Il dizionario è stato salvato con successo in features_theta_EPI_testdata.csv


  0%|          | 0/119 [00:00<?, ?it/s]

Il dizionario è stato salvato con successo in features_theta_NONEPI_testdata.csv


In [30]:
#ALPHA WAVES
# filtering the signal
dizionario_alpha_EPI = spectral_component(dizionario_EPI,13,8,500)
dizionario_alpha_NONEPI = spectral_component(dizionario_NONEPI,13, 8,500)
#Computing and saving the features
compute_and_save_features(dizionario_alpha_EPI,'features_alpha_EPI_testdata.csv')
compute_and_save_features(dizionario_alpha_NONEPI,'features_alpha_NONEPI_testdata.csv')

  0%|          | 0/51 [00:00<?, ?it/s]

Il dizionario è stato salvato con successo in features_alpha_EPI_testdata.csv


  0%|          | 0/119 [00:00<?, ?it/s]

Il dizionario è stato salvato con successo in features_alpha_NONEPI_testdata.csv


In [31]:
#BETA WAVES
# filtering the signal
dizionario_beta_EPI = spectral_component(dizionario_EPI,30,13,500)
dizionario_beta_NONEPI = spectral_component(dizionario_NONEPI,30, 13,500)
#Computing and saving the features
compute_and_save_features(dizionario_beta_EPI,'features_beta_EPI_testdata.csv')
compute_and_save_features(dizionario_beta_NONEPI,'features_beta_NONEPI_testdata.csv')

  0%|          | 0/51 [00:00<?, ?it/s]

Il dizionario è stato salvato con successo in features_beta_EPI_testdata.csv


  0%|          | 0/119 [00:00<?, ?it/s]

Il dizionario è stato salvato con successo in features_beta_NONEPI_testdata.csv


In [32]:
#GAMMA 1 WAVES
# filtering the signal
dizionario_gamma1_EPI = spectral_component(dizionario_EPI,50,30,500)
dizionario_gamma1_NONEPI = spectral_component(dizionario_NONEPI,50, 30,500)
#Computing and saving the features

compute_and_save_features(dizionario_gamma1_EPI,'features_gamma1_EPI_testdata.csv')
compute_and_save_features(dizionario_gamma1_NONEPI,'features_gamma1_NONEPI_testdata.csv')

  0%|          | 0/51 [00:00<?, ?it/s]

Il dizionario è stato salvato con successo in features_gamma1_EPI_testdata.csv


  0%|          | 0/119 [00:00<?, ?it/s]

Il dizionario è stato salvato con successo in features_gamma1_NONEPI_testdata.csv


In [33]:
#GAMMA 2 WAVES
# filtering the signal
dizionario_gamma2_EPI = spectral_component(dizionario_EPI,70,50,500)
dizionario_gamma2_NONEPI = spectral_component(dizionario_NONEPI,70, 50,500)
#Computing and saving the features
compute_and_save_features(dizionario_gamma2_EPI,'features_gamma2_EPI_testdata.csv')
compute_and_save_features(dizionario_gamma2_NONEPI,'features_gamma2_NONEPI_testdata.csv')

  0%|          | 0/51 [00:00<?, ?it/s]

Il dizionario è stato salvato con successo in features_gamma2_EPI_testdata.csv


  0%|          | 0/119 [00:00<?, ?it/s]

Il dizionario è stato salvato con successo in features_gamma2_NONEPI_testdata.csv


In [34]:
# GAMMA 3 WAVES
# filtering the signal
dizionario_gamma3_EPI = spectral_component(dizionario_EPI,90,70,500)
dizionario_gamma3_NONEPI = spectral_component(dizionario_NONEPI,90, 70,500)
#Computing and saving the features
compute_and_save_features(dizionario_gamma3_EPI,'features_gamma3_EPI_testdata.csv')
compute_and_save_features(dizionario_gamma3_NONEPI,'features_gamma3_NONEPI_testdata.csv')

  0%|          | 0/51 [00:00<?, ?it/s]

Il dizionario è stato salvato con successo in features_gamma3_EPI_testdata.csv


  0%|          | 0/119 [00:00<?, ?it/s]

Il dizionario è stato salvato con successo in features_gamma3_NONEPI_testdata.csv


In [15]:
#GAMMA 4 WAVES
# filtering the signal
dizionario_gamma4_EPI = spectral_component(dizionario_EPI,150,90,500)
dizionario_gamma4_NONEPI = spectral_component(dizionario_NONEPI,150, 90,500)
#Computing and saving the features
compute_and_save_features(dizionario_gamma4_EPI,'features_gamma4_EPI.csv')
compute_and_save_features(dizionario_gamma4_NONEPI,'features_gamma4_NONEPI.csv')

  0%|          | 0/52 [00:00<?, ?it/s]

Il dizionario è stato salvato con successo in features_gamma4_EPI.csv


  0%|          | 0/119 [00:00<?, ?it/s]

Il dizionario è stato salvato con successo in features_gamma4_NONEPI.csv
