# Preliminaries

## Imports

In [1]:
import os
os.environ["http_proxy"] = "http://dahernandez:34732b8f774d6def@ohswg.ottawahospital.on.ca:8080"
os.environ["https_proxy"] = "http://dahernandez:34732b8f774d6def@ohswg.ottawahospital.on.ca:8080"
import pydicom
import subprocess
from pathlib import Path
import nibabel as nib
from dipy.io import read_bvals_bvecs
from dipy.core.gradients import gradient_table
from dipy.io.image import load_nifti, save_nifti
from dipy.reconst.shm import CsaOdfModel
from dipy.direction import peaks_from_model
from dipy.data import default_sphere
from dipy.segment.mask import median_otsu
from dipy.viz import actor, colormap, has_fury, window
from dipy.tracking.stopping_criterion import ThresholdStoppingCriterion
from dipy.reconst.dti import TensorModel
from dipy.tracking.utils import random_seeds_from_mask, path_length
from dipy.tracking.streamline import Streamlines
from dipy.tracking.tracker import eudx_tracking
from dipy.io.stateful_tractogram import Space, StatefulTractogram
from dipy.io.streamline import save_trk
from rt_utils import RTStructBuilder
import matplotlib.pyplot as plt
import numpy as np
from collections import defaultdict
import shutil
import re
import ants

## Functions

### Function to check if a folder path has all the required NIfTI files

In [2]:
# Function to check if a folder path has all the required NIfTI files
def check_nifti_folder(path, bval_bvec_expected):
    # First set flag to false
    valid_folder = False

    if path.is_dir():
        # Define flags for file types we need
        niigz_flag = False
        bval_flag = False
        bvec_flag = False
        for file_path in path.rglob("*"): # parses every file recursively
            if not file_path.is_file():
                continue
            if str(file_path).lower().endswith(".nii.gz"):
                niigz_flag = True
            elif str(file_path).lower().endswith(".bval"):
                bval_flag = True
            elif str(file_path).lower().endswith(".bvec"):
                bvec_flag = True

        if bval_bvec_expected:
            if niigz_flag and bval_flag and bvec_flag:
                valid_folder = True # Set flag to indicate folder is valid to proceed
                print("✅ NIfTI folder contains the necessary NIfTI files to proceed without conversion.")
            elif not niigz_flag and not bval_flag and not bvec_flag:
                print("❌ NIfTI folder does not contain any of the necessary NIfTI files.")
            else:
                incomplete = True # Set flag to indicate incomplete conversion
                print("⚠️ NIfTI folder contains some of the necessary NIfTI files. Please delete the folder's contents to avoid errors.")
        elif not bval_bvec_expected:
            if niigz_flag:
                valid_folder = True # Set flag to indicate folder is valid to proceed
                print("✅ NIfTI folder contains the necessary NIfTI file to proceed without conversion.")
            else:
                print("❌ NIfTI folder does not contain the necessary NIfTI files.")
    else:
        print("❌ NIfTI folder does not exist.")

    return valid_folder

### Function to extract file name from a given folder of NIfTI files

In [3]:
# Function to extract file name from a given folder of NIfTI files
def get_fname(path):
    fname = [] # Set empty first
    for file_path in path.rglob("*"): # parses every file recursively
        # Extract file name without extension and add to fname if unique
        if str(file_path).lower().endswith(".nii.gz"): 
            if re.sub(r"\.nii\.gz$","", file_path.name) not in fname:
                fname.append(re.sub(r"\.nii\.gz$","", file_path.name)) 
        elif str(file_path).lower().endswith(".bval"):
            if re.sub(r"\.bval$","", file_path.name) not in fname:
                fname.append(re.sub(r"\.bval$","", file_path.name)) 
        elif str(file_path).lower().endswith(".bvec"):
            if re.sub(r"\.bvec$","", file_path.name) not in fname:
                fname.append(re.sub(r"\.bvec$","", file_path.name)) 

    if len(fname) > 1:
        print("❌ Too many NIfTI files in folder. Please only include NIfTI files of diffusion imaging MR scan.")
    elif len(fname) < 1:
        print("❌ No NIfTI files found in folder.")
    else:
        fname = fname[0]
        print("✅ File name succesfully acquired.")

    return fname

### Function to get exported RayStation file PATHS

In [4]:
# Function to get exported RayStation file PATHS
def rs_get_paths(path):

    # Define lists to add file paths to
    CT_File_Paths = []
    RD_File_Paths = []
    RP_File_Paths = []
    RS_File_Paths = []
    MR_File_Paths = []

    for file in path.glob("*.dcm"): # Go through every file in folder (non-recursively)
        if file.is_file():
            try: # Determine what type of file it is
                if "CT" in file.name.upper() and pydicom.dcmread(file, stop_before_pixels=True).Modality == 'CT':
                    CT_File_Paths.append(file)
                elif "RD" in file.name.upper() and pydicom.dcmread(file, stop_before_pixels=True).Modality == 'RTDOSE':
                    RD_File_Paths.append(file)
                elif "RP" in file.name.upper() and pydicom.dcmread(file, stop_before_pixels=True).Modality == 'RTPLAN':
                    RP_File_Paths.append(file)
                elif "RS" in file.name.upper() and pydicom.dcmread(file, stop_before_pixels=True).Modality == 'RTSTRUCT':
                    RS_File_Paths.append(file)
                elif "MR" in file.name.upper() and pydicom.dcmread(file, stop_before_pixels=True).Modality == 'MR':
                    MR_File_Paths.append(file)
                else:
                    print(f"Unknown DICOM file {file.name}")
            except:
                print(f"Skipped invalid DICOM: {file.name}")

    print(f"Found {len(CT_File_Paths)+len(RD_File_Paths)+len(RP_File_Paths)+len(RS_File_Paths)+len(MR_File_Paths)} valid DICOM files")

    file_paths = {
        "CT_File_Paths" : CT_File_Paths,
        "RD_File_Paths" : RD_File_Paths,
        "RP_File_Paths" : RP_File_Paths,
        "RS_File_Paths" : RS_File_Paths,
        "MR_File_Paths" : MR_File_Paths
    }

    return file_paths

### Function to get exported RayStation files

In [5]:
# Function to get exported RayStation files
def rs_get_info(path):

    # Define lists to add file paths to
    CT_File_Paths = []
    RD_File_Paths = []
    RP_File_Paths = []
    RS_File_Paths = []
    MR_File_Paths = []

    # Define lists to add file info to
    CT_Files = []
    RD_Files = []
    RP_Files = []
    RS_Files = []
    MR_Files = []

    for file in path.glob("*.dcm"):
        if file.is_file():
            # print(f"Found file: {file.name}")
            try:
                if "CT" in file.name.upper() and pydicom.dcmread(file, stop_before_pixels=True).Modality == 'CT':
                    CT_Files.append(pydicom.dcmread(file)) 
                    CT_File_Paths.append(file)
                elif "RD" in file.name.upper() and pydicom.dcmread(file, stop_before_pixels=True).Modality == 'RTDOSE':
                    RD_Files.append(pydicom.dcmread(file))
                    RD_File_Paths.append(file)
                elif "RP" in file.name.upper() and pydicom.dcmread(file, stop_before_pixels=True).Modality == 'RTPLAN':
                    RP_Files.append(pydicom.dcmread(file))
                    RP_File_Paths.append(file)
                elif "RS" in file.name.upper() and pydicom.dcmread(file, stop_before_pixels=True).Modality == 'RTSTRUCT':
                    RS_Files.append(pydicom.dcmread(file))
                    RS_File_Paths.append(file)
                elif "MR" in file.name.upper() and pydicom.dcmread(file, stop_before_pixels=True).Modality == 'MR':
                    MR_Files.append(pydicom.dcmread(file))
                    MR_File_Paths.append(file)
                else:
                    print(f"Unknown DICOM file {file.name}")
            except:
                print(f"Skipped invalid DICOM: {file.name}")

    print(f"Found {len(CT_Files)+len(RD_Files)+len(RP_Files)+len(RS_Files)+len(MR_Files)} valid DICOM files")

    file_paths = {
        "CT_File_Paths" : CT_File_Paths,
        "RD_File_Paths" : RD_File_Paths,
        "RP_File_Paths" : RP_File_Paths,
        "RS_File_Paths" : RS_File_Paths,
        "MR_File_Paths" : MR_File_Paths
    }

    file_info = {
        "CT_Files" : CT_Files,
        "RD_Files" : RD_Files,
        "RP_Files" : RP_Files,
        "RS_Files" : RS_Files,
        "MR_Files" : MR_Files
    }

    return file_info, file_paths

### Function to convert from DICOM to NIfTI

In [6]:
def dicom_to_nifti(dicom_dir, nifti_dir):
    nifti_dir.mkdir(parents=True, exist_ok=True) # make folder for NIFTI if it doesnt exist yet

    # Create command for dcm2niix
    # -z y creates compressed (.gz) .nii files
    # -f %p_%s defines output file name as %p (protocol name(DICOM tag 0018, 1030)) with %s (series(DICOM tag 0020, 0011))
    # -o specifies the output directory of the NIfTI files
    cmd = [
        # "dcm2niix",
        "V:/Common/Staff Personal Folders/DanielH/RayStation_Scripts/Tractography/Subscripts/dcm2niix/dcm2niix.exe",
        "-z", "y",
        "-f", "%p_%s",
        "-o", str(nifti_dir),
        str(dicom_dir)
    ]

    # Run this command in the terminal. Print any errors
    try:
        subprocess.run(cmd, check=True)
    except subprocess.CalledProcessError as e:
        print("STDOUT:", e.stdout)
        print("STDERR:", e.stderr)
        print("Return code:", e.returncode)

    print("✅ DICOM files successfully converted to NIfTI")

## Patient Folder Path

In [7]:
# Base directory to be used
base_dir = Path("V:/Common/Staff Personal Folders/DanielH/DICOM_Files/TractographyPatient/Case 1 RS/")

## Show plots or not

In [8]:
interactive = True

## DICOM to NIfTI

### Check if NIfTI folder already exists with converted files

In [9]:
# Define NIfTI folder path
nifti_dir = base_dir / "NIfTI"

# Check if NIfTI folder has all the required files
valid_folder = check_nifti_folder(nifti_dir, bval_bvec_expected=True)

✅ NIfTI folder contains the necessary NIfTI files to proceed without conversion.


### Make Proper DICOM Folder

#### Identify if folder is valid for tractography (if any file has FA in its Series Description) and extract relevant files (most populous Series Instance UID)

In [10]:
if not valid_folder: # only proceed if NIfTI folder doesn't already contain necessary files
  # Define folder containing raw DICOM files
    dicom_raw_dir = base_dir / "combined"

    # Define dictionary to contain files with a given UID
    series_counts = defaultdict(list)

    FA_flag = False # Set a flag to check if FA is found in any of the folder's file's SeriesDescriptions

    for file_path in dicom_raw_dir.rglob("*"): # parses every file recursively
        if not file_path.is_file():
            continue
        try:
            # Try to read as DICOM using force=True
            ds = pydicom.dcmread(file_path, stop_before_pixels=True, force=True)

            uid = getattr(ds, "SeriesInstanceUID", None) # Get UID
            if uid: # if UID found, add to series_counts
                series_counts[uid].append(file_path)

            if "FA" in str(getattr(ds, "SeriesDescription", None)).upper():
                FA_flag = True # Set FA flag to true if FA found in Series Description
                
        except Exception as e:
            print(f"Skipping {file_path.name}: {e}")


    # Print whether FA flag true or false
    if FA_flag:
        print("\n✅ FA found in SeriesDescription of at least one file in folder. Folder valid for tractography")
    else:
        print("\n❌ No FA found in SeriesDescription of any file in folder. Folder NOT valid for tractography")

    # Print UID counts
    print("\nFound SeriesInstanceUIDs:")
    for uid, files in series_counts.items():
        print(f"{uid} - {len(files)} files")

    # Identify most populous UID
    if series_counts:
        most_populous_uid = max(series_counts, key=lambda k: len(series_counts[k]))
        print(f"\nMost populous UID: {most_populous_uid} ({len(series_counts[most_populous_uid])} slices)")
        relevant_files = series_counts[most_populous_uid] # assign files in most populous uid to relevant files
    else:
        print("\nNo valid DICOMs found.")


#### Copy relevant files to new folder

In [11]:
if not valid_folder: # only proceed if NIfTI folder doesn't already contain necessary files
    output_dir = base_dir / "DICOM"
    output_dir.mkdir(parents=True, exist_ok=True) # make folder for derived relevant DICOM files if it doesnt exist yet

    for file_path in relevant_files:
        try:
            destination_path = output_dir / file_path.name # joins variables as path. (not division since path variable is involved)
            if destination_path.is_file(): continue # Skip if path already has file
            shutil.copy2(file_path, destination_path) # Copy file to folder if path doesn't have file
        except Exception as e:
            print(f"Unable to copy {file_path.name}: {e}")

### Conversion

In [12]:
if not valid_folder: # only proceed if NIfTI folder doesn't already contain necessary files
    # Define paths to DICOM folder. NIfTI folder should already have path defined
    dicom_dir = output_dir # Should have same path as output directory defined earlier
    
    dicom_to_nifti(dicom_dir, nifti_dir)

# Tractography

## Load tracts if they exist

In [13]:
# Define paths
trk_dir = base_dir / "Tracts"
# trk_dir.mkdir(parents=True, exist_ok=True) # make folder if it doesnt exist yet
trk_path = trk_dir / "tractogram_EuDX.trk"
trk_path_gtv = trk_dir / "tractogram_GTV_EuDX.trk"

if trk_path.is_file() and trk_path_gtv.is_file():
    # Load the streamlines from the trk file
    trk = nib.streamlines.load(trk_path) # load trk file
    streamlines = trk.streamlines; trk_aff = trk.affine # streamlines and affine

    trk_gtv = nib.streamlines.load(trk_path_gtv) # load trk file
    streamlines_gtv = trk_gtv.streamlines; trk_gtv_aff = trk_gtv.affine # streamlines and affine

    # Check that both affines are equal
    assert np.array_equal(trk_aff, trk_gtv_aff), "Affines from white matter tracts and GTV tracts are not matching."

    # Set affine matrix
    affine = trk_aff
    
    tracts_flag = True # Set flag to indicate tracts exist

## Load white matter mask if it exists

In [14]:
# Define path
rs_dir = base_dir / "RayStation" # Folder containing RayStation (RS) exports
rs_rois_nii_dir = rs_dir / "ROIs_NIfTI"
white_matter_mask_nii_path = rs_rois_nii_dir / "white_matter_mask.nii.gz" # define file path

if white_matter_mask_nii_path.is_file():
    # Load white matter mask
    white_matter_mask_nii = nib.load(white_matter_mask_nii_path); white_matter_mask = white_matter_mask_nii.get_fdata()

## Obtain file name

In [None]:
# Extract file name
fname = get_fname(nifti_dir)

## Extract data and perform segmentation

In [None]:
# Define file names
nifti_file = str(nifti_dir / (fname + ".nii.gz"))
bval_file  = str(nifti_dir / (fname + ".bval"))
bvec_file  = str(nifti_dir / (fname + ".bvec"))

# Extract data
data, affine, hardi_img = load_nifti(nifti_file, return_img = True)
bvals, bvecs = read_bvals_bvecs(bval_file, bvec_file)

# Make gradient table
gtab = gradient_table(bvals, bvecs = bvecs)

# Make brain mask
data_masked, mask = median_otsu(data, vol_idx=range(data.shape[3]), numpass=1)

## Create white matter mask with DTI

In [None]:
# Fit the diffusion tensor model
tensor_model = TensorModel(gtab)
tensor_fit = tensor_model.fit(data_masked)

# Get FA map
FA = tensor_fit.fa

# Generate white matter mask using FA threshold
# Typical FA threshold for white matter is between 0.2 - 0.3. Can use 0.25
white_matter_mask = (FA > 0.15).astype(np.uint8) # paper uses 0.15
white_matter_mask[:,:,:]=white_matter_mask[::-1,::-1,:] # reverse order in x and y directions to visualize like in RayStation

## Obtain ROIs

### Create folders for CT DICOM, MR DICOM, and RT Struct if necessary

In [15]:
# Define Paths
rs_dir = base_dir / "RayStation" # Folder containing RayStation (RS) exports
rs_ct_dcm_dir = rs_dir / "CT_DICOM" # Folder containg RS CT DICOM exports
rs_mr_dcm_dir = rs_dir / "MR_DICOM" # Folder containing RS MR DICOM exports
rs_rois_dir = rs_dir / "ROIs" # Folder containing RS ROIs (in RT struct)

# Flags to indicate whether folders contain necessary files
rs_ct_dcm_flag = False
rs_mr_dcm_flag = False
rs_rois_flag = False

# Check if folders exist, if they do, check if they contain the appropriate file type
# Delete files in folders if they don't contain what we want to prepare for the movement of files into the proper folders
if rs_ct_dcm_dir.is_dir():
    file_paths = rs_get_paths(rs_ct_dcm_dir)
    rs_ct_dcm_flag = True if file_paths["CT_File_Paths"] else False
    if not rs_ct_dcm_flag:
        shutil.rmtree(rs_ct_dcm_dir)
    
if rs_mr_dcm_dir.is_dir():
    file_paths = rs_get_paths(rs_mr_dcm_dir)
    rs_mr_dcm_flag = True if file_paths["MR_File_Paths"] else False
    if not rs_mr_dcm_flag:
        shutil.rmtree(rs_mr_dcm_dir)

if rs_rois_dir.is_dir():
    file_paths = rs_get_paths(rs_rois_dir)
    rs_rois_flag = True if file_paths["RS_File_Paths"] else False
    if not rs_rois_flag:
        shutil.rmtree(rs_rois_dir)

Found 295 valid DICOM files
Found 70 valid DICOM files
Found 1 valid DICOM files


In [16]:
# Set flag to false first
valid_folders = False

if rs_ct_dcm_flag and rs_mr_dcm_flag and rs_rois_flag: # Continue if all folders valid
    valid_folders = True
else: # Create folders for missing file types
    file_paths = rs_get_paths(rs_dir) # Get all file paths from original RayStation folder

    if not file_paths["CT_File_Paths"] or not file_paths["MR_File_Paths"] or not file_paths["RS_File_Paths"]:
        raise ValueError(f"No RayStation files found in {rs_dir}. \nMay be missing CT files, MR files, or RS files, or a combination.")
    
    if not rs_ct_dcm_flag:
        rs_ct_dcm_dir.mkdir(parents=True, exist_ok=True) # make folder if it doesn't exist
        for file in file_paths["CT_File_Paths"]: # Get all files and move to new folder
            # Move file to new folder
            shutil.move(file, rs_ct_dcm_dir)
        rs_ct_dcm_flag = True # Set flag to true when process complete

    if not rs_mr_dcm_flag:
        rs_mr_dcm_dir.mkdir(parents=True, exist_ok=True) # make folder if it doesn't exist
        for file in file_paths["MR_File_Paths"]: # Get all files and move to new folder
            # Move file to new folder
            shutil.move(file, rs_mr_dcm_dir)
        rs_mr_dcm_flag = True # Set flag to true when process complete

    if not rs_rois_flag:
        rs_rois_dir.mkdir(parents=True, exist_ok=True) # make folder if it doesn't exist
        for file in file_paths["RS_File_Paths"]: # Get all files and move to new folder
            # Move file to new folder
            shutil.move(file, rs_rois_dir)
        rs_rois_flag = True # Set flag to true when process complete

    valid_folders = True # Set flag to true when all processes complete

### Load the ROIs

In [17]:
# First check if masks already exist
if not valid_folders:
    raise ValueError("Folders containing RayStation files not validated.")

# Define folder and paths
rs_rois_nii_dir = rs_dir / "ROIs_NIfTI"
rs_rois_nii_dir.mkdir(parents=True, exist_ok=True) # make folder if it doesnt exist yet
gtv_mask_nii_path = rs_rois_nii_dir / "gtv_mask.nii.gz" # define file path
external_mask_nii_path = rs_rois_nii_dir / "external_mask.nii.gz" # define file path
brain_mask_nii_path = rs_rois_nii_dir / "brain_mask.nii.gz" # define file path
white_matter_mask_nii_path = rs_rois_nii_dir / "white_matter_mask.nii.gz" # define file path

if gtv_mask_nii_path.is_file() and external_mask_nii_path.is_file() and brain_mask_nii_path.is_file():
    # Get masks from pre-defined files
    gtv_mask_nii = nib.load(gtv_mask_nii_path); gtv_mask = gtv_mask_nii.get_fdata()
    external_mask_nii = nib.load(external_mask_nii_path); external_mask = external_mask_nii.get_fdata()
    brain_mask_nii = nib.load(brain_mask_nii_path); brain_mask = brain_mask_nii.get_fdata()
    
else:
    # Using RT_Utils package

    # Get path for RT Struct with ROIs
    file_paths = rs_get_paths(rs_rois_dir)
    rt_struct_path = file_paths["RS_File_Paths"][0] # Should only be one RT Struct file

    # Load RTStruct
    rtstruct = RTStructBuilder.create_from(dicom_series_path=rs_ct_dcm_dir, rt_struct_path=rt_struct_path)

    # List available ROI names
    print(f"ROI Names: {rtstruct.get_roi_names()}")

    # Choose ROI(s) to convert to NIfTI mask. Note that names must be exact
    gtv_name = [name for name in rtstruct.get_roi_names() if "GTV" in name] # Get names that contain GTV
    gtv_name = gtv_name[0] # Take first name from list of GTV names 
    gtv_mask = rtstruct.get_roi_mask_by_name(gtv_name)  # 3D binary numpy array
    gtv_mask = np.transpose(gtv_mask, (1, 0, 2)) # change to [x y z]
    gtv_mask = gtv_mask[:, ::-1, :] # flip y-axis to work properly for ANTs

    external_mask = rtstruct.get_roi_mask_by_name("External") # Get External
    external_mask = np.transpose(external_mask, (1, 0, 2)) # change to [x y z]
    external_mask = external_mask[:, ::-1, :] # flip y-axis to work properly for ANTs

    brain_mask = rtstruct.get_roi_mask_by_name("Brain") # Get Brain
    brain_mask = np.transpose(brain_mask, (1, 0, 2)) # change to [x y z]
    brain_mask = brain_mask[:, ::-1, :] # flip y-axis to work properly for ANTs

### Check if interpolation/image registration is required

In [18]:
# First check if interpolation is needed. Flag is true when interpolation is needed
interp_flag = True if gtv_mask.shape != white_matter_mask.shape else False
print(f"Interpolation flag: {interp_flag}")

Interpolation flag: False


### Convert CT scans to NIfTI

In [19]:
if interp_flag:
    # Must be converted from DICOM to NIfTI so that ANTs can read the files.
    # ANTs can only read NIfTI

    rs_ct_nii_dir = rs_dir / "CT_NIfTI" # define folder path

    # Check if NIfTI folder has all the required files
    valid_folder = check_nifti_folder(rs_ct_nii_dir, bval_bvec_expected=False)

    if not valid_folder:
        dicom_to_nifti(rs_ct_dcm_dir, rs_ct_nii_dir)

### Convert MR scans from RS into nifti

In [20]:
# Getting affine from MR no matter what. So need to convert MR to NIfTI (should be done already anyways)
# Must be converted from DICOM to NIfTI so that ANTs can read the files.
# ANTs can only read NIfTI

rs_mr_nii_dir = rs_dir / "MR_NIfTI" # define folder path

# Check if NIfTI folder has all the required files
valid_folder = check_nifti_folder(rs_mr_nii_dir, bval_bvec_expected=False)

if not valid_folder:
    dicom_to_nifti(rs_mr_dcm_dir, rs_mr_nii_dir)

✅ NIfTI folder contains the necessary NIfTI file to proceed without conversion.


### Load affines

In [21]:
if interp_flag:
    rs_ct_nii_fname = get_fname(rs_ct_nii_dir) # Acquire file name for ct scan
    rs_ct_nii_fpath = str(rs_ct_nii_dir / (rs_ct_nii_fname + ".nii.gz")) 

    # Extract affine
    _, affine_ct= load_nifti(rs_ct_nii_fpath, return_img = False) # only care about affine here

In [22]:
# Getting affine from MR no matter what
rs_mr_nii_fname = get_fname(rs_mr_nii_dir) # Acquire file name for ct scan
rs_mr_nii_fpath = str(rs_mr_nii_dir / (rs_mr_nii_fname + ".nii.gz")) 

# Extract affine
_, affine_mr= load_nifti(rs_mr_nii_fpath, return_img = False) # only care about affine here

✅ File name succesfully acquired.


In [23]:
assert np.array_equal(affine_mr, affine), "Affines from raw MR and RayStation MR are not matching."

### Save ROIs as NIfTI files (for ANTs in next step)

In [24]:
if interp_flag:
    # Save masks to NIfTI files
    nib.save(nib.Nifti1Image(gtv_mask.astype('uint8'), affine=affine_ct), gtv_mask_nii_path) # use same affine as from CT
    nib.save(nib.Nifti1Image(external_mask.astype('uint8'), affine=affine_ct), external_mask_nii_path) # use same affine as from CT
    nib.save(nib.Nifti1Image(brain_mask.astype('uint8'), affine=affine_ct), brain_mask_nii_path) # use same affine as from CT

### Create image registration from CT to MR using ANTs

In [25]:
if interp_flag:
    # Use ANTs to transform masks from CR to MR space

    # First read NIfTI files with ANTs
    ct_ants = ants.image_read(str(rs_ct_nii_fpath))
    mr_ants = ants.image_read(str(rs_mr_nii_fpath)) # MR file exported from RayStation. NOT raw MR diffusion imaging
    gtv_mask_ants_ct = ants.image_read(str(gtv_mask_nii_path))
    external_mask_ants_ct = ants.image_read(str(external_mask_nii_path))
    brain_mask_ants_ct = ants.image_read(str(brain_mask_nii_path))

    # Register CT to MR
    reg = ants.registration(fixed=mr_ants, moving=ct_ants, type_of_transform='Rigid')

    # Apply transform to masks
    gtv_mask_ants_mr = ants.apply_transforms(fixed=mr_ants, moving=gtv_mask_ants_ct, 
                                            transformlist=reg['fwdtransforms'], interpolator='nearestNeighbor')
    external_mask_ants_mr = ants.apply_transforms(fixed=mr_ants, moving=external_mask_ants_ct, 
                                            transformlist=reg['fwdtransforms'], interpolator='nearestNeighbor')
    brain_mask_ants_mr = ants.apply_transforms(fixed=mr_ants, moving=brain_mask_ants_ct, 
                                            transformlist=reg['fwdtransforms'], interpolator='nearestNeighbor')
    
    # Get masks from ants variables
    gtv_mask = gtv_mask_ants_mr.numpy()[::-1,::-1,:] # reverse x and y axes to be aligned with white matter
    external_mask = external_mask_ants_mr.numpy()[::-1,::-1,:] # reverse x and y axes to be aligned with white matter
    brain_mask = brain_mask_ants_mr.numpy()[::-1,::-1,:] # reverse x and y axes to be aligned with white matter


In [26]:
# Load the affine transformation
affine = ants.read_transform(reg['fwdtransforms'][0])

# Print the transformation type
print("Transform type:", affine.transform_type)

# Print the transformation matrix
print("Transform matrix:\n", affine.parameters)

NameError: name 'reg' is not defined

### Overlap white matter mask with brain mask

In [None]:
# Overlap white matter mask with brain mask to make sure all white matter is within the brain
white_matter_mask = white_matter_mask.astype(bool) & brain_mask.astype(bool)

### Create mask of overlap between GTV and WM

In [None]:
# Create mask overlapping WM with GTV
gtv_wm_mask = gtv_mask.astype(bool) & white_matter_mask.astype(bool)

### Save ROIs as NIfTI files (for good now)

In [None]:
if interp_flag:
    # Save masks to NIfTI files
    nib.save(nib.Nifti1Image(gtv_mask.astype('uint8'), affine=affine_mr), gtv_mask_nii_path) # use same affine as from MR
    nib.save(nib.Nifti1Image(external_mask.astype('uint8'), affine=affine_mr), external_mask_nii_path) # use same affine as from MR
    nib.save(nib.Nifti1Image(brain_mask.astype('uint8'), affine=affine_mr), brain_mask_nii_path) # use same affine as from MR
    nib.save(nib.Nifti1Image(white_matter_mask.astype('uint8'), affine=affine_mr), white_matter_mask_nii_path) # use same affine as from MR

    # Interpolation no longer needed
    interp_flag = False

## Use CSA model and peaks_from_model and define stopping criterion

In [None]:
# Using CSA (Constant Solid Angle) model then peaks_from_model
csa_model = CsaOdfModel(gtab, sh_order_max=4)
csa_peaks = peaks_from_model(
    csa_model, data_masked, default_sphere, relative_peak_threshold=0.5, min_separation_angle=15, mask=white_matter_mask
) # or relative_peak_threshold=0.8, min_seperation_angle=45 (from introduction to basic tracking tutorial)
# from paper: relative_peak_threshold=0.5, min_separation_angle=15

# Define stopping criterion
stopping_criterion = ThresholdStoppingCriterion(FA, 0.15) 
# or csa_peaks.gfa, 0.25 (from introduction to basic tracking tutorial). paper uses FA, 0.15

## Generate seeds

In [None]:
# Generating seeds

# Generating seeds on white matter
seeds_wm = random_seeds_from_mask(white_matter_mask, affine, seeds_count=1, seed_count_per_voxel=True)
# paper seeds all white matter voxels. not just the ones which coincide with the GTV (ROI)
# so can use white_matter_mask or roi_wm_mask

# Generating seeds on GTV
seeds_gtv = random_seeds_from_mask(gtv_mask, affine, seeds_count=1, seed_count_per_voxel=True)

## Use EuDX tracking to create streamlines

In [None]:
# Using EuDX tracking for now. 

# Creating streamlines from all white matter and from GTV. First white matter.
# Initialization of eudx_tracking. The computation happens in the next step.
streamlines_generator_wm = eudx_tracking(
    seeds_wm, stopping_criterion, affine, step_size=0.5, pam=csa_peaks, max_angle=60 # paper uses max_angle of 60
)
# Generate streamlines object
streamlines_wm = Streamlines(streamlines_generator_wm)

# Now creating streamlines from GTV
streamlines_generator_gtv = eudx_tracking(
    seeds_gtv, stopping_criterion, affine, step_size=0.5, pam=csa_peaks, max_angle=60 # paper uses max_angle of 60
)
# Generate streamlines object
streamlines_gtv = Streamlines(streamlines_generator_gtv)

# colors: red--> left to right, green--> front (anterior) to back (posterior), blue--> top to bottom
# # Streamlines with x and y flipped for colors
# streamlines_flipped = streamlines.copy()
# streamlines_flipped[:][:, 0:2] = streamlines_flipped[:][:, 1::-1]

## Show tracks

In [79]:
if has_fury:

    from fury import utils

    # streamlines_actor_wm = actor.line(
    #     streamlines_wm, colors=colormap.line_colors(streamlines_wm, cmap = "rgb_standard"), opacity=0.25
    # )

    streamlines_actor_gtv = actor.line(
        streamlines_gtv, colors=colormap.line_colors(streamlines_gtv, cmap = "rgb_standard"), opacity=1
    )

    gtv_actor = actor.contour_from_roi(
        gtv_mask, affine=affine, opacity=0.75, color = (1,0,0) # red
    )

    # wm_actor = actor.contour_from_roi(
    #     white_matter_mask, affine=affine, opacity=0.25, color=(1,1,1) # white
    # )

    # gtv_wm_actor = actor.contour_from_roi(
    #     gtv_wm_mask, affine=affine, opacity=0.25, color=(0, 1, 0) # green
    # ) 

    external_actor = actor.contour_from_roi(
        external_mask, affine=affine, opacity=0.5, color=(0.676, 0.844, 0.898) # light blue
    ) 

    # brain_actor = actor.contour_from_roi(
    #     brain_mask, affine=affine, opacity=0.5, color=(1, 0.753, 0.796) # pink
    # ) 

    # Create the 3D display.
    scene = window.Scene()
    # scene.add(streamlines_actor_wm)
    scene.add(streamlines_actor_gtv)
    scene.add(gtv_actor)
    # scene.add(wm_actor)
    # scene.add(gtv_wm_actor)
    scene.add(external_actor)
    # scene.add(brain_actor)

    # Define colours to show
    colours = {
        'GTV' : (1,0.1,0.1), # red
        'External' : (0.676, 0.844, 0.898) # light blue
    }

    # Add custom legend (text + coloured squares)
    legend_pos = np.array([150, 30, 0])
    spacing = 20

    for i, (label, colour) in enumerate(colours.items()):
        # Add a small square actor as color indicator
        square_actor = actor.sphere(centers=np.array([legend_pos + [0, -i*spacing, -i*spacing]]),
                                    colors=np.array([colour]),
                                    radii=10)
        scene.add(square_actor)
        
        # Add the label as text next to it
        # direction=None makes label follow camera
        text_actor = actor.vector_text(text=label, pos=legend_pos + [100, -i*spacing, -i*spacing], scale=(20,20,20), direction=None, align_center=False, extrusion=10)
        scene.add(text_actor)

    # Add informational text
    info_text = "In tractography, the direction of streamlines is labelled by red, green, and blue, where..." \
                 "\nRed indicates directions in the X axis: right to left or left to right." \
                 "\nGreen indicates directions in the Y axis: posterior to anterior or from anterior to posterior." \
                 "\nBlue indicates directions in the Z axis: inferior to superior or vice versa."
    info_actor = actor.vector_text(text=info_text, pos=legend_pos + [100,-100,-100], scale=(5,5,5), direction=None, align_center=False, extrusion=10)
    scene.add(info_actor)

    # Save still images for this static example. Or for interactivity use
    # window.record(scene=scene, out_path="tractogram_EuDX.png", size=(800, 800))
    if interactive:
        window.show(scene)

In [57]:
from fury import window, actor
scene = window.Scene()
l = actor.vector_text(text='Hello')
scene.add(l)
window.show(scene)

## Save tracks

In [None]:
# Define/create folder and path
trk_dir = base_dir / "Tracts"
trk_dir.mkdir(parents=True, exist_ok=True) # make folder if it doesnt exist yet
trk_path = trk_dir / "tractogram_EuDX.trk"

# Define tractogram and save
sft = StatefulTractogram(streamlines_wm, hardi_img, Space.RASMM)
save_trk(sft, str(trk_path), streamlines_wm)

# Define path
trk_path_gtv = trk_dir / "tractogram_GTV_EuDX.trk"

# Define tractogram and save
sft_gtv = StatefulTractogram(streamlines_gtv, hardi_img, Space.RASMM)
save_trk(sft_gtv, str(trk_path_gtv), streamlines_gtv)

# WMPL

## Create WMPL

In [None]:
# load the streamlines from the trk file
trk = nib.streamlines.load(trk_path) # load trk file
streamlines = trk.streamlines; hdr = trk.header; trk_aff = trk.affine # streamlines, header info and affine

# Load the GTV from the NIfTI file
gtv_img = nib.load(gtv_mask_nii_path)
gtv_mask = gtv_img.get_fdata()
gtv_aff = gtv_img.affine

# Compute (minimum) path length per voxel # calculate the WMPL
wmpl = path_length(streamlines, trk_aff, gtv_mask) # fill_value = 0 or -1? paper leaves blank

# Define where to save WMPL
wmpl_dir_nii = base_dir / "WMPL/NIfTI"
wmpl_dir_nii.mkdir(parents=True, exist_ok=True) # make folder if it doesnt exist yet
wmpl_path_nii = wmpl_dir_nii / "WMPL_map.nii.gz"

# save the WMPL as a NIfTI
save_nifti(wmpl_path_nii, wmpl, trk_aff)

## Save WMPL as a DICOM

In [None]:
# Load in MR data used to make tracks
# This dats should be same size (as in (x,y,z)) as the white matter mask
# For example, (256,256,70) for both

files, _ = rs_get_info(rs_mr_dcm_dir)
slice_thickness = files["MR_Files"][0].SliceThickness # take slice thickness from first MR file

In [None]:
# Load in MR data
Sorted_MR_Files = sorted(files["MR_Files"], key=lambda file: float(file.ImagePositionPatient[2])) # sort files by z-axis. increasing towards the head
# anatomical orientation type (0010,2210) absent so z-axis is increasing towards the head of the patient

# Define where to save WMPL
wmpl_dir_dcm = base_dir / "WMPL/DICOM"
wmpl_dir_dcm.mkdir(parents=True, exist_ok=True) # make folder if it doesnt exist yet

# create new series UID
new_series_uid = pydicom.uid.generate_uid()

for i in range(wmpl.shape[2]):  # For each slice
    # ensure overlays properly with RayStation. Basically undoing what I did with ROIs.
    slice_data = wmpl[::-1,:,i].astype(np.uint16).T  

    # Get appropriate reference DICOM file
    ref_dcm = Sorted_MR_Files[i]
    dcm = ref_dcm.copy()

    # Modify instance-specific metadata
    dcm.InstanceNumber = i + 1
    dcm.SeriesInstanceUID = new_series_uid
    dcm.SOPInstanceUID = pydicom.uid.generate_uid()
    dcm.PixelData = slice_data.tobytes()
    dcm.Rows, dcm.Columns = slice_data.shape

    dcm.save_as(wmpl_dir_dcm / f"WMPL_slice_{i+1:03d}.dcm")


## Show WMPL map

In [None]:
if has_fury:

    # Load WMPL map
    wmpl_img = nib.load(wmpl_path_nii); wmpl_data = wmpl_img.get_fdata(); affine = wmpl_img.affine
    
    # mask where WMPL > 0
    wmpl_mask = wmpl_data > 0

    wmpl_actor = actor.contour_from_roi(
        wmpl_mask, affine=affine, opacity=0.5, color=(0, 1, 0) # green
    ) 

    # Extract voxel coordinates where WMPL > 0
    voxel_coords = np.array(np.nonzero(wmpl_mask)).T # shape (N,3)

    # Get corresponding WMPL values at these voxels
    values = wmpl_data[wmpl_mask]

    # Map voxel coords to real world coordinates (RASMM)
    ras_coords = nib.affines.apply_affine(wmpl_img.affine, voxel_coords) # affine from wmpl should be same as affines from before

    # Create a colormap for WMPL values
    cmap = colormap.create_colormap(values, name='hot')

    # Create a point cloud actor with colors
    points_actor = actor.point(
        ras_coords, cmap, point_radius=files["MR_Files"][0].SliceThickness, opacity=0.75 # voxels are 1.5 mm (from what ive seen) in x,y,z (isotropic)
    ) 

    # Create actor for external
    external_actor = actor.contour_from_roi(
        external_mask, affine=affine, opacity=0.5, color=(0, 0, 1) # blue
    ) 

    # Create the 3D display.
    scene = window.Scene()
    scene.add(points_actor)
    scene.add(external_actor)

    # Show plot
    if interactive:
        window.show(scene)
    