# Optimize an Electrode

In [65]:
recon_path = '/Users/cu135/Partners HealthCare Dropbox/Calvin Howard/studies/collaborations/Richardson_DBS/Reco_Chron_Pain/PAG/ea_reconstruction.mat'
target_path = '/Users/cu135/Partners HealthCare Dropbox/Calvin Howard/studies/collaborations/Richardson_DBS/optimized_program/target_maps/flipped_SEEG_pain_intensity_vs_sham_inverted_pcc_inverse_r_map.nii'
out_dir = '/Users/cu135/hires_backdrops/test_electrode'

In [11]:
from stim_pyper.processing_utils.run_optimizer import OptimizerProcessor
optimizer = OptimizerProcessor(electrode_data=recon_path, nifti_path=target_path, output_path=out_dir)
d = optimizer.run()

KeyboardInterrupt: 

In [None]:
for idx in d.values():
    print(type(idx))

<class 'numpy.ndarray'>
<class 'numpy.ndarray'>
<class 'numpy.ndarray'>


In [None]:
import os 
import h5py
import json
import numpy as np
from pathlib import Path
from typing import Union, Optional, List, Dict

class MatReaderV2:
    '''Class to extract reconstruction data from ea_reconstruction.mat files.'''
    def __init__(self, file_path: Union[str, Path]):
        self.file_path = Path(file_path).expanduser().resolve()
        self._data: Optional[dict] = None
        self._f: Optional[h5py.File] = None

    def _h5_to_dict(self, group):
        '''Converts an h5 file to a dict.'''
        out = {}
        for k, v in group.items():
            out[k] = v if isinstance(v, h5py.Dataset) else self._h5_to_dict(v)
        return out

    def _load(self):
        '''Loads or returns the data (for getting pointers) and h5 file (for referencing w/ pointers)'''
        if self._data is not None:
            return self._data
        if self.file_path.suffix != ".mat":
            raise ValueError("not a .mat file")
        try:
            from scipy.io import loadmat
            self._data = loadmat(self.file_path, simplify_cells=True)
            self._f = None
        except (NotImplementedError, ValueError):
            self._f = h5py.File(self.file_path, "r")
            self._data = self._h5_to_dict(self._f)
        return self._data

    def _get(self, *keys):
        '''Recursion function'''
        node = self._load()
        for k in keys:
            if not isinstance(node, dict) or k not in node:
                return None
            node = node[k]
        return node

    def resolve(self, ref):
        '''references the h5py file with retrieved headers to get data'''
        if self._f is None:
            raise RuntimeError("Cannot resolve reference: no open h5py.File")
        return self._f[ref][()].T
    
    def coords(self) -> List[np.ndarray]:
        d = self._get("reco", "mni", "coords_mm")
        if d is None:
            return []

        arr = d[()] if isinstance(d, h5py.Dataset) else d  # payload

        # single electrode, already numeric
        if arr.dtype != object:
            return [arr]                       # (N,3)

        # multiple electrodes: each row has three HDF5 refs (x,y,z)
        out = []
        for row in arr:                       # shape (n_electrodes, 3)
            xyz = [self.resolve(r) for r in row]   # three (N,) vectors
            out.append(np.column_stack(xyz))       # (N,3)
        return out

    def elmodels(self) -> list[str]:
        '''Recurses through the dict and gets the elmodel string'''
        e = self._get("reco", "props", "elmodel")
        if e is None:
            return []
        obj = e[()]
        if obj.dtype == object:                          # multiple refs
            return [''.join(map(chr, self._f[r][()].flatten())) for r in obj.flatten()]
        return [''.join(map(chr, obj.flatten()))]        # single string
    
    def segment_lookup(self, electrode_model, segment_lut='contact_segment_labels') -> List[int]:
        '''References the electrode_lut.json file which contains info on which contacts are in which segments.'''
        cwd = os.path.abspath(os.getcwd())
        lut = os.path.join(cwd, 'stim_pyper', 'resources', 'electrode_lut.json')
        with open(lut, 'r') as f:
            lut_dict = json.load(f)
        segments = lut_dict[electrode_model][segment_lut]
        return segments
    
    def pair_contacts_to_segments(self, contact_coords, segments) -> Dict:
        '''Places each contact in a dict with a subdict relating its segment and coordinates.'''
        contact_dict = {}
        for idx in range(0, len(segments)):
            contact_dict[idx] = {segments[idx]: contact_coords[idx]}
        return contact_dict
    
    def run(self) -> List[Dict]:
        '''
        Orchestration function. Returns list of dicts (one per electrode).
        First level keys are the integer of the contact.
        The first level values are the subdict with {segment: array[x,y,z]}
        example: [{contact_integer: {{segment_integer: array[x,y,z]} }}]
        '''
        coords_list  = self.coords()
        model_list   = self.elmodels()

        if len(coords_list) != len(model_list):
            raise ValueError("mismatch coords vs models")

        out = []
        for coord, model in zip(coords_list, model_list):
            segments = self.segment_lookup(model)
            out.append(self.pair_contacts_to_segments(coord, segments))
        return out


In [67]:
MatReaderV2(recon_path).run()

[{0: {1: array([  3.11781, -26.0572 ,  -7.63657])},
  1: {2: array([  4.28923, -24.844  ,  -6.46454])},
  2: {2: array([  3.52362, -24.0464 ,  -6.94082])},
  3: {2: array([  3.16521, -24.8044 ,  -6.04362])},
  4: {3: array([  4.83442, -23.3192 ,  -5.32825])},
  5: {3: array([  4.07143, -22.5292 ,  -5.77824])},
  6: {3: array([  3.70274, -23.2954 ,  -4.8999 ])},
  7: {4: array([  4.81581, -21.4217 ,  -4.13576])}}]

In [4]:
import os
os.path.abspath(os.getcwd())

'/Users/cu135/Software_Local/StimPyPer'