In [1]:
import pandas as pd
import numpy as np
from glob import glob 

import cv2
import numpy as np

"""
POC: Load Analyze-format MRI (.hdr/.img) and HDF5 data in Python
Requires: nibabel, h5py, numpy, matplotlib, pandas
"""

import os
import nibabel as nb
import matplotlib.pyplot as plt
from scipy import ndimage
import torch
from transformers import pipeline
from PIL import Image

# Load data

data_dir = "./data/"
data_l = glob(data_dir + '*.csv')
print(data_l)

event_df = pd.read_csv(data_l[0])
eye_track_df = pd.read_csv(data_l[1])
motion_df = pd.read_csv(data_l[2])

# Monkey K2 (Kuma)
# Electrodes [52 -> 64] and [121 -> 128] (inclusive) are in the medial wall of the macaque brain.
ecog_df = pd.read_csv(data_l[3])

# Utility Helper Functions:

def show_pipe_masks_on_image(raw_image, outputs):
  plt.imshow(np.array(raw_image))
  ax = plt.gca()
  for mask in outputs["masks"]:
      show_mask(mask, ax=ax, random_color=True)
  plt.axis("off")
  plt.show()
def show_mask(mask, ax, random_color=False):
    if random_color:
        color = np.concatenate([np.random.random(3),
                                np.array([0.6])],
                               axis=0)
    else:
        color = np.array([30/255, 144/255, 255/255, 0.6])
    h, w = mask.shape[-2:]
    mask_image = mask.reshape(h, w, 1) * color.reshape(1, 1, -1)
    ax.imshow(mask_image)



  from .autonotebook import tqdm as notebook_tqdm
2025-05-10 17:05:24.961807: I tensorflow/core/util/port.cc:153] oneDNN custom operations are on. You may see slightly different numerical results due to floating-point round-off errors from different computation orders. To turn them off, set the environment variable `TF_ENABLE_ONEDNN_OPTS=0`.
2025-05-10 17:05:25.073391: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:467] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
E0000 00:00:1746911125.121033   93377 cuda_dnn.cc:8579] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
E0000 00:00:1746911125.133139   93377 cuda_blas.cc:1407] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
W0000 00:00:1746911125.236078   93377 computation_placer.cc:177] computation placer already r

['./data/Event.csv', './data/EyeTrack.csv', './data/Motion.csv', './data/ECoG.csv']


## Extracting centroids of ECoG array

In [2]:
image_path = "./imgs/K2.png"

In [3]:
raw_image = Image.open(image_path)

In [4]:
segment_anything_pipeline = pipeline("mask-generation", model="Zigeng/SlimSAM-uniform-77")

Using a slow image processor as `use_fast` is unset and a slow processor was saved with this model. `use_fast=True` will be the default behavior in v4.48, even if the model was saved with a slow processor. This will result in minor differences in outputs. You'll still be able to use a slow processor with `use_fast=False`.
Device set to use cuda:0


In [5]:
output = segment_anything_pipeline(raw_image, points_per_batch=32)

In [8]:
len(output["masks"])


150

In [1]:
plt.imshow(np.array(raw_image))
ax = plt.gca()
for mask in output["masks"]:
    color=np.concatenate([np.random.random(3), np.array([0.6])], axis=0)
    h,w = mask.shape[-2:]
    mask_image = mask.reshape(h, w, 1) * color.reshape(1, 1, -1)
    ax.imshow(mask_image)

NameError: name 'plt' is not defined

In [None]:
show_pipe_masks_on_image(raw_image, output)

: 

In [None]:
# def extract_centroids(image_path):
#     # Read the image
#     image = cv2.imread(image_path)
#     gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)  # Convert to grayscale

#     # Use HoughCircles to detect circles in the image
#     circles = cv2.HoughCircles(
#         gray, 
#         cv2.HOUGH_GRADIENT, dp=1.2, minDist=30,
#         param1=50, param2=30, minRadius=10, maxRadius=100
#     )

#     # If circles are detected
#     if circles is not None:
#         circles = np.round(circles[0, :]).astype("int")  # Convert to integer

#         centroids = []
#         for circle in circles:
#             x, y, r = circle
#             centroids.append((x, y))
#             # Draw the circle and center on the image (optional for visualization)
#             cv2.circle(image, (x, y), r, (0, 255, 0), 4)  # Green circle
#             cv2.circle(image, (x, y), 2, (0, 0, 255), 3)  # Red center
#         # Show the image with detected circles (optional)
#         cv2.imshow("Detected Circles", image)
#         cv2.waitKey(0)
#         cv2.destroyAllWindows()

#         return centroids
#     else:
#         print("No circles were detected.")
#         return []

# # Example usage
# image_path = "./imgs/K2.png"
# centroids = extract_centroids(image_path)
# print("Centroid Coordinates:", centroids)


# MRI data

In [None]:

# ─────────────────────────────────────────────────────────────────────────────
# 1. Paths (edit these to your local files)
# ─────────────────────────────────────────────────────────────────────────────
analyze_hdr = "./data/mri/K2_t1.hdr"   # header for Analyze 7.5
# analyze_img = "./data/mri/K2_t1.img"   # binary data file



In [None]:
# ─────────────────────────────────────────────────────────────────────────────
# 2. Load Analyze-format volume with NiBabel
# ─────────────────────────────────────────────────────────────────────────────
# NiBabel will read the header and image automatically from the .hdr/.img pair.
mri_img = nb.load(analyze_hdr)             # you can equally pass analyze_img here
mri_data = mri_img.get_fdata(dtype=np.float32)
print(f"Analyze volume shape: {mri_data.shape}")
print(f"Affine transform (vox→mm):\n{mri_img.affine}")

In [None]:
# Display a middle axial slice
mid_z = mri_data.shape[2] // 2
plt.figure(figsize=(6,6))
plt.imshow(mri_data[:, :, mid_z].T, origin="lower", cmap="gray")
plt.title("Middle axial slice of Analyze volume")
plt.axis("off")
plt.show()


In [None]:
# ─────────────────────────────────────────────────────────────────────────────
# 4. Summarize intensity distribution with pandas
# ─────────────────────────────────────────────────────────────────────────────
flat = mri_data.flatten()
stats = {
    "minimum": flat.min(),
    "maximum": flat.max(),
    "mean":    flat.mean(),
    "stddev":  flat.std()
}
df = pd.DataFrame(stats, index=["AnalyzeVolume"])
print("\nIntensity summary:")
print(df.to_string())

# ─────────────────────────────────────────────────────────────────────────────
# End of proof-of-concept
# ─────────────────────────────────────────────────────────────────────────────


# Identifying where the centroids of the ecog array are placed

In [None]:
# ─────────────────────────────────────────────────────────────────────────────
# 1. Paths: replace these with your actual files
# ─────────────────────────────────────────────────────────────────────────────
# mri_nii   = "./path/to/preop_T1.nii.gz"       # your subject’s structural MRI
ct_nii    = "./imgs/K2.png" # "/path/to/postop_CT_with_ECoG.nii.gz"
atlas_nii = "./data/atlas/D99_v2.0_dist/D99_atlas_v2.0.nii.gz"  # e.g. CHARM or D99
labels_txt = "./data/atlas/D99_v2.0_dist/D99_v2.0_labels_semicolon.txt"
labels_csv= "./data/atlas/D99_v2.0_dist/D99_v2.0_labels_semicolon.csv"           # CSV: {index,label_name}


In [None]:
# Converting text file to csv
# df = pd.read_csv(labels_txt, sep=";")
# df.to_csv(labels_csv)

In [None]:
# ─────────────────────────────────────────────────────────────────────────────
# 2. Load images
# ─────────────────────────────────────────────────────────────────────────────
mri_img   = nb.load(analyze_hdr)
# ct_img    = nb.load(ct_nii)
atlas_img = nb.load(atlas_nii)

mri_data   = mri_img.get_fdata()
# ct_data    = ct_img.get_fdata()
atlas_data = atlas_img.get_fdata().astype(int)

In [None]:

# ─────────────────────────────────────────────────────────────────────────────
# 3. Rigid-coregistration: estimate affine from CT → MRI
#    (In practice you’d do this once in ANTs or FSL and save the transform;
#     here we show a simple mutual-information based init from nibabel’s example)
# ─────────────────────────────────────────────────────────────────────────────
from nibabel.processing import resample_from_to
# Resample CT into MRI space (nearest for electrodes)
ct2mri = resample_from_to(ct_img, mri_img, order=0)  # order=0 preserves labels
ct2mri_data = ct2mri.get_fdata()

# ─────────────────────────────────────────────────────────────────────────────
# 4. Segment electrodes in CT (thresholding bright contacts),
#    find their centroids in MRI space
# ─────────────────────────────────────────────────────────────────────────────
# set a threshold high enough to isolate electrodes
thr = np.percentile(ct2mri_data[ct2mri_data>0], 99.5)
bw  = ct2mri_data > thr

# label connected components and compute centroids
labeled, n_labels = ndimage.label(bw)
objects = ndimage.find_objects(labeled)
centroids = []
for label_idx in range(1, n_labels+1):
    coords = np.argwhere(labeled == label_idx)
    if coords.size == 0:
        continue
    # mean in voxel space
    centroid_vox = coords.mean(axis=0)
    # convert to world
    centroid_mri = nb.affines.apply_affine(mri_img.affine, centroid_vox[::-1])
    centroids.append(centroid_mri)
centroids = np.array(centroids)

# ─────────────────────────────────────────────────────────────────────────────
# 5. Map each centroid into atlas indices and lookup label
# ─────────────────────────────────────────────────────────────────────────────
# Invert atlas affine
inv_atlas_affine = np.linalg.inv(atlas_img.affine)
labels = []
for xyz in centroids:
    vox = nb.affines.apply_affine(inv_atlas_affine, xyz)[::-1]
    vox = np.round(vox).astype(int)
    # guard against out-of-bounds:
    if np.any(vox < 0) or vox[0]>=atlas_data.shape[0] or vox[1]>=atlas_data.shape[1] or vox[2]>=atlas_data.shape[2]:
        labels.append("out_of_brain")
    else:
        lbl_idx = int(atlas_data[tuple(vox)])
        labels.append(lbl_idx)

# load your CSV of index→region
import csv
idx2name = {}
with open(labels_csv) as f:
    reader = csv.reader(f)
    for idx,name in reader:
        idx2name[int(idx)] = name

# translate indices to names
region_names = [idx2name.get(i, "unknown") for i in labels]

# ─────────────────────────────────────────────────────────────────────────────
# 6. Print out electrode → region table
# ─────────────────────────────────────────────────────────────────────────────
import pandas as pd
df = pd.DataFrame({
    "electrode_id": np.arange(len(centroids)) + 1,
    "x_mm": np.round(centroids[:,0],1),
    "y_mm": np.round(centroids[:,1],1),
    "z_mm": np.round(centroids[:,2],1),
    "atlas_index": labels,
    "region": region_names
})
print(df.to_string(index=False))

# ─────────────────────────────────────────────────────────────────────────────
# 7. (Optional) Quick coronal slice with electrodes overlaid
# ─────────────────────────────────────────────────────────────────────────────
slice_z = int(mri_data.shape[2]//2)
plt.imshow(mri_data[:,:,slice_z].T, cmap="gray", origin="lower")
pts = []
for (x,y,z), name in zip(centroids, region_names):
    if abs(z - nb.affines.apply_affine(mri_img.affine,[0,0,slice_z])[2])<2:
        pts.append((x, y))
xs, ys = zip(*pts)
plt.scatter(xs, ys, s=30, c="red")
plt.title("Coronal slice with detected ECoG centroids")
plt.show()
