# Analyze Finger Tapping Data

In [214]:
%matplotlib inline

import os 
import glob
import json
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from matplotlib.patches import Patch
import seaborn as sns
from nltools.data import Brain_Data, Adjacency, Design_Matrix
from nltools.file_reader import onsets_to_dm
from nltools.mask import expand_mask, roi_to_brain
from nltools.stats import regress
from nilearn.plotting import plot_img_on_surf, plot_stat_map, plot_glass_brain, view_img_on_surf
import nibabel as nib
import mne
from mne.io import read_raw_snirf
import mne.io.snirf
from snirf import Snirf
import h5py

mne.viz.set_3d_backend("notebook")

base_dir = '/Users/lukechang/Dropbox/Kernel'

## Analyze NIFTI Reconstruction with nltools

To speed up loading data, we save a resampled version of the data. Alternatively, we could load it in the original 4 x 4 x 4 space like we do using nibabel below.

The nifti reconstructed data has been trimmed to 1 sec before start experiment and 1 sec after experiment end.

We are seeing large scaling differences across regions, we are z-scoring data for now, but that means we are losing absolute HbO concentrations

In [None]:

# metadata = {'subject':'S001', 'task_name':'FingerTapping', 'task_id':'6f5cbac'}
metadata = {'subject':'S002', 'task_name':'FingerTapping', 'task_id':'387249d'}
# metadata = {'subject':'S004', 'task_name':'FingerTapping', 'task_id':'8f8a587'}

# data_nifti = Brain_Data(os.path.join(base_dir, 'Data', 'FingerTapping',  f'Test_{metadata["subject"]}_{metadata["task_id"]}_HbO.nii.gz') )
# data_nifti.write(os.path.join(base_dir, 'Data', 'FingerTapping',  f'Test_{metadata["subject"]}_{metadata["task_id"]}_HbO_resampled.nii.gz'))

data_nifti = Brain_Data(os.path.join(base_dir, 'Data', 'FingerTapping',  f'Test_{metadata["subject"]}_{metadata["task_id"]}_HbO_resampled.nii.gz') )
data_nifti = data_nifti.standardize(method='zscore')
toffset =  nib.load(os.path.join(base_dir, 'Data', 'FingerTapping',  f'Test_{metadata["subject"]}_{metadata["task_id"]}_HbO.nii.gz')).header['toffset']
n_tr = len(data_nifti)

mask = Brain_Data('/Users/lukechang/Dropbox/TV_fMRI/Masks/k50_2mm.nii.gz')
mask_x = expand_mask(mask)

(mask_x[26] + mask_x[47]).plot()
plt.show()

### Create Design_Matrix

In [None]:
events = pd.read_csv(os.path.join(base_dir, 'Data', task, f'Test_{metadata["subject"]}_{metadata["task_id"]}_task_events.tsv'), sep='\t')
events['timestamp_adjusted'] = events['timestamp'] - toffset
events['block_type'].fillna('rest', inplace=True)
dm = onsets_to_dm(events[['timestamp_adjusted','duration','block_type']].rename(columns={'timestamp_adjusted':'Onset','duration':'Duration','block_type':'Stim'}), sampling_freq=1.0, run_length=len(data_nifti)).drop(columns='rest')

# Generate Plot
f,a = plt.subplots(1, figsize=(12,4))

left_motor = data_nifti.extract_roi(mask_x[26])
a.plot(left_motor, color='navy')
right_motor = data_nifti.extract_roi(mask_x[47])
a.plot(right_motor, color='red')

# Add colored backgrounds for each integer
colors={'right':'pink', 'left':'skyblue', 'rest':'snow'}
for i,row in dm.iterrows():
    if row['right'] == 1:
        a.axvspan(i, i+1, facecolor=colors['right'], alpha=0.3)
    elif row['left'] == 1:
        a.axvspan(i, i+1, facecolor=colors['left'], alpha=0.3)
    else:
        a.axvspan(i, i+1, facecolor=colors['rest'], alpha=0.3)

a.set_xlabel('Time (sec)', size=18)
a.set_ylabel('Avg HbO', size=18)
legend_elements = [
    Patch(facecolor=colors['right'], alpha=0.3, label='Right'),
    Patch(facecolor=colors['left'], alpha=0.3, label='Left')
]
a.legend(handles=legend_elements, title='Color Regions', loc='center left', bbox_to_anchor=(1, 0.5))
a.set_title(f"{metadata['subject']} {metadata['task_name']}", size=18)
plt.savefig(os.path.join(base_dir, 'Figures', metadata['task_name'], f"{metadata['subject']}_{metadata['task_name']}_AverageSomatomotorActivity_Experiment.png"), dpi=150)
plt.show()


### Run Regression

#### Run using nltools 2 x 2 x 2 resampling

In [None]:
data_nifti.X = dm.convolve().add_dct_basis(duration=100).add_poly(0)
stats_output = data_nifti.regress()

left_v_right = stats_output['beta'][data_nifti.X.columns.str.contains('left')] - stats_output['beta'][data_nifti.X.columns.str.contains('right')]
left_v_right.write(os.path.join(base_dir, 'Analyses', metadata['task_name'], f"{metadata['subject']}_{metadata['task_name']}_LeftVRight_nltools.nii.gz"))

left_v_right.plot(view='glass')
plt.show()
plt.savefig(os.path.join(base_dir, 'Figures', metadata['task_name'], f"{metadata['subject']}_{metadata['task_name']}_LeftVRight_nltools.png"), dpi=150)


#### Run using nibabel 4 x 4 x 4 original sampling

In [None]:
# Load Data using Nibabel
data_nib =  nib.load(os.path.join(base_dir, 'Data', 'FingerTapping',  f'Test_{metadata["subject"]}_{metadata["task_id"]}_HbO.nii.gz') )
dat_nib = data_nib.get_fdata()
# dat_nib[data_nib==0] = np.nan

# Run vectorized regression at each voxel
dm_nib = dm.convolve().add_dct_basis(duration=100).add_poly(0)
flattened_dat = dat_nib.reshape(-1, dat_nib.shape[-1])
flattened_dat = ((flattened_dat.T - np.nanmean(flattened_dat, axis=1))/ np.nanstd(flattened_dat, axis=1)).T # z-score
b,se,t,p,df,res = regress(dm_nib, flattened_dat.T)
lvr = b[dm_nib.columns.str.contains('left'),:] - b[dm_nib.columns.str.contains('right'), :]
lvr = nib.Nifti1Image(lvr.reshape(dat_nib.shape[:-1]), affine=data_nib.affine)
nib.save(lvr, os.path.join(base_dir, 'Analyses', metadata['task_name'], f"{metadata['subject']}_{metadata['task_name']}_LeftVRight_nibabel.nii.gz"))

# Plot glass brain
plot_glass_brain(lvr, cmap='RdBu_r')
plt.savefig(os.path.join(base_dir, 'Figures', metadata['task_name'], f"{metadata['subject']}_{metadata['task_name']}_LeftVRight_nibabel.png"), dpi=150)
plt.show()

view_img_on_surf(lvr)


### Create Peristimulus Plot

In [None]:
left_blocks = {}
right_blocks = {}
for c in dm_block:
    left_blocks[c] = left_motor[dm_block[c]==1]
    right_blocks[c] = right_motor[dm_block[c]==1]
shortest_block = np.min([left_blocks[x].shape for x in left_blocks])
left_motor_blocks = pd.DataFrame({x:left_blocks[x][:shortest_block] for x in left_blocks})
right_motor_blocks = pd.DataFrame({x:right_blocks[x][:shortest_block] for x in right_blocks})
left_motor_blocks['Region'] = 'Left Somatomotor'
right_motor_blocks['Region'] = 'Right Somatomotor'
left_motor_blocks['Time'] = left_motor_blocks.index
right_motor_blocks['Time'] = right_motor_blocks.index
blocks = pd.concat([left_motor_blocks, right_motor_blocks], axis=0)
blocks = blocks.melt(id_vars=['Region', 'Time'], var_name='Block', value_name='HbO')
blocks['Condition'] = [x.split('_')[0] for x in blocks['Block']]

f,a = plt.subplots(ncols=2, figsize=(12,5), sharey=True, sharex=True)
sns.lineplot(data=blocks.query('Condition=="left"'), x='Time', y='HbO', hue='Region', ax=a[0])
a[0].set_title('Left Tapping', size=18)
sns.lineplot(data=blocks.query('Condition=="right"'), x='Time', y='HbO', hue='Region', ax=a[1])
a[1].set_title('Right Tapping', size=18)
plt.savefig(os.path.join(base_dir, 'Figures', metadata['task_name'], f"{metadata['subject']}_{metadata['task_name']}_AverageSomatomotorActivity_Block.png"), dpi=150)
blocks.to_csv(os.path.join(base_dir, 'Figures', metadata['task_name'], f"{metadata['subject']}_{metadata['task_name']}_SomatomotorActivity_Block.csv"), index=False)

plt.show()