In [None]:
import pandas as pd

from scipy import stats
from scipy.stats import mannwhitneyu as mw
from scipy.stats import sem
from scipy.stats import iqr
from scipy.stats import ttest_ind
from scipy.stats import ttest_rel
from scipy.stats import wilcoxon

import random

import math

import numpy as np

import time

import os
from os import listdir
from os.path import isfile, join

import neo

import matplotlib.pyplot as plt
import seaborn as sns

from spike_filter import apply_intervals, get_intervals, get_spiketrains
from utility import spiketrains_iterator, events_iterator
import nexfile

In [None]:
# Column-oriented accumulator: one entry per (spike train, interval) pair
# yielded by apply_intervals(); turned into a DataFrame and written to CSV below.
df_dict = {
    "patient": [],
    "doc_name": [],
    "data_name": [],
    "interval_name": [],
    "spt_list": [],
    "isi_list": [],
}

# NOTE(review): absolute, machine-specific path; consider moving it to a config cell.
pat_path = r"C:/Users/Nikita/Desktop/Лаба/Entropy/CD_GD_PD_2024/GPi/"

# Never filled in the visible notebook; kept in case other cells rely on them.
ISI_total = []
ISI_change_total = []
ISI_rel_change_total = []


spike_c = 0  # spike trains collected over all patients

# patients ID list
pat_list = listdir(pat_path)
r = nexfile.Reader()
# patients processing
for patient in pat_list:
    folder_path = os.path.join(pat_path, patient)
    if not os.path.isdir(folder_path):
        # A stray file next to the patient folders would make listdir() raise.
        continue
    print("Patient processing:", patient)

    temp_c = 0  # spike trains collected for this patient

    start = time.time()  # not read in the visible code
    onlyfiles = [f for f in listdir(folder_path) if isfile(join(folder_path, f))]
    for f_name in onlyfiles:
        if not f_name.endswith(".nex"):
            continue
        full_name = os.path.join(folder_path, f_name)
        try:
            file_data = r.ReadNexFile(filePath=full_name)
        except Exception as e:  # Catch specific exceptions if possible
            print(f"Error reading {f_name}: {e}")
            # Skip the file: without this, the code below would run on an
            # undefined name or on the data of the previously read file.
            continue
        spiketrains = list(get_spiketrains(file_data))
        intervals = list(get_intervals(file_data))
        for spiketrain_name, interval_name, spikes in apply_intervals(
            spiketrains, intervals
        ):
            spikes = np.unique(spikes)  # sorted, duplicate timestamps removed
            if spikes.size == 0:
                # Nothing to align or difference; spikes[0] would raise IndexError.
                continue
            spikes = spikes - spikes[0]  # time relative to the first spike
            isi = np.diff(spikes)
            df_dict["patient"].append(patient)
            df_dict["doc_name"].append(f_name)
            df_dict["data_name"].append(spiketrain_name)
            df_dict["interval_name"].append(interval_name)
            # .tolist() stores plain Python numbers. list(ndarray) keeps NumPy
            # scalars, whose repr under NumPy >= 2 is "np.float64(...)"; that text
            # ends up in the CSV and cannot be parsed back with float().
            df_dict["spt_list"].append(spikes.tolist())
            df_dict["isi_list"].append(isi.tolist())
            spike_c += 1
            temp_c += 1
    print("Patient's spiketrain count:", temp_c)
pd.DataFrame(df_dict).to_csv("spt_isi_GPi.csv")
print("Patient processing is over")
print("Total spiketrains count:", spike_c)


In [None]:
# Load the table written by the extraction cell ("spt_isi_GPi.csv").
# The list-valued columns come back as their string form and are parsed below.
df = pd.read_csv("spt_isi_GPi.csv")


def toFixed(numObj, digits=0):
    """Round ``numObj`` to ``digits`` decimal places with the built-in ``round``."""
    rounded = round(numObj, digits)
    return rounded


def isitm(isi_list):
    """Binarise every interval against the mean of the whole list.

    Returns a list of the same length as ``isi_list`` holding 1 where the
    interval is greater than or equal to ``np.mean(isi_list)`` and 0 otherwise.
    """
    threshold = np.mean(isi_list)
    symbols = []
    for interval in isi_list:
        if interval >= threshold:
            symbols.append(1)
        else:
            symbols.append(0)
    return symbols


def isitisi(isi_list, dig=4):
    """Encode how each interval compares with the one before it.

    Every value is converted to ``float`` and rounded to ``dig`` decimals via
    ``toFixed`` first. The result has one symbol per consecutive pair:
    3 if the later value is larger, 2 if both are equal, 1 if it is smaller.
    """
    rounded = [toFixed(float(value), dig) for value in isi_list]
    symbols = []
    for previous, current in zip(rounded, rounded[1:]):
        if current > previous:
            symbols.append(3)
        elif current == previous:
            symbols.append(2)
        else:
            symbols.append(1)
    return symbols


# Encodings for the rows that parse cleanly, in row order.
isitm_res, isitisi_res = [], []

# Index labels of the rows whose ISI text could not be converted to floats.
list_to_drop = []

for i, raw_isi in enumerate(df["isi_list"]):
    print("Нейрон №", i + 1)
    try:
        # Each cell holds the list as text; peel the brackets and split on ", ".
        tokens = raw_isi.strip("][").split(", ")
        isi_list = [float(token) for token in tokens]

        isitm_res.append(isitm(isi_list))
        isitisi_res.append(isitisi(isi_list))
    except ValueError as e:
        print(f"Error processing row {i}: {e}")
        list_to_drop.append(i)

# Discard the rows that failed and renumber the remaining ones from 0 so the
# result lists line up with the frame.
df = df.drop(index=list_to_drop).reset_index(drop=True)

# Attach both encodings as new columns.
df["isitm"] = isitm_res
df["isitisi"] = isitisi_res

# Persist the encoded table without writing the index.
df.to_csv("encoded_isi_GPi.csv", index=False)


In [None]:
# Load the encoded table written by the previous cell ("encoded_isi_GPi.csv").
# index_col=0 turns the first CSV column into the index; presumably that is the
# index column carried over from "spt_isi_GPi.csv" -- TODO confirm.
df = pd.read_csv(
    "encoded_isi_GPi.csv", index_col=0
)  # file produced by the ISI-encoding cell
# Helper that counts how many times a contiguous sub-sequence occurs in a list


def counter_1(base, find):
    """Count the positions in ``base`` where a contiguous run equals ``find``.

    Elements are compared one by one with ``!=``; overlapping matches are
    counted separately. If ``find`` is longer than ``base`` the result is 0.
    """
    width = len(find)
    hits = 0
    for start in range(len(base) - width + 1):
        mismatch = any(
            base[start + offset] != find[offset] for offset in range(width)
        )
        if not mismatch:
            hits += 1
    return hits


def CAE_entropy(isi_change_list, word_len=2):
    """Return ``(CAE, ENT)`` for the words of length ``word_len`` in a sequence.

    A "word" is every window of ``word_len`` consecutive symbols (windows overlap).

    Parameters
    ----------
    isi_change_list : sequence of hashable symbols
        The encoded sequence.
    word_len : int, default 2
        Window length.

    Returns
    -------
    tuple
        ``ENT`` is the plug-in entropy ``-sum(p * log2(p))`` of the word
        frequencies. ``CAE`` rescales the frequencies by ``C = 1 - f1 / n``
        (``f1`` = number of words seen exactly once, ``n`` = number of windows)
        and divides each term by ``1 - (1 - C * p) ** n``.
        NOTE(review): this looks like the Chao-Shen coverage-adjusted
        estimator -- confirm against the intended reference.
        ``(0.0, 0.0)`` is returned when the sequence is too short to contain a
        single word.

    Notes
    -----
    Words are tallied in one pass with a dict instead of rescanning the whole
    sequence for every distinct word. Counts are identical for ordinary
    symbols; NaN symbols (which never compare equal) are not expected here.
    """
    N = len(isi_change_list) - word_len + 1
    if N <= 0:
        # No complete window: avoids the 0-division the formulas would hit.
        return 0.0, 0.0

    # Occurrence count per distinct word, in first-seen order.
    word_counts = {}
    for i in range(N):
        word = tuple(isi_change_list[i : i + word_len])
        word_counts[word] = word_counts.get(word, 0) + 1

    counts_list = np.array(list(word_counts.values()))
    n = np.sum(counts_list)
    p = counts_list / n

    f1 = np.count_nonzero(counts_list == 1)
    if f1 == n:
        # Every word is a singleton: keep the coverage C strictly positive.
        f1 = n - 1

    C = 1 - f1 / n
    pa = C * p
    la = 1 - (1 - pa) ** n

    return -np.sum(pa * np.log2(pa) / la), -np.sum(p * np.log2(p))  # CAE, ENT


# Conditional entropy calculation


def cond_ent(isi_list, samp_isi_list, word_len=2):
    """Conditional entropy of the words of ``isi_list`` given ``samp_isi_list``.

    Both sequences are cut into overlapping windows ("words") of ``word_len``
    symbols. The words at the same position are paired, and the function returns

        sum over s of P(s) * ( -sum over r of P(r | s) * log2 P(r | s) )

    where ``s`` runs over the distinct words of ``samp_isi_list``, ``P(s)`` is
    its relative frequency, and ``P(r | s)`` is the share of positions holding
    ``s`` at which ``isi_list`` holds ``r``.

    Parameters
    ----------
    isi_list, samp_isi_list : sequences of hashable symbols
        Must produce the same number of words.
    word_len : int, default 2
        Window length.

    Returns
    -------
    numpy float
        The conditional entropy in bits (0.0 when there are no words).

    Raises
    ------
    ValueError
        If the two sequences yield a different number of words. This used to
        print "ERROR" and return ``None``, which only failed later and less
        clearly in the caller.
    """
    N = len(isi_list) - word_len + 1
    M = len(samp_isi_list) - word_len + 1
    if M != N:
        raise ValueError(
            "isi_list and samp_isi_list must yield the same number of words "
            f"(got {N} and {M})"
        )

    # trial[word_s][word_r] = number of positions where the pair occurs.
    trial = {}
    for i in range(N):
        word_r = tuple(isi_list[i : i + word_len])
        word_s = tuple(samp_isi_list[i : i + word_len])
        paired = trial.setdefault(word_s, {})
        paired[word_r] = paired.get(word_r, 0) + 1

    h_list = []
    for paired in trial.values():
        # Positions holding this word_s; equals its occurrence count because
        # one pair is recorded per position.
        occurrences = sum(paired.values())
        s_prob = occurrences / M
        for pair_count in paired.values():
            cond_prob = pair_count / occurrences
            h_list.append(-s_prob * cond_prob * np.log2(cond_prob))
    return np.sum(h_list)


params = ["isitm", "isitisi"]
# Drop the rows whose "isitisi" encoding is the empty list, then renumber from 0.
df = df[df["isitisi"] != "[]"]
df = df.reset_index(drop=True)

# Shuffled surrogates drawn per row; the median over them is what gets stored.
# NOTE(review): the shuffles use the unseeded `random` module, so the surrogate
# columns differ between runs; consider seeding in a config cell.
N_SHUFFLES = 100

ent_dict = {}
low = 2
top = 2
df_out = None
for word_length in range(low, top + 1):
    for param in params:
        list_to_drop = []  # rows of this column that could not be parsed
        print(param)
        ent_param_name = "ent_" + param + "_wl" + str(word_length)
        cond_ent_param_name = "cond_ent_" + param + "_wl" + str(word_length)
        shuf_ent_param_name = "shuf_ent_" + param + "_wl" + str(word_length)

        ent_dict[ent_param_name] = []
        ent_dict[cond_ent_param_name] = []
        ent_dict[shuf_ent_param_name] = []

        for i in range(len(df)):
            try:
                # The encoded sequence is stored as text; parse it back to numbers.
                data = [float(j) for j in df.at[i, param].strip("][").split(", ")]
            except (AttributeError, ValueError) as e:
                # Unparseable cell: store NaN so every column keeps one value per
                # row, instead of silently reusing the previous row's sequence.
                print(f"Error processing row {i} ({param}): {e}")
                list_to_drop.append(i)
                ent_dict[ent_param_name].append(np.nan)
                ent_dict[cond_ent_param_name].append(np.nan)
                ent_dict[shuf_ent_param_name].append(np.nan)
                continue

            res = CAE_entropy(data, word_len=word_length)

            ent_dict[ent_param_name].append(res[1])  # res is (CAE, ENT)

            temp_res_cond_ent = []
            temp_res_ent = []

            for m in range(N_SHUFFLES):
                # Random permutation of the sequence.
                samp_data = random.sample(data, k=len(data))

                # calculating conditional entropy with shuffled signal
                temp_res_cond = cond_ent(data, samp_data, word_len=word_length)
                temp_res_cond_ent.append(temp_res_cond)

                # calculation entropy of shuffled signal
                temp_res = CAE_entropy(samp_data, word_len=word_length)
                temp_res_ent.append(temp_res[1])

            ent_dict[cond_ent_param_name].append(np.median(temp_res_cond_ent))
            ent_dict[shuf_ent_param_name].append(np.median(temp_res_ent))

    # Build the output on a copy: `df` must keep its encoded columns so that a
    # further word length (top > low) can still read them.
    df_out = df.drop(columns=["spt_list", "isi_list", "isitm", "isitisi"])
    for key in ent_dict.keys():
        df_out[key] = ent_dict[key]
    df_out.to_csv("encoded_data_with_ents_GPi_wl" + str(word_length) + ".csv")

if df_out is not None:
    # Leave `df` as the last written table, as this cell did before.
    df = df_out


In [None]:
# Spreadsheet that is joined onto the entropy results further down.
raw_data = pd.read_excel("GPi.xls")

params = ["isitm", "isitisi"]

# One entropy file name per word length from 2 up to max_wl.
max_wl = 2
files_list = [
    f"encoded_data_with_ents_GPi_wl{wl}.csv" for wl in range(2, max_wl + 1)
]

# Only the first listed file is loaded.
df = pd.read_csv(files_list[0])

# Join on the four columns that identify a spike train / interval pair.
result = df.merge(
    raw_data, on=["patient", "doc_name", "data_name", "interval_name"]
)

for i in range(2, 3):
    for el in ("isitisi", "isitm"):
        # mutual information I(X;Y) = entropy - conditional entropy
        mi_col = f"mut_inf_{el}_wl{i}"
        ent_col = f"ent_{el}_wl{i}"
        cond_col = f"cond_ent_{el}_wl{i}"
        result[mi_col] = result[ent_col].sub(result[cond_col])


# Save the merged table in both formats.
result.to_csv("file_to_analyse_GPi.csv")
result.to_excel("file_to_analyse_GPi.xlsx")


In [None]:
# Show the merged table as the cell output.
result