-
Notifications
You must be signed in to change notification settings - Fork 0
/
analysis1_runregression_dims.py
152 lines (123 loc) · 5.71 KB
/
analysis1_runregression_dims.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
#
# Created on Thu Jan 26 2023 by Lina Teichmann
# Contact: lina.teichmann@nih.gov
#
import numpy as np
import mne, os
import pandas as pd
import matplotlib.pyplot as plt
from joblib import Parallel, delayed
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LinearRegression
#*****************************#
### HELPER FUNCTIONS ###
#*****************************#
def load_epochs(bids_dir,participant,behav):
    """Load a participant's preprocessed epochs and attach behavioural
    dimension weights to the epoch metadata.

    Parameters
    ----------
    bids_dir : str
        Path to the BIDS root directory.
    participant : str or int
        Participant BIDS ID (used to build the epochs filename).
    behav : np.ndarray
        (n_images, 66) array of dimension weights, row-indexed by the
        0-based THINGS image number.

    Returns
    -------
    mne.Epochs
        Epochs whose metadata gains columns 'dim1'..'dim66', filled for
        trials with a valid image number and NaN otherwise.
    """
    epochs = mne.read_epochs(f'{bids_dir}/derivatives/preprocessed/preprocessed_P{participant}-epo.fif',preload=True)
    # THINGS-category number & image number start at 1 (matlab based) so subtracting 1 to use for indexing
    epochs.metadata['things_image_nr'] = epochs.metadata['things_image_nr']-1
    epochs.metadata['things_category_nr'] = epochs.metadata['things_category_nr']-1
    # adding dimensional weights to metadata -- vectorized: one masked
    # assignment replaces the former per-trial Python loop, which also
    # assumed the metadata index was a clean RangeIndex
    dim_cols = ['dim'+str(d+1) for d in range(66)]
    epochs.metadata[dim_cols] = np.nan
    valid = epochs.metadata['things_image_nr'].notna()
    img_idx = epochs.metadata.loc[valid,'things_image_nr'].astype(int).to_numpy()
    epochs.metadata.loc[valid, dim_cols] = behav[img_idx,:]
    return epochs
def train_test_split(epo_stacked,test_sess):
    """Split stacked epochs into train/test sets for one cross-validation fold.

    The held-out session (``test_sess``) supplies the test data; all other
    sessions supply the training data. Only experimental trials
    (trial_type == "exp") are used.

    Parameters
    ----------
    epo_stacked : mne.Epochs
        Epochs across all sessions, with 'dim*' target columns in the metadata.
    test_sess : int
        Session number to hold out for testing.

    Returns
    -------
    x_train, x_test : np.ndarray
        MEG data arrays (n_trials, n_channels, n_times).
    y_train, y_test : np.ndarray
        Dimension-weight targets (n_trials, n_dims).
    """
    def _data_and_targets(epochs_subset):
        # the 'dim1'..'dim66' metadata columns hold the regression targets;
        # extracted once here instead of duplicating the comprehension
        dim_columns = [col for col in epochs_subset.metadata if col.startswith('dim')]
        return epochs_subset._data, epochs_subset.metadata[dim_columns].to_numpy()
    x_test, y_test = _data_and_targets(epo_stacked[f'session_nr == {test_sess} and trial_type == "exp"'])
    x_train, y_train = _data_and_targets(epo_stacked[f'session_nr != {test_sess} and trial_type == "exp"'])
    return x_train, x_test, y_train, y_test
def run_linear_regression(x_train,y_train,x_test,y_test,n_ys):
    """Fit a standardized linear regression and score it per dimension.

    Fits a StandardScaler + LinearRegression pipeline on the training data,
    predicts the test targets, and returns the Pearson correlation between
    predicted and observed values for each of the ``n_ys`` dimensions.

    Returns
    -------
    list of float
        One correlation coefficient per target dimension.
    """
    model = Pipeline([('scaler', StandardScaler()),
                      ('regression', LinearRegression())])
    model.fit(x_train, y_train)
    predicted = model.predict(x_test)
    correlations = []
    for dim in range(n_ys):
        correlations.append(np.corrcoef(predicted[:, dim], y_test[:, dim])[0, 1])
    return correlations
#*****************************#
### COMMAND LINE INPUTS ###
#*****************************#
if __name__=='__main__':
    import argparse
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "-participant",
        required=True,
        help='participant bids ID (e.g., 1)',
    )
    parser.add_argument(
        "-bids_dir",
        required=True,
        help='path to bids root',
    )
    parser.add_argument(
        "-permutation",
        required=False,
        help='1 if we want to permute the labels, 0 if we do not want to permute the labels',
        type=int,
        default = 0,
    )
    # BUGFIX: the original `type=list` split the argument string into single
    # characters (e.g. "012" -> ['0','1','2'] strings), which later breaks
    # np.random.seed(perm). Accept space-separated ints: -permutationlist 0 1 2
    parser.add_argument(
        "-permutationlist",
        required=False,
        nargs='+',
        type=int,
        help='list of which permutation numbers to run',
        default=[0,1,2],
    )
    args = parser.parse_args()
    # folders
    bids_dir = args.bids_dir
    participant = args.participant
    permutation = args.permutation
    permutationlist = args.permutationlist
    # (n_images, 66) behavioural dimension weights, row-indexed by 0-based image number
    behav = np.loadtxt(f'{bids_dir}/sourcedata/meg_paper/predictions_66d_elastic_clip-ViT-B-32_visual_THINGS.txt')
    res_dir = f'{bids_dir}/derivatives/meg_paper/output/regression/'
    perm_dir = f'{bids_dir}/derivatives/meg_paper/output/regression/permutations/'
    # exist_ok avoids the race between the existence check and the creation
    os.makedirs(res_dir, exist_ok=True)
    os.makedirs(perm_dir, exist_ok=True)
    # parameters
    n_sessions = 12     # one cross-validation fold per held-out session
    n_ys = 66           # number of behavioural dimensions predicted
    n_times = 281       # timepoints per epoch -- TODO confirm against epochs.times
    # load data
    print('loading data participant ' + str(participant))
    epochs = load_epochs(bids_dir,participant,behav)
    # NOTE: the original script preallocated and filled giant 4-D train/test
    # arrays for every fold here, then recomputed the split per fold anyway.
    # That dead work (and its memory footprint) is removed; splits are built
    # inside the loops below, right where they are used.
    # run regression model for each cross-validation split;
    # corr_dims_cv collects per-timepoint, per-dimension correlations per fold
    corr_dims_cv = np.zeros((n_times,n_ys,n_sessions))
    if permutation == 0:
        for i,test_sess in enumerate(range(1,n_sessions+1)):
            print('run regression cv: ' + str(test_sess))
            x_train, x_test, y_train, y_test = train_test_split(epochs,test_sess)
            # fit one regression per timepoint, parallelized across timepoints
            res = Parallel(n_jobs=48, prefer="threads")(delayed(run_linear_regression)(x_train[:,:,t],y_train,x_test[:,:,t],y_test,n_ys) for t in range(len(epochs.times)))
            corr_dims_cv[:,:,i] = np.array(res)
        # average prediction performance over cross-validation folds and save
        corr_dims_ts = np.mean(corr_dims_cv,axis=2)
        pd.DataFrame(corr_dims_ts).to_csv(f'{res_dir}/P{participant}_linreg_within_predict-dims.csv')
    else:
        for perm in permutationlist:
            np.random.seed(perm)  # one seed per permutation for reproducibility
            for i,test_sess in enumerate(range(1,n_sessions+1)):
                print('run regression cv: ' + str(test_sess) + ' permutation ' + str(perm))
                x_train, x_test, y_train, y_test = train_test_split(epochs,test_sess)
                # break the trial-to-weights mapping by shuffling training rows
                np.random.shuffle(y_train)
                res = Parallel(n_jobs=48, prefer="threads")(delayed(run_linear_regression)(x_train[:,:,t],y_train,x_test[:,:,t],y_test,n_ys) for t in range(len(epochs.times)))
                corr_dims_cv[:,:,i] = np.array(res)
            # one null-distribution file per permutation
            corr_dims_ts = np.mean(corr_dims_cv,axis=2)
            pd.DataFrame(corr_dims_ts).to_csv(f'{perm_dir}/P{participant}_linreg_within_perm{perm}.csv')