In [37]:
import os
import h5py
import numpy as np
from typing import List, Dict
from stim_pyper.processing_utils.optimizer_postprocessing import process_vta
from scipy.io import loadmat
from glob import glob
from stim_pyper.nifti_utils.bounding_box import NiftiBoundingBox
import shutil

Helper Functions

Assumes a leaddbs BIDS directory, with patients stored in the same lead group. May need to modify the get_stimulations function slightly if your dataset is organized differently.

In [2]:
import os
from glob import glob
from scipy.io import loadmat
import h5py
import numpy as np
def _h5_to_dict(group):
    """Recursively turn an h5 group into a nested dict of numpy arrays."""
    out = {}
    for k, v in group.items():
        if isinstance(v, h5py.Dataset):
            out[k] = v[()]           # load actual data
        else:
            out[k] = _h5_to_dict(v)  # recurse into subgroup
    return out

def get_mat_file(base_path, ftype='stim'):
    """
    Quick & dirty:
    - Try scipy.io.loadmat
    - If it's a v7.3 file (NotImplementedError), use h5py instead
    """
    json_data = []

    for mat_file_path in glob(base_path):
        try:
            data = loadmat(mat_file_path, simplify_cells=True)
        except:
            continue
            # with h5py.File(mat_file_path, "r") as f:
            #     data = _h5_to_dict(f)
        json_data.append({
            'dir_name': os.path.dirname(mat_file_path),
            'data': data if ftype == 'stimulation' else data['reco']['mni']['coords_mm'][:]
        })

    return json_data

In [3]:
def get_v(stim):
    try:
        S = stim['S']
        right_elec = S['Rs1']
        right_amp = np.array(right_elec['amp'])
        
        right_v = []
        for key, value in sorted(right_elec.items(), key=lambda x: (x[0][0], int(x[0][1:])) if x[0][1:].isdigit() else (x[0][0], x[0][1:])):
            if key.startswith('k'):
                perc = np.array(value.get('perc', np.nan))
                try:
                    right_v.append(right_amp * perc / 100)
                except:
                    right_v.append(0)

        left_elec = S['Ls1']
        left_amp = np.array(left_elec['amp'])
        left_v = []
        for key, value in sorted(left_elec.items(), key=lambda x: (x[0][0], int(x[0][1:])) if x[0][1:].isdigit() else (x[0][0], x[0][1:])):
            if key.startswith('k'):
                perc = np.array(value.get('perc', np.nan))
                try:
                    left_v.append(left_amp * perc / 100)
                except:
                    left_v.append(0)
        
        v = {'right': right_v, 'left': left_v}
    except:
        with h5py.File(stim, 'r') as f:
            S = f['S']
            right_elec = S['Rs1']
            right_amp = np.array(right_elec['amp'])
            right_v = []
            for key, value in sorted(right_elec.items(), key=lambda x: (x[0][0], int(x[0][1:])) if x[0][1:].isdigit() else (x[0][0], x[0][1:])):
                if key.startswith('k'):
                    perc = np.array(value.get('perc', np.nan))
                    try:
                        right_v.append(right_amp * perc / 100)
                    except:
                        right_v.append(0)

            left_elec = S['Ls1']
            left_amp = np.array(left_elec['amp'])
            left_v = []
            for key, value in sorted(left_elec.items(), key=lambda x: (x[0][0], int(x[0][1:])) if x[0][1:].isdigit() else (x[0][0], x[0][1:])):
                if key.startswith('k'):
                    perc = np.array(value.get('perc', np.nan))
                    try:
                        left_v.append(left_amp * perc / 100)
                    except:
                        left_v.append(0)
            
            v = {'right': right_v, 'left': left_v}
    return v

In [None]:
def leaddbs_to_stimpyper(stimulations, reconstructions, output_dir, val=None):
    """Processes the data and calls handle_nii_map for each combination of lambda and weight."""
    for reco, stim in zip(reconstructions, stimulations):
        coords_list = ['coords1', 'coords2']
        side = ['right', 'left']
        stimdata = get_v(stim['data'])
        patient_name = reco['dir_name'].split('/')[-2]
        for i, coords in enumerate(coords_list):
            optima_ampers = np.copy(stimdata[side[i]])
            electrode_coords = np.array(reco['data'][i])
            electrode_idx = i
            out_dir = os.path.join(output_dir, patient_name, f'{side[i]}')
            optima_ampers = np.nan_to_num(optima_ampers, nan=0)
            process_vta(optima_ampers, electrode_coords, electrode_idx, out_dir)
            single_contact_files = [
                os.path.join(out_dir, f)
                for f in os.listdir(out_dir)
                if f.startswith('single_contact')
            ]
            bbox = NiftiBoundingBox(single_contact_files)
            bbox.gen_mask(out_dir, 'clinician_resampled.nii.gz')

Start Here

In [None]:
out_dir = '/Users/savirmadan/Partners HealthCare Dropbox/Savir Madan/optimizer (1)/metadata/vtas'
stimulations = get_mat_file(base_path="/Volumes/PdBwh/CompleteParkinsons/derivatives/leaddbs/sub-*/stimulations/MNI152NLin2009bAsym/mostrecentstimulation/sub-*_desc-stimparameters.mat", ftype='stimulation')
reconstructions = get_mat_file(base_path="/Volumes/PdBwh/CompleteParkinsons/derivatives/leaddbs/sub-*/reconstruction/sub-*_desc-reconstruction.mat", ftype='reconstruction')
leaddbs_to_stimpyper(stimulations, reconstructions, output_dir=out_dir)