In [1]:
# Decoding script, takes a GroupData set and uses Linear Discriminant Analysis
# to decode trial conditions or word tokens from neural data

import numpy as np
import os
import pandas as pd

# import matplotlib
# matplotlib.use('notebook')
# Report the interpreter in use, then make the two project checkouts importable.
import sys; print('Python %s on %s' % (sys.version, sys.platform))
# NOTE(review): these are absolute paths into one user's home directory; the
# project imports below only resolve on a machine with this exact layout
# (unless the packages are installed some other way).
sys.path.extend(['/Users/yuchaowang/Documents/git/SentenceRep_analysis'])
sys.path.extend(['/Users/yuchaowang/Documents/git/IEEG_Pipelines'])
# Project-local imports; they must stay after the sys.path changes above.
from analysis.grouping import GroupData
from analysis.decoding import plot_all_scores
from analysis.decoding.words import score, dict_to_structured_array
# Data locations, all derived from the current user's Box cloud-storage folder.
box = os.path.expanduser(os.path.join("~", "Library", "CloudStorage", "Box-Box"))
fpath = os.path.join(box, "CoganLab")  # passed to GroupData.from_intermediates
subjects_dir = os.path.join(box, "ECoG_Recon")  # passed on as subjects_dir / sub_dir

Python 3.12.5 | packaged by Anaconda, Inc. | (main, Sep 12 2024, 13:22:57) [Clang 14.0.6 ] on darwin






In [2]:
# %% Imports for total GroupData
# Load the group-level dataset and define the shared containers and parameters
# that the decoding and plotting cells further down read.

sub = GroupData.from_intermediates(
    "SentenceRep", fpath, folder='stats', subjects_dir=subjects_dir)
all_data = []  # NOTE(review): not referenced again in the visible cells
# Four RGB triples (green, red, blue, grey), handed to plot_all_scores.
colors = [[0, 1, 0], [1, 0, 0], [0, 0, 1], [0.5, 0.5, 0.5]]
# Result containers keyed by group name. `scores` is passed to score() below;
# `scores2` is only referenced by commented-out code in the visible cells.
scores = {'Auditory': None, 'Sensory-Motor': None, 'Production': None, 'All': None}
scores2 = {'Auditory': None, 'Sensory-Motor': None, 'Production': None, 'All': None}
# Index groups (presumably channel indices — confirm against GroupData); they
# are paired positionally with `names` in the plotting cell.
idxs = [sub.AUD, sub.SM, sub.PROD, sub.sig_chans]
idxs = [list(idx) for idx in idxs]  # materialise every group as a plain list
names = list(scores.keys())
# Conditions handed to score()/plot_all_scores: two pairs and one single name.
conds = [['aud_ls', 'aud_lm'], ['go_ls', 'go_lm'], 'resp']
# Keyword arguments forwarded to score() as its `window_kwargs` argument.
# NOTE(review): 'normalize' is the string 'true', not the boolean True —
# confirm this is what the decoder expects.
window_kwargs = {'window': 20, 'obs_axs': 1, 'normalize': 'true', 'n_jobs': -2,
                'average_repetitions': False}

Loading significance: 100%|██████████| 34/34 [00:01<00:00, 17.26it/s]
Loading power: 100%|██████████| 34/34 [00:10<00:00,  3.34it/s]
Loading zscore: 100%|██████████| 34/34 [00:10<00:00,  3.34it/s]
Loading pval: 100%|██████████| 34/34 [00:01<00:00, 17.49it/s]


In [3]:
import pickle

# Save the channel names held by the GroupData object to disk; a later cell
# reloads 'sub_channel.pkl' instead of reading them from `sub` again.
# The second argument to .get() is the fallback used when 'channel' is missing
# (assuming sub.keys behaves like a dict — TODO confirm).
sub_channels = sub.keys.get('channel',[])
with open('sub_channel.pkl', 'wb') as f:
    pickle.dump(sub_channels, f)


def sep_sub_ch(sub_ch:str):
    """
    Split a "<subject>-<channel>" identifier into its two parts.

    Input: subject_channel string, e.g. 'D0102-RTPI5'
    Output: tuple (sub_num, ch) where sub_num is the subject id with its
            leading 'D' and '0' characters removed (e.g. '102') and ch is
            everything after the FIRST hyphen (e.g. 'RTPI5')
    Raises: IndexError if the string contains no hyphen
    """
    # Split on the first hyphen only, so a channel name that itself contains
    # a hyphen is returned whole instead of being cut at its first hyphen.
    parts = sub_ch.split('-', 1)
    # lstrip takes a *set* of characters: every leading 'D' or '0' is removed,
    # which turns 'D0102' into '102' and 'D0005' into '5'.
    sub_num = parts[0].lstrip('D0')
    ch = parts[1]
    return sub_num, ch

# Per-subject label mappings, keyed by (subject number, subjects directory).
# Filled lazily so each subject's labels are generated at most once per kernel.
_SUBJECT_LABEL_CACHE = {}


def _load_subject_labels(sub_num: str, sub_dir: str):
    """Return the channel -> atlas-label mapping for one subject, building it once."""
    key = (sub_num, sub_dir)
    if key not in _SUBJECT_LABEL_CACHE:
        # Imported lazily, as in the original, so the notebook does not need
        # ieeg.viz until a label is actually requested.
        from ieeg.viz import mri
        info = mri.subject_to_info(f"D{sub_num}", sub_dir)
        _SUBJECT_LABEL_CACHE[key] = mri.gen_labels(
            info, sub=f"D{sub_num}", subj_dir=sub_dir, atlas=".BN_atlas")
    return _SUBJECT_LABEL_CACHE[key]


def regen_ch_label(sub_ch: str, sub_dir: str):
    """
    Look up the atlas label of a single channel.

    Input: full subj_channel name (e.g. 'D0102-RTPI5') and the subjects directory
    Output: ROI label for that channel from the ".BN_atlas" atlas
            (the original note says Brainnectome atlas with a 10mm radius; the
            radius is not set in this call — TODO confirm it is the default)
    Raises: whatever the label mapping raises for an unknown channel name
            (KeyError if it is a plain dict — TODO confirm)

    The subject's label mapping is generated on first use and reused for every
    later channel of the same subject; previously it was regenerated for every
    single channel.
    """
    sub_num, ch = sep_sub_ch(sub_ch)
    return _load_subject_labels(sub_num, sub_dir)[ch]

def get_ch_label(channels, sub_dir):
    """
    Map every channel name to its atlas label.

    Input: iterable of full subj_channel names and the subjects directory
    Output: list of atlas labels, in the same order as the input
    """
    # One regen_ch_label lookup per channel, order preserved.
    return [regen_ch_label(name, sub_dir) for name in channels]


# Resolve an atlas label for every channel name; the result is a list aligned
# index-for-index with `sub_channels`.
channel_label = get_ch_label(sub_channels, subjects_dir) 

# Persist the labels; a later cell reloads 'channel_label.pkl' instead of
# recomputing them.
with open('channel_label.pkl', 'wb') as f:
    pickle.dump(channel_label, f)

#ch_roi = regen_ch_label('D0102-RTPI5', subjects_dir)

#print(sub_channels.find('D0102-RTPI5'))

In [3]:
# NOTE(review): this cell carries the same execution count (In [3]) as the cell
# that writes the two pickle files, so the notebook was not run strictly top to
# bottom; the files must already exist for this cell to work.
import pickle

# Reload the cached channel names and their atlas labels.
# NOTE(review): pickle.load can execute arbitrary code — only load files that
# were produced by this notebook.
with open('sub_channel.pkl', 'rb') as f:
    sub_channels = pickle.load(f)
with open('channel_label.pkl', 'rb') as f:
    channel_label = pickle.load(f)



# Positions in `channel_label` whose label contains the substring 'Hipp'.
# The test is case-sensitive (assumes every label is a string — TODO confirm).
idxs_hipp = [index for index, item in enumerate(channel_label) if 'Hipp' in item]

# Positions whose label contains the lowercase substring 'tha'; a label spelled
# 'Tha…' would NOT match — verify against the atlas label names.
idxs_thal = [index for index, item in enumerate(channel_label) if 'tha' in item]




In [4]:

# Indices that are both in sub.sig_chans and in the 'Hipp'-labelled list.
# The list comes from a set intersection, so its element order is not
# guaranteed to be sorted.
idxs_hippandsig = list(set(sub.sig_chans) & set(idxs_hipp))
#sub.plot_groups_on_average([idxs_hippandsig])
#sub.plot_groups_on_average([idxs_thal])


In [None]:
# Plot a single group made of indices 0..3791.
# NOTE(review): 3792 is a hard-coded count — presumably the total number of
# channels in `sub`; confirm, and consider deriving it from the data.
sub.plot_groups_on_average([range(3792)])

In [None]:
# List of Sentence Rep channels sent to Kumar for hippocampal index
#channels = pd.DataFrame(sub.keys.get('channels'))
#channels.to_csv('sentencerep_channels.csv', index=True, header=True)


In [None]:
# %% Time Sliding decoding for word tokens
# Run score() for the four word tokens on a single index group (the
# 'Hipp'-labelled list) and rebind `scores` to whatever it returns.
# NOTE(review): the meaning of the positional arguments 0.8, 'lda', 5 and 10 is
# defined by analysis.decoding.words.score and is not visible here — confirm
# before changing them.
# NOTE(review): only one group is passed, while the `scores` dict handed in has
# four group keys — check which keys score() actually fills.
scores = score({'heat': 1, 'hoot': 2, 'hot': 3, 'hut': 4}, 0.8, 'lda', 5, 10, sub, [idxs_hipp], conds,
                            window_kwargs, scores,
                            shuffle=False)
# Write the result to 'true_scores_hipp.npy'; the plotting cell reads it back.
dict_to_structured_array(scores, 'true_scores_hipp.npy')
out_scores = scores.copy()  # shallow copy; not referenced again in the visible cells
# scores2 = score({'heat': 1, 'hoot': 2, 'hot': 3, 'hut': 4},
#                                 0.8, 'lda', 5, 250, sub, idxs, conds,
#                                 window_kwargs, scores2,
#                                 shuffle=True)
# dict_to_structured_array(scores2, 'shuffle_score.npy')

In [None]:
# %% Plotting
# Reload the saved scores from disk and plot one curve per stored entry.
data_dir = ''  # empty prefix: the file is read from the current working directory

# Element 0 of the saved array is a structured record; it is unpacked into a
# plain dict keyed by field name. This rebinds `scores`, replacing the value
# produced by the decoding cell.
# NOTE(review): allow_pickle=True lets np.load unpickle objects — only use it
# on files written by this notebook.
scores = np.load(data_dir + 'true_scores_hipp.npy', allow_pickle=True)[0]
scores = {name: scores[name] for name in scores.dtype.names}


# plots = {}
# for key, values in scores2.items():
#     if values is None:
#         continue
#     plots[key] = np.mean(values.T[np.eye(4).astype(bool)].T, axis=2)
# fig, axs = plot_all_scores(plots, conds, {n: i for n, i in zip(names, idxs_hipp)}, colors, "Word Decoding")

# for ax in fig.axes:
#     ax.axhline(0.25, color='k', linestyle='--')

plots = {}
for key, values in scores.items():
    if values is None:
        continue
    # The 4x4 boolean identity mask keeps only the diagonal entries over the
    # first two axes of values.T (the last two axes of `values`); the result is
    # transposed back and averaged over axis 2. Presumably the diagonal of a
    # per-word confusion matrix — TODO confirm the array layout.
    plots[key] = np.mean(values.T[np.eye(4).astype(bool)].T, axis=2)
# NOTE(review): the mapping passed here pairs `names` with the four groups in
# `idxs`, but the scores loaded above were computed for [idxs_hipp] only —
# confirm this is the mapping plot_all_scores should receive.
fig, axs = plot_all_scores(plots, conds, {n: i for n, i in zip(names, idxs)}, colors, "Word Decoding")

# Dashed black reference line at 0.25 on every axis (presumably chance level
# for four word classes).
for ax in fig.axes:
    ax.axhline(0.25, color='k', linestyle='--')

# # %% Time Sliding decoding significance

# shuffle_score = np.load(data_dir + 'shuffle_score_short.npy', allow_pickle=True)[0]
# shuffle_score = {name: shuffle_score[name] for name in shuffle_score.dtype.names}
# signif = {}
# for cond, score in scores.items():
#     true = np.mean(score.T[np.eye(4).astype(bool)].T, axis=2)
#     shuffle = np.mean(shuffle_score[cond].T[np.eye(4).astype(bool)].T, axis=2)
#     signif[cond] = time_perm_cluster(true.T, shuffle.T, 0.001, stat_func=lambda x, y, axis: np.mean(x, axis=axis))

# %% Plot significance
# for cond, ax in zip(conds, axs):
#     bars = []
#     if isinstance(cond, list):
#         cond = "-".join(cond)
#     for i, idx in enumerate(idxs):
#         name = "-".join([names[i], cond])
#         if name.endswith('resp'):
#             times = (-1, 1)
#         else:
#             times = (-0.5, 1.5)
#         shuffle = np.mean(shuffle_score[name].T[np.eye(4).astype(bool)].T, axis=2)
#         # smooth the shuffle using a window
#         window = np.lib.stride_tricks.sliding_window_view(shuffle, 20, 0)
#         shuffle = np.mean(window, axis=-1)
#         plot_dist_bound(shuffle, 'std', 'both', times, 0, ax=ax, color=colors[i], alpha=0.3)
#         bars.append(signif[name])
#     plot_horizontal_bars(ax, bars, 0.05, 'below')


# NOTE(review): with a non-interactive (e.g. inline) matplotlib backend,
# Figure.show() may only emit a warning; confirm the figure actually renders.
fig.show()


In [None]:
# NOTE(review): import placed mid-notebook; it would normally live in the
# first (imports) cell.
from ieeg.viz.ensemble import plot_dist
# Select the 'zscore' / 'aud_ls' entry and combine its axes 0 and 2
# (exact semantics of .combine are defined by the project's array type).
data = sub.array['zscore','aud_ls'].combine((0, 2))
# Convert to a plain array via __array__() before plotting.
plot_dist(data.__array__())

In [None]:
class Person:
    """Holds a name and an age in name-mangled ("private") attributes.

    Access goes through the accessors below. Note that ``get_name`` is a
    property (read it as ``person.get_name``, without parentheses), whereas
    ``get_age``, ``set_name`` and ``set_age`` are ordinary methods.

    NOTE(review): the property/method mix looks unintentional; it is kept
    as-is here so existing attribute-style reads keep working.
    """

    def __init__(self, name, age):
        # Both values are stored unchanged; nothing is validated here.
        self.__name, self.__age = name, age

    def set_age(self, age):
        """Store a new age; any value that is not greater than 0 is ignored."""
        if not age > 0:
            return
        self.__age = age

    def get_age(self):
        """Return the stored age."""
        return self.__age

    def set_name(self, name):
        """Store a new name (no validation)."""
        self.__name = name

    @property
    def get_name(self):
        """The stored name (read-only property)."""
        return self.__name

# Quick demonstration of the accessors.
person = Person("Alice", 30)
print(person.get_age())  # 30 (prints the age, not the name)

