In [16]:
import nibabel as nib
import pandas as pd
import numpy as np
import re
import os

# numpy disable scientific notation for easier debugging
np.set_printoptions(suppress=True, precision=4)

# make prototype for storing CT, T1 and electrodes filenames
class SubjectFiles:
    def __init__(self, subject_root, ct_file, t1_file, electrodes_file):
        self.subject_root = subject_root
        self.ct_file = ct_file
        self.t1_file = t1_file
        self.electrodes_file = electrodes_file

    def ct_date(self):
        return self.get_date(self.ct_file)
    
    def t1_date(self):
        return self.get_date(self.t1_file)

    @staticmethod
    def get_date(filename):
        match = re.search(r"_(\d{8})_", filename)
        if match:
            return match.group(1)
        return None

# Load subjects

In [17]:
input_dir = "test"
subjects: list[SubjectFiles] = []

# walk through all subfolders and search for *CT*, *T1* and electrodes.fcsv files
for root, dirs, files in os.walk(input_dir):
    ct_path = None
    t1_path = None
    fcsv_path = None

    for file in files:
        if "CT" in file and file.endswith((".nii", ".nii.gz")):
            ct_path = file
        elif "T1" in file and not file.startswith("rand_affine_") and file.endswith((".nii", ".nii.gz")):
            t1_path = file
        elif file == "electrodes.fcsv":
            fcsv_path = file
    if ct_path and t1_path and fcsv_path:
        root = root.replace(input_dir + os.sep, "")
        subjects.append(SubjectFiles(root, ct_path, t1_path, fcsv_path))

# Apply random affine

In [18]:
row = []
for subject in subjects:
    ct_img = nib.load(os.path.join(input_dir, subject.subject_root, subject.ct_file))
    t1_img = nib.load(os.path.join(input_dir, subject.subject_root, subject.t1_file))

    # apply random affine to the T1
    random_translation_mm = np.random.uniform(-1000, 1000, 3)
    random_rotation_deg = np.random.uniform(-180, 180, 3)

    random_rotation_rad = np.deg2rad(random_rotation_deg)
    Rx = np.array([[1, 0, 0],
                   [0, np.cos(random_rotation_rad[0]),-np.sin(random_rotation_rad[0])],
                   [0, np.sin(random_rotation_rad[0]), np.cos(random_rotation_rad[0])]])
    
    Ry = np.array([[np.cos(random_rotation_rad[1]), 0, np.sin(random_rotation_rad[1])],
                   [0, 1, 0],
                   [-np.sin(random_rotation_rad[1]), 0, np.cos(random_rotation_rad[1])]])
    
    Rz = np.array([[np.cos(random_rotation_rad[2]), -np.sin(random_rotation_rad[2]), 0],
                   [np.sin(random_rotation_rad[2]), np.cos(random_rotation_rad[2]), 0],
                   [0, 0, 1]])
    
    R = np.eye(4)
    R[:3, :3] = Rz @ Ry @ Rx

    t = np.eye(4)
    t[:3, 3] = random_translation_mm

    new_affine = t @ R @ t1_img.affine
    t1_img = nib.Nifti1Image(t1_img.get_fdata().astype(t1_img.header.get_data_dtype()), new_affine)
    nib.save(t1_img, os.path.join(input_dir, subject.subject_root, f"rand_affine_{subject.t1_file}"))

    row.append({
        "subject_root": subject.subject_root,
        "ct_file": subject.ct_file,
        "ct_date": subject.ct_date(),
        "ct_shape": ct_img.shape,
        "ct_spacing": np.array(ct_img.header.get_zooms()),
        "t1_file": subject.t1_file,
        "t1_date": subject.t1_date(),
        "t1_shape": t1_img.shape,
        "t1_spacing": np.array(t1_img.header.get_zooms()),
        "rand_translation_mm": random_translation_mm,
        "rand_rotation_deg": random_rotation_deg,
        "electrodes_file": subject.electrodes_file
    })
dataset_info_df = pd.DataFrame(row)

# Estimate bolt fiducial

In [None]:
output_fcsv = """\
# Markups fiducial file version = 4.10
# CoordinateSystem = 0
# columns = id,x,y,z,ow,ox,oy,oz,vis,sel,lock,label,desc,associatedNodeID"""

row = []
for subject in subjects:
    # load fcsv
    with open(os.path.join(input_dir, subject.subject_root, subject.electrodes_file), "r") as f:
        electrodes_lines = f.readlines()
    for line in electrodes_lines:
        if line.startswith("# columns = "):
            header_line = line.strip().replace("# columns = ", "")
            columns = header_line.split(",")
            break
    electrodes_df = pd.read_csv(os.path.join(input_dir, subject.subject_root, subject.electrodes_file), skiprows=3, names=columns)

    # split prefix
    electrodes_df["prefix"] = electrodes_df.label.str.extract(r"(.*?)(\d+)$")[0]
    electrodes_df["n_contact"] = electrodes_df.label.str.extract(r"(.*?)(\d+)$")[1].astype(int)

    # get groups by prefix
    labels = []
    groups = electrodes_df.groupby("prefix")
    for i, (prefix, group) in enumerate(groups):
        tip = group.loc[group.n_contact.idxmin()]
        entry = group.loc[group.n_contact.idxmax()]
        
        tip_point = np.array([tip.x, tip.y, tip.z])
        entry_point = np.array([entry.x, entry.y, entry.z])

        tip_to_entry = entry_point - tip_point
        bolt_point = entry_point + tip_to_entry / np.linalg.norm(tip_to_entry) * 10 # move entry point by 10 mm
        bolt_point += np.random.uniform(-1, 1, 3) # add random noise

        output_fcsv += f"\nvtkMRMLMarkupsFiducialNode_{i},{bolt_point[0]},{bolt_point[1]},{bolt_point[2]},0.000,0.000,0.000,1.000,1,0,1,{prefix}-{len(group)},,"

        # for each prefix save len(group)
        labels.append(f"{prefix}-{len(group)}")
    
    row.append({
        "n_electrodes": len(groups),
        "labels": labels
    })

    # save to fcsv
    with open(os.path.join(input_dir, subject.subject_root, "BoltFiducials.fcsv"), "w") as f:
        f.write(output_fcsv)
# concat df and rows
dataset_info_df = pd.concat([dataset_info_df, pd.DataFrame(row)], axis=1)

# save to csv
dataset_info_df.to_csv(os.path.join(input_dir, "dataset.csv"), index=False)