[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/drarkadeep/dangerously-devilish-notebooks/blob/main/nma/fMRI/M-vs-F-2nd-level.ipynb)

In [None]:
!pip install --quiet nilearn

In [2]:
import numpy as np
import pandas as pd
from nilearn.glm.first_level import make_first_level_design_matrix
from scipy import stats
import statsmodels.api as sm
from statsmodels.sandbox.stats.multicomp import multipletests
from nilearn import plotting, datasets
import os, requests
import tarfile
import ipywidgets as widgets
%matplotlib inline

fname = "hcp_task.tgz"
url = "https://osf.io/2y3fw/download"
if not os.path.isfile(fname):
  try:
    r = requests.get(url)
  except requests.ConnectionError:
    print("!!! Failed to download data !!!")
  else:
    if r.status_code != requests.codes.ok:
      print("!!! Failed to download data !!!")
    else:
      with open(fname, "wb") as fid:
        fid.write(r.content)
HCP_DIR = "./hcp_task"
if not os.path.exists(HCP_DIR):
    with tarfile.open(fname) as tfile:
      tfile.extractall('.')
fname = f"{HCP_DIR}/atlas.npz"
url = "https://osf.io/j5kuc/download"
if not os.path.isfile(fname):
  r = requests.get(url)
  with open(fname, "wb") as fid:
    fid.write(r.content)
with np.load(fname) as dobj:
  atlas = dict(**dobj)
fsaverage = datasets.fetch_surf_fsaverage()

In [164]:
def get_subject_data(subject_id, column_number):
    file_name = f"{HCP_DIR}/demo_data.csv"
    url = "https://raw.githubusercontent.com/drarkadeep/assets/main/demo_data.csv"   
    if not os.path.exists(file_name):
        response = requests.get(url)
        with open(file_name, 'wb') as f:
            f.write(response.content)
    df = pd.read_csv(file_name)
    row = df[df.iloc[:, 0] == int(subject_id)]
    return row.iloc[0, column_number]

def get_subject_gender(subject_id):
    gender = get_subject_data(subject_id, column_number=3)
    return gender

def get_subject_age(subject_id):
    age = get_subject_data(subject_id, column_number=4)
    return age

runs = ['LR','RL']
conditions = {
    'MOTOR'      : {'cond':['lf','rf','lh','rh','t','cue']},
    'WM'         : {'cond':['0bk_body','0bk_faces','0bk_places','0bk_tools','2bk_body','2bk_faces','2bk_places','2bk_tools']},
    'EMOTION'    : {'cond':['fear','neut']},
    'GAMBLING'   : {'cond':['loss','win','neut']},
    'LANGUAGE'   : {'cond':['math','story']},
    'RELATIONAL' : {'cond':['match','relation']},
    'SOCIAL'     : {'cond':['ment','rnd']}
}
regions = np.load(f"{HCP_DIR}/regions.npy").T
region_df = pd.DataFrame({'region': regions[0],
                        'network' : regions[1],
                        'hemi' : ['Right']*int(180) + ['Left']*int(180)
                        })
subjects = np.loadtxt(os.path.join(HCP_DIR, 'subjects_list.txt'), dtype='str')

subject_dict = {
    'age': {},
    'gender': {}
}

for subject_id in subjects:
    age = get_subject_age(subject_id)
    gender = get_subject_gender(subject_id)
    
    if age not in subject_dict['age']:
        subject_dict['age'][age] = []
    subject_dict['age'][age].append(subject_id)
    
    if gender not in subject_dict['gender']:
        subject_dict['gender'][gender] = []
    subject_dict['gender'][gender].append(subject_id)


In [97]:
def load_single_timeseries(subject, experiment, run):
  bold_run  = runs[run]
  bold_path = f"{HCP_DIR}/subjects/{subject}/{experiment}/tfMRI_{experiment}_{bold_run}"
  bold_file = "data.npy"
  ts = np.load(f"{bold_path}/{bold_file}")
  ts -= ts.mean(axis=1, keepdims=True)
  ts /= ts.std(axis=1, keepdims=True)
  return ts.T


def get_events(subject, experiment, run):
    task_key = f'tfMRI_{experiment}_{runs[run]}'
    events_data = []
    for cond in conditions[experiment]['cond']:
        ev_file = f"{HCP_DIR}/subjects/{subject}/{experiment}/{task_key}/EVs/{cond}_event.txt"
        ev_array = np.loadtxt(ev_file, ndmin=2, unpack=True)
        df = pd.DataFrame({
            'onset': ev_array[0],
            'duration': ev_array[1],
            'trial_type': cond
        })
        events_data.append(df)
    events = pd.concat(events_data, ignore_index=True)
    events = events.sort_values('onset')    
    return events
    

def create_design_matrix(subject, experiment, run, TR, n_scans):
    events = get_events(subject, experiment, run)
    frame_times = np.arange(n_scans) * TR    
    design_matrix = make_first_level_design_matrix(
        frame_times, 
        events,
        hrf_model='spm + derivative',
        drift_model='cosine',
        high_pass=0.01 
    )    
    return design_matrix

def run_first_level_glm(subject, experiment, run, TR, correction):
    bold_data = load_single_timeseries(subject, experiment, run)
    n_rois = bold_data.shape[1]
    n_scans = bold_data.shape[0]
    design_matrix = create_design_matrix(subject, experiment, run, TR, n_scans)
    
    results = []
    
    for roi in range(n_rois):
        model = sm.OLS(bold_data[:, roi], design_matrix).fit()
        corrected_p_values = multipletests(np.array(model.pvalues).flatten(), alpha=0.05, method=correction)[1]        
        results.append({
            'ROI': roi,
            'coefficients': np.array(model.params),
            'p_values': np.array(model.pvalues),
            'corrected_p_values': corrected_p_values,
            't_values': np.array(model.tvalues),
            'rsquared': model.rsquared
        })
    return pd.DataFrame(results), design_matrix.columns

def prepare_second_level_data(all_subjects_results, regressor_index):
    group_data = []
    for subject_results in all_subjects_results:
        group_data.append(subject_results['coefficients'].apply(lambda x: x[regressor_index]))
    return np.array(group_data)

def run_second_level_glm(group_data, second_level_correction):
    n_rois = group_data.shape[1]
    results = []    
    for roi in range(n_rois):
        t_stat, p_value = stats.ttest_1samp(group_data[:, roi], 0)     
        results.append({
            'ROI': roi,
            't_statistic': t_stat,
            'p_value': p_value
        })    
    results_df = pd.DataFrame(results)    
    corrected_p_values = multipletests(results_df['p_value'], alpha=0.05, method=second_level_correction)[1]
    results_df['corrected_p_value'] = corrected_p_values    
    return results_df

def glm_for_cohort(subject_cohort):
    first_level_results = []
    second_level_results = {}
    
    for subject in subject_cohort:
        for run in range(2):
            subject_results, columns = run_first_level_glm(subject, experiment, run, TR, first_level_correction)
            first_level_results.append(subject_results)
            
    n_regressors = len(first_level_results[0]['coefficients'][0])
    for regressor_index in range(n_regressors):
        group_data = prepare_second_level_data(first_level_results, regressor_index)
        second_level_results[columns[regressor_index]] = run_second_level_glm(group_data, second_level_correction)
    
    return second_level_results

TR = 0.72
experiment = "GAMBLING"
first_level_correction = "fdr_bh"
second_level_correction = "fdr_bh"
male_subjects = subject_dict["gender"]["M"]
female_subjects = subject_dict["gender"]["F"][:len(male_subjects)]

male_results = glm_for_cohort(male_subjects)
female_results = glm_for_cohort(female_subjects)

In [177]:
def show_top_ROIs(regressor, gender, count):
    if gender == "Male":
        result = male_results[regressor].sort_values("t_statistic", ascending=False)
    else:
        result = female_results[regressor].sort_values("t_statistic", ascending=False)
    result= result.iloc[:,:2]
    result["region"] = result["ROI"].map(region_df["region"])
    result["network"] = result["ROI"].map(region_df["network"])
    result["hemi"] = result["ROI"].map(region_df["hemi"]) 
    print("TOP NETWORKS\n")
    top_regions = result.groupby('network')['t_statistic'].mean().reset_index()
    print(top_regions.sort_values(by='t_statistic', ascending=False).reset_index().iloc[:,1:])
    print("\nTOP ROIs\n")
    print(result.head(count).reset_index())
        
top_ROIs = widgets.interact(show_top_ROIs, regressor = widgets.Dropdown(options=['loss', 'loss_derivative', 'neut', 'neut_derivative', 'win', 'win_derivative', 'drift_1', 'drift_2', 'drift_3', 'constant'], value='loss', description='Regressor:'), gender = widgets.RadioButtons(options=["Male", "Female"], value='Male', description='Gender:'), count=widgets.IntSlider(min=0, max=50, value=20, step=5, description='Top ROIs #:'))   

interactive(children=(Dropdown(description='Regressor:', options=('loss', 'loss_derivative', 'neut', 'neut_der…

In [162]:
def plot_brain(regressor, plot, vmax, threshold, gender):
    if gender == "Male":
        surf_contrast = np.array(male_results[regressor]["t_statistic"])[atlas["labels_L"]]
    else:
        surf_contrast = np.array(female_results[regressor]["t_statistic"])[atlas["labels_L"]]
    brain_view = plotting.view_surf(fsaverage[f'{plot}_left'],
                   surf_contrast, cmap="magma",
                   vmax=vmax, threshold=threshold)
    return brain_view

brain_plot = widgets.interact(plot_brain, regressor=widgets.Dropdown(options=['loss', 'loss_derivative', 'neut', 'neut_derivative', 'win', 'win_derivative', 'drift_1', 'drift_2', 'drift_3', 'constant'], value='loss', description='Regressor:'), plot=widgets.Dropdown(options=['pial', 'infl', 'sphere'], value='infl', description='Plot Type:'), vmax=widgets.FloatSlider(min=0, max=10, value=7, step=0.1, description='VMax:'), threshold=widgets.IntSlider(min=0, max=5, value=0, description='Threshold:'), gender = widgets.RadioButtons(options=["Male", "Female"], value='Male', description='Gender:'))


interactive(children=(Dropdown(description='Regressor:', options=('loss', 'loss_derivative', 'neut', 'neut_der…

In [None]:
# Code used to generate the plots for proposal
# import matplotlib.pyplot as plt

# # Load the fsaverage surface mesh
# fsaverage = datasets.fetch_surf_fsaverage()

# # Function to plot brain surfaces for given conditions
# def blot_brain(regressor, gender):
#     if gender == "Male":
#         surf_contrast = np.array(male_results[regressor]["t_statistic"])[atlas["labels_L"]]
#     else:
#         surf_contrast = np.array(female_results[regressor]["t_statistic"])[atlas["labels_L"]]
    
#     # Parameters
#     cmap = "magma"
#     vmax = 7
#     threshold = 0

#     # List of views: anterior, posterior, medial, lateral, dorsal, ventral
#     views = ['anterior', 'posterior', 'medial', 'lateral', 'dorsal', 'ventral']

#     # Loop through the views and save each one as a PNG file
#     for view in views:
#         plt.figure()
#         plotting.plot_surf(fsaverage['infl_left'],  # Using left hemisphere as default
#                            surf_contrast,
#                            hemi='left',  # You can change to 'right' for the right hemisphere
#                            view=view,
#                            cmap=cmap,
#                            vmax=vmax,
#                            threshold=threshold,
#                            colorbar=True)
#         plt.title(f'{gender} - {regressor.capitalize()} - {view.capitalize()} view', fontsize=10)
        
#         # Save each view as a separate PNG file
#         filename = f'{gender.lower()}_{regressor}_{view}.png'
#         plt.savefig(filename, dpi=300, bbox_inches='tight')
#         plt.close()

# # Generate plots for "loss" and "win" for both genders
# conditions = ['loss', 'win']
# genders = ['Male', 'Female']

# for condition in conditions:
#     for gender in genders:
#         blot_brain(condition, gender)

# !zip -r download.zip *.png
