In [1]:
import importlib
import numpy as np
import pandas as pd
import os
import sys


# Make the local neuroginius checkout importable. Inserting at position 1 puts it
# ahead of site-packages but after sys.path[0].
neuroginius_path = '/homes_unix/agillig/neuroginius'
sys.path.insert(1, neuroginius_path)

import neuroginius

# reload() picks up edits to the package without restarting the kernel.
importlib.reload(neuroginius)

from neuroginius import derivatives as dt
importlib.reload(dt)

# get_share_data lives in the project code directory: add that directory to
# sys.path only long enough to import the module, then remove it again.
sys.path.insert(1, '/homes_unix/agillig/projects/dynapred/code')
import get_share_data as share
importlib.reload(share)
sys.path.remove('/homes_unix/agillig/projects/dynapred/code')


from neuroginius.atlas import Atlas
from neuroginius import data
# NOTE(review): reload() needs a module object. If a later cell rebinds `data`
# (e.g. `data = []`), re-running this cell raises TypeError.
importlib.reload(data)

  from .autonotebook import tqdm as notebook_tqdm


<module 'neuroginius.data' from '/homes_unix/agillig/neuroginius/neuroginius/data.py'>

In [2]:
# Behavioural targets: `y` comes from the default query; `y_ger` is re-queried with
# only the GER_ columns found in `y`.
y = share.get_share_data(retrieve_timeseries=False)
print(f'n subjects (cog + emo): {len(y)}')
# NOTE(review): despite its name, cog_vars holds the GER_ columns (reported as
# "emo" below). Consider renaming once no other cell depends on it.
cog_vars = y.columns[y.columns.str.contains('GER_')].tolist()
y_ger = share.get_share_data(cognitive_var=cog_vars, retrieve_timeseries=False)
print(f'n subjects (emo): {len(y_ger)}')

# Compare the subject sets directly. Comparing a set size with len(y) is fooled by
# duplicated index entries, and the mismatch case used to print nothing.
missing_from_ger = set(y.index) - set(y_ger.index)
n_intersection = len(set(y.index) & set(y_ger.index))
print(f'n subjects in intersection: {n_intersection}')
if not missing_from_ger:
    print('---> all subjects in y are also in y_ger')
else:
    print(f'---> {len(missing_from_ger)} subjects in y are missing from y_ger')

n subjects (cog + emo): 742
n subjects (emo): 1369
n subjects in intersection: 742
---> all subjects in y are also in y_ger


In [3]:
# Atlas looked up by name through neuroginius (presumably the 200-parcel
# Schaefer parcellation; confirm in Atlas.from_name).
atlas = Atlas.from_name('schaefer200')

# One resting-state file path per line. Index each row by its subject ID, e.g.
# '.../_subject_id_SHARE0001/bpEpiStd_gsreg/...' -> 'SHARE0001'.
import re

rsfiles = pd.read_csv('/homes_unix/agillig/projects/dynapred/processing/MRiShare_gsreg_list.txt', header=None)
prefix = 'SHARE'
# Take the first occurrence of the prefix plus everything up to the next '/'.
subject_ids = rsfiles[0].str.extract(f'({re.escape(prefix)}[^/]*)', expand=False)
# Fail loudly rather than produce NaN or duplicate labels: later cells look rows
# up with rsfiles.loc[<subject id>].
assert subject_ids.notna().all(), f'{subject_ids.isna().sum()} paths contain no {prefix!r} subject ID'
assert subject_ids.is_unique, 'duplicate subject IDs in the resting-state file list'
rsfiles.index = subject_ids.values

# Parcellated-timeseries derivative for `atlas`, using the 'multivariate'
# extraction method and rooted at the iShare derivatives directory.
extraction_method = 'multivariate'
parcellated_ts = dt.ParcellatedTimeseries(
    atlas,
    extraction_method=extraction_method,
    derivatives_path='/projects/resting/ishare_derivatives_ag',
)

In [4]:
# rsfiles holds one row (one resting-state file) per subject. Count rows rather
# than flattening every cell into a temporary Python list.
n_subjects = len(rsfiles)

In [5]:
# Pairwise-interaction derivative for the 'mdcor' metric, built on the same atlas
# and extraction method as the parcellated timeseries.
metric = 'mdcor'
mdcor = dt.PairwiseInteraction(
    metric=metric,
    atlas=atlas,
    extraction_method=parcellated_ts.extraction_method,
)

# Share the timeseries' derivatives root, then collect subjects by ID prefix
# (presumably scanning that root; confirm in PairwiseInteraction).
mdcor.set_derivatives_path(parcellated_ts.derivatives_path)
mdcor.subjects_from_prefix("SHARE")

In [6]:
# Subjects whose IDs start with "SHARE", returned as values (presumably those
# found under the timeseries derivatives directory; confirm).
ts_subjects = parcellated_ts.subjects_from_prefix("SHARE", return_values=True)

In [7]:
# Dummy input for quick experiments: 100 arrays of shape (20, 1049) with uniform
# values in [0, 1). Seeded so re-running the notebook reproduces the same arrays.
RANDOM_SEED = 0
rng = np.random.default_rng(RANDOM_SEED)
ts = [rng.random((20, 1049)) for _ in range(100)]

In [8]:
# Atlas name; it is also embedded in the derivative file paths written by save().
atlas.name

'schaefer200'

In [9]:
# Root of the derivatives tree (same root passed to ParcellatedTimeseries earlier).
derivatives_dir = '/projects/resting/ishare_derivatives_ag'

In [10]:
def save(X, sub_id):
    """Write ``X`` for subject ``sub_id`` as a comma-delimited ``.csv`` file.

    The path is built from the notebook-level ``derivatives_dir``, ``atlas``,
    ``extraction_method`` and ``metric``. The containing directory is created if
    it does not exist yet.
    """
    target_path = f'{derivatives_dir}/pairwise_interactions/{atlas.name}/{extraction_method}/{metric}/{sub_id}_{atlas.name}_{extraction_method}_{metric}.csv'
    target_dir = os.path.dirname(target_path)
    os.makedirs(target_dir, exist_ok=True)
    np.savetxt(target_path, X, delimiter=',')

In [11]:
# Report subjects for which parcellated_ts.exists(index=...) is False.
# `subject_id` is used instead of `id` so the builtin is not shadowed for the
# rest of the session.
missing_ts_subjects = [
    subject_id
    for subject_id in parcellated_ts.subjects
    if not parcellated_ts.exists(index=subject_id)
]
for subject_id in missing_ts_subjects:
    print(f"Missing {subject_id}")

In [12]:
# Show the stored file path(s) for the first subject in rsfiles.
rsfiles.loc[rsfiles.index[0]].values.tolist()

['/data/analyses/work_in_progress/repos/rs_processing/gs_regression/_subject_id_SHARE0001/bpEpiStd_gsreg/out_res_gsRegression.nii.gz']

In [13]:
def process():
    """Return each row of ``rsfiles`` as a plain list, one list per subject."""
    return [row.values.tolist() for _subid, row in rsfiles.iterrows()]
        
        



In [14]:
# Sanity check: compute MDCOR for the first few subjects by index.
# Stored as `mdcor_preview` rather than `data`: `data` is the neuroginius.data
# module imported in the first cell, and rebinding it makes that cell's
# importlib.reload(data) fail on re-run.
N_PREVIEW = 10
mdcor_preview = []
if parcellated_ts.exists(n_subjects=n_subjects):
    # Never index past the available subjects.
    for i in range(min(N_PREVIEW, n_subjects)):
        subject_ts = parcellated_ts.load_individual(i)
        mdcor_preview.append(mdcor.fit_transform(subject_ts))
else:
    print('parcellated_ts.exists(n_subjects=...) returned False; skipping MDCOR preview')

mdcor_preview = np.array(mdcor_preview)

In [15]:
# NOTE(review): the trailing '/' differs from the derivatives path given to
# ParcellatedTimeseries earlier ('/projects/resting/ishare_derivatives_ag').
# That is harmless if PairwiseInteraction joins paths with os.path.join; confirm.
mdcor.set_derivatives_path('/projects/resting/ishare_derivatives_ag/')

In [16]:
# NOTE(review): passed positionally, whereas the other exists() calls use
# keywords (n_subjects=..., index=...). Confirm that the first positional
# parameter of PairwiseInteraction.exists is n_subjects.
mdcor.exists(n_subjects)


False

In [17]:
# mdcor.fit_individual(parcellated_ts, 0)

In [18]:
#connect to slurm
from dask_jobqueue import SLURMCluster
from dask.distributed import LocalCluster, Client
import os
# One SLURM job per dask worker job, with 4 cores each.
# Use the same figure for SLURM's memory request (job_mem) and dask's memory
# budget (memory). Before, dask was told 600GB while SLURM granted only 64GB, so
# dask's spill/pause thresholds could never trigger before SLURM's limit.
JOB_MEMORY = "64GB"
# #SBATCH lines are not shell-expanded, so a literal "$PYTHONPATH" would never be
# substituted. Build the value in Python instead.
worker_pythonpath = ":".join(
    p for p in (neuroginius_path, os.environ.get("PYTHONPATH", "")) if p
)
cluster = SLURMCluster(
    cores=4,
    # processes=8,
    memory=JOB_MEMORY,
    job_mem=JOB_MEMORY,
    # account="agillig",
    walltime="03:00:00",
    # `queue` becomes the job's partition (-p). Listing both partitions here
    # replaces the separate, conflicting "--partition" directive.
    queue="normal,highmem",
    job_extra_directives=[
        f"--export=ALL,PYTHONPATH={worker_pythonpath}",
    ],
)


client = cluster.get_client()
cluster.scale(jobs=30)

In [19]:
# client.shutdown()

In [20]:
# Scratch arithmetic left from an interactive session; its purpose is not
# recorded. TODO: document it (resource sizing?) or remove it.
128 * 7

896

In [21]:
# URL of the dask dashboard for monitoring the cluster's workers.
print(client.dashboard_link)

http://10.85.1.77:8787/status


In [22]:
from dask import delayed
import dask.array as da

In [25]:
def load_nii(file):
    """Load a neuroimaging file (or anything nilearn's ``load_img`` accepts).

    The import sits inside the function so nilearn is resolved in whichever
    process runs the call (e.g. a dask worker via ``client.submit``).
    """
    import nilearn.image as nimg

    image = nimg.load_img(file)
    return image

def save(X, sub_id):
    """Write ``X`` for subject ``sub_id`` to a comma-delimited ``.csv`` file.

    ``os`` and ``numpy`` are imported locally so they are resolved in whichever
    process executes the call; ``process()`` wraps this function in
    ``dask.delayed``. The path is built from the notebook-level
    ``derivatives_dir``, ``atlas``, ``extraction_method`` and ``metric``.
    """
    import numpy as np
    import os

    out_file = f'{derivatives_dir}/pairwise_interactions/{atlas.name}/{extraction_method}/{metric}/{sub_id}_{atlas.name}_{extraction_method}_{metric}.csv'
    out_dir = os.path.dirname(out_file)
    os.makedirs(out_dir, exist_ok=True)
    np.savetxt(out_file, X, delimiter=',')

In [44]:
# load_img is not imported at notebook level (only inside load_nii), so without
# this import the cell raises NameError on a fresh kernel.
from nilearn.image import load_img

img = load_img(atlas.maps)

In [47]:
# Number of image axes. img.shape comes from the header; get_fdata() would load
# the whole volume as floats just to count dimensions.
len(img.shape)

3

In [28]:
def process(file):
    """Submit ``load_nii(file)`` to the dask cluster and return its Future."""
    return client.submit(load_nii, file)

# Pass the path string itself (first column of the first row), as the later
# process() does, rather than a one-element list: nilearn handles an iterable as
# a sequence of images to concatenate.
res = process(rsfiles.iloc[0, 0])

In [39]:
# State of the submitted Future (e.g. 'pending', 'finished', 'cancelled').
res.status

'cancelled'

In [35]:
# Block until the Future completes and fetch the loaded image.
# NOTE(review): this would raise if the Future had been cancelled, as the status
# cell shows it later was.
res.result()

<nibabel.nifti1.Nifti1Image at 0x7ff2face48c0>

In [225]:
def process():
    """Build the lazy per-subject pipeline: parcellate -> MDCOR -> save.

    A subject is skipped when it is not in ``y_ger.index`` or when
    ``mdcor.exists(index=subid)`` is True (presumably: output already on disk).

    Returns
    -------
    list of dask Delayed
        One ``save`` task per remaining subject. Nothing runs until the list is
        passed to a dask ``compute`` call.
    """
    tasks = []
    # rsfiles has a single path column; iterate it directly instead of building a
    # Series per row with iterrows().
    for subid, file in rsfiles[0].items():
        if subid not in y_ger.index or mdcor.exists(index=subid):
            continue
        ts = delayed(parcellated_ts.fit_transform)(file)
        conn = delayed(mdcor.fit_transform)(ts)
        # NOTE(review): `save` is looked up as a global when process() runs. A
        # redefinition with another signature, e.g. save(np_array, path), would
        # silently write `conn` to a file named after `subid`.
        tasks.append(delayed(save)(conn, subid))
    return tasks

In [226]:
import dask

# process() returns plain Delayed objects (not dask arrays), so use the top-level
# dask.compute entry point rather than going through the dask.array namespace.
results = dask.compute(*process())

KeyboardInterrupt: 

In [39]:
import dask
# Re-collect dask configuration (config files and environment variables) so that
# changes made since import take effect.
dask.config.refresh()

In [12]:
def vectorize_matrix(matrix, k=1):
    """Flatten the upper triangle of a 2-D matrix into a 1-D array.

    Parameters
    ----------
    matrix : array_like, 2-D
        E.g. an (n, n) pairwise-interaction matrix.
    k : int, default 1
        Diagonal offset. The default ``1`` excludes the main diagonal, matching
        the previous behaviour; ``0`` includes it.

    Returns
    -------
    numpy.ndarray
        Elements on and above the ``k``-th diagonal, in row-major order. For an
        (n, n) input with ``k=1`` the length is ``n * (n - 1) // 2``.

    Raises
    ------
    ValueError
        If ``matrix`` is not 2-D.
    """
    matrix = np.asarray(matrix)
    if matrix.ndim != 2:
        raise ValueError(f"expected a 2-D matrix, got shape {matrix.shape}")
    # triu_indices_from uses both dimensions, so rectangular inputs are indexed
    # correctly instead of assuming shape[1] == shape[0].
    return matrix[np.triu_indices_from(matrix, k=k)]

def save(np_array, path, **savetxt_kwargs):
    """Write ``np_array`` to ``path`` with ``np.savetxt``, creating parent dirs.

    NOTE(review): this redefines ``save`` with a different signature from the
    ``save(X, sub_id)`` helper that ``process()`` wraps in ``dask.delayed``.
    If this cell runs before ``process()`` is called, each subject's matrix is
    written to a file named after the subject ID. Consider renaming.

    Parameters
    ----------
    np_array : array_like
        1-D or 2-D data accepted by ``np.savetxt``.
    path : str, path-like or file handle
        Destination. For str/path-like values, missing parent directories are
        created, as in the other save helpers in this notebook.
    **savetxt_kwargs
        Forwarded to ``np.savetxt`` (e.g. ``delimiter=','`` to produce CSV like
        the other writers). Without them numpy's defaults apply (space-delimited).
    """
    if isinstance(path, (str, os.PathLike)):
        parent = os.path.dirname(os.fspath(path))
        # Bare file names have no directory part; os.makedirs('') would raise.
        if parent:
            os.makedirs(parent, exist_ok=True)
    np.savetxt(path, np_array, **savetxt_kwargs)

In [13]:
# Output directory for pairwise-interaction results of the current `metric`.
# NOTE(review): this layout (pairwise_interactions/<metric>) omits the
# <atlas>/<extraction_method> levels used by save(X, sub_id). Confirm which layout
# downstream code expects.
save_dir = f'/projects/resting/ishare_derivatives_ag/pairwise_interactions/{metric}'
os.makedirs(save_dir, exist_ok=True)