In [2]:
import matplotlib.pyplot as plt
from mpl_toolkits.axes_grid1 import make_axes_locatable
import json, os, logging, random, copy, shutil
import seaborn as sns, pandas as pd, numpy as np, nibabel as nib
from ipywidgets import interact, IntSlider, Select, HBox
from utilities import apply_windowing, resize_to_user_resolution
from dicom_tools import DicomToolbox   
from tqdm import tqdm

In [3]:
# --- Configuration: everything a reader might tune lives in this cell ---
DATA_DIRECTORY = "/home/ivazquez/Documents/REPOS/nnUNet/raw_data" #"/home/ivazquez/Repos/nnUNet/raw_data/neck_cta/anonymized"
# OUTPUT_DIRECTORY = "/home/ivazquez/Repos/nnUNet/raw_data/neck_cta/nnUNet_raw_data_base"
EXPECTED_DATA = ['ct', 'rtstruct']
LABELS = ["common carotid lt"]
DATASET_NAME = 'neck_cta'
MODALITY = 'CT'
TEST_SET_SIZE = 2
LEAVE_OUT = [2, 3, 4, 14, 21, 7]
# Resampling settings; exposed as a constant because later cells reference RESAMPLING directly.
RESAMPLING = {'apply': True, 'resolution': (1.0, 1.0, 1.0)}

# Single dictionary bundling the constants above (consumed by Preprocessing.__init__).
# Every entry references its constant so the two can never drift apart.
params = {
    'DATA_DIRECTORY': DATA_DIRECTORY,
    'RESAMPLING': RESAMPLING,
    'LABELS': LABELS,
    'DATASET_NAME': DATASET_NAME,
    'MODALITY': MODALITY,
    'TEST_SET_SIZE': TEST_SET_SIZE,
    'LEAVE_OUT': LEAVE_OUT,
    'INPUT_MODALITIES': ['ct'],
    'OUTPUT_MODALITIES': ['rtstruct'],
}



In [4]:
# Build the DICOM toolbox on the raw-data directory and list the patients it finds.
dt = DicomToolbox(patient_data_directory = DATA_DIRECTORY)
dt.expected_data = EXPECTED_DATA  # data kinds configured above: ['ct', 'rtstruct']
dt.uniform_slice_thickness = False  # NOTE(review): presumably tolerates non-uniform slice spacing — confirm in dicom_tools

all_pat_ids = dt.identify_patient_files()  # iterated as the list of patient ids by the cells below

No user inputs were provided. Setting default values.


In [None]:
# Report every patient whose structure set lacks the first configured label.
for patient_id in all_pat_ids:
    # Names only: the check needs the contour names, not the masks.
    dt.parse_dicom_files(patient_id, mask_resolution='ct', mask_names_only=True)
    label_present = LABELS[0] in dt.contours
    if not label_present:
        print(f"Patient {patient_id} does not have {LABELS[0]}")

In [None]:
# Parse every patient except the two excluded ones.
# The original filter was `['2' and '21']`, which evaluates to `['21']`, so
# patient '2' was never excluded; both ids are now listed explicitly.
# str() keeps the comparison valid whether the ids are strings or ints.
for p in [pat for pat in all_pat_ids if str(pat) not in ('2', '21')]:
    dt.parse_dicom_files(p, mask_resolution='ct', mask_names_only=True)

In [6]:
import json
from tqdm import tqdm  # import the callable: `import tqdm` rebinds the name to the module and breaks tqdm(...)

class Preprocessing(DicomToolbox):
    """DicomToolbox specialised with the notebook's preprocessing parameters.

    Args:
        params: dictionary with at least 'DATA_DIRECTORY', 'INPUT_MODALITIES'
            and 'OUTPUT_MODALITIES'. 'LABELS' and 'LEAVE_OUT' are optional and
            default to empty lists.
    """

    def __init__(self, params):
        super().__init__(patient_data_directory=params['DATA_DIRECTORY'])
        # Keep the parameters so the methods below do not depend on notebook globals.
        self.preprocessing_params = params
        self.expected_data = params['INPUT_MODALITIES'] + params['OUTPUT_MODALITIES']

    @staticmethod
    def _normalise_patient_id(patient_id):
        """Return the id as an int when possible (else as a str) so '2' and 2 compare equal."""
        try:
            return int(patient_id)
        except (TypeError, ValueError):
            return str(patient_id)

    def get_data_insight(self):
        """Collect per-patient CT statistics and the available contour names.

        Prints a message for every configured label a patient is missing.

        Returns:
            dict mapping patient id -> {'ct': {shape, min, max, dx, dy, dz},
            'rtstruct': [contour names]}.
        """
        labels = self.preprocessing_params.get('LABELS', [])
        excluded = {self._normalise_patient_id(pat)
                    for pat in self.preprocessing_params.get('LEAVE_OUT', [])}

        all_pat_ids = self.identify_patient_files()
        data_insight = {p: {} for p in all_pat_ids}

        for p in tqdm(all_pat_ids, desc="Getting data insight"):
            self.parse_dicom_files(p, mask_resolution='ct', mask_names_only=True)
            # TODO: function in dicom tools to get available modalities
            data_insight[p]['ct'] = {'shape': self.ct.data.shape,
                                     'min': self.ct.data.min(),
                                     'max': self.ct.data.max(),
                                     'dx': self.ct.coordinates.dx,
                                     'dy': self.ct.coordinates.dy,
                                     'dz': self.ct.coordinates.dz}
            # Only names were requested, so store the names. The previous
            # `for k, v in self.contours` tried to unpack each name as a pair.
            data_insight[p]['rtstruct'] = list(self.contours)

            for label in labels:
                if label not in self.contours:
                    print(f"Patient {p} does not have {label}")

        # NOTE(review): this second pass is kept from the original; its parse
        # results are not stored anywhere in this method.
        for p in all_pat_ids:
            if self._normalise_patient_id(p) in excluded:
                continue
            self.parse_dicom_files(p, mask_resolution='ct', mask_names_only=True)

        return data_insight

    def save_as_nifti(self):
        """Placeholder: not implemented yet."""
        pass
    # def load_data(self):
        
        # self.expected_data = EXPECTED_DATA
        # self.uniform_slice_thickness = False
        


No user inputs were provided. Setting default values.


In [11]:
import time 
from rich.progress import track

# Scratch demo of rich's progress bar.
# NOTE(review): iteration n sleeps n seconds, so the loop takes 0+1+...+19 = 190 s in total.
for n in track(range(20), description="Processing..."):
    time.sleep(n)

Output()

In [None]:







# Convert the ids to ints and drop the patients listed in LEAVE_OUT.
all_pat_ids = [pid for pid in map(int, all_pat_ids) if pid not in LEAVE_OUT]

# Randomly select the test set; everything else is training data.
test_set = random.sample(all_pat_ids, TEST_SET_SIZE)
train_set = list(filter(lambda pid: pid not in test_set, all_pat_ids))

# The output tree lives next to the raw-data directory.
output_dir = os.path.join(os.path.dirname(DATA_DIRECTORY), 'preprocessed')
if not os.path.exists(output_dir):
    os.makedirs(output_dir)
    print(f"Created output directory: {output_dir}")

# Start from empty image/label folders on every run.
for dir_name in ('imagesTr', 'imagesTs', 'labelsTr', 'labelsTs'):
    target_dir = os.path.join(output_dir, dir_name)
    if os.path.exists(target_dir):
        shutil.rmtree(target_dir)
    os.makedirs(target_dir, exist_ok=True)

def save_as_nifti(array_data, voxel_spacing, file_name):
    """Write an array to `<file_name>.nii.gz` with a diagonal affine.

    Args:
        array_data: array handed to nib.Nifti1Image.
        voxel_spacing: list or tuple of three spacings, placed on the first
            three diagonal entries of the affine.
        file_name: output path without the '.nii.gz' suffix.

    Raises:
        ValueError: if voxel_spacing is not a list/tuple of length 3.
    """
    is_sequence = isinstance(voxel_spacing, (list, tuple))
    if not (is_sequence and len(voxel_spacing) == 3):
        raise ValueError("voxel_spacing must be a list or tuple with 3 elements.")

    # Identity affine whose diagonal carries the voxel spacing per axis.
    affine = np.eye(4)
    for axis, spacing in enumerate(voxel_spacing):
        affine[axis, axis] = spacing

    nib.save(nib.Nifti1Image(array_data, affine=affine), f"{file_name}.nii.gz")
 
def process_patients(all_pat_ids, dt, LABELS, RESAMPLING, output_dir, dataset):
    """Export the CT and the label map of every patient as NIfTI files.

    Images go to imagesTr/imagesTs as `<dataset>Patient_<index>_0000.nii.gz` and
    the matching label map to labelsTr/labelsTs as `<dataset>Patient_<index>.nii.gz`,
    so an image and its segmentation share the same case identifier.

    Args:
        all_pat_ids: patient ids to export.
        dt: DicomToolbox instance used to read the DICOM data.
        LABELS: contour names to export; the i-th name (1-based) is written
            with integer value i, 0 is background.
        RESAMPLING: dict with 'apply' (bool) and 'resolution' (3 values).
        output_dir: root folder containing imagesTr/imagesTs/labelsTr/labelsTs.
        dataset: 'train' selects the *Tr folders, anything else the *Ts folders.
    """
    is_train = dataset == 'train'
    images_dir = os.path.join(output_dir, 'imagesTr' if is_train else 'imagesTs')
    labels_dir = os.path.join(output_dir, 'labelsTr' if is_train else 'labelsTs')

    # NOTE(review): the affine always uses RESAMPLING['resolution'], as in the
    # original. When RESAMPLING['apply'] is False this is presumably not the
    # native CT spacing — confirm and derive it from dt.ct.coordinates if needed.
    spacing = RESAMPLING['resolution']

    for index, pat_id in enumerate(tqdm(all_pat_ids, desc=f'Processing {dataset} patients')):
        case_name = f"{dataset}Patient_{index:03}"

        dt.dicom_files = dt.run_initial_check(pat_id)  # Grab the dicom files
        dt.ct = dt.parse_ct_study_files(dt.dicom_files['ct'])  # Parse the CT files
        structure_files = sorted(dt.dicom_files['structures'])
        all_contours = dt.parse_structure_files(files=structure_files, patient_id=pat_id, names_only=True)
        orig_coords = copy.deepcopy(dt.ct.coordinates)  # store copy of original CT coordinates

        # Resize CT if necessary
        if RESAMPLING['apply']:
            ct, _ = resize_to_user_resolution(dt.ct.data, orig_coords, RESAMPLING['resolution'],
                                              fill_value=dt.ct.data.min())
        else:
            ct = dt.ct.data

        save_as_nifti(ct, spacing, os.path.join(images_dir, f"{case_name}_0000"))

        # Merge all requested contours into ONE integer label map. Writing one
        # file per label to the same path (as before) let later labels overwrite
        # earlier ones.
        label_map = None
        for label_value, label in enumerate(LABELS, start=1):
            if label not in all_contours:
                # Skip only the missing label; `break` also dropped every label after it.
                print(f"Patient {pat_id} does not have {label}")
                continue

            contour = dt.parse_structure_files(files=structure_files, patient_id=pat_id,
                                               mask_names=label, resolution='ct')

            # Resize the mask if necessary
            if RESAMPLING['apply']:
                mask, _ = resize_to_user_resolution(contour[label].data, orig_coords,
                                                    RESAMPLING['resolution'], fill_value=0)
            else:
                mask = contour[label].data

            if label_map is None:
                label_map = np.zeros(np.shape(mask), dtype=np.uint8)
            # assumes masks are binary (or interpolated into [0, 1]) — TODO confirm the 0.5 threshold
            label_map[np.asarray(mask) > 0.5] = label_value

        # Save the labels (nothing is written when none of the labels exist for this patient)
        if label_map is not None:
            save_as_nifti(label_map, spacing, os.path.join(labels_dir, case_name))
        
       
# The resampling settings live in the params dictionary; the bare name
# RESAMPLING was never defined and raised NameError here.
process_patients(test_set, dt, LABELS, params['RESAMPLING'], output_dir, 'test')
process_patients(train_set, dt, LABELS, params['RESAMPLING'], output_dir, 'train')

In [None]:
# Gather the contour names of every patient. The other cells read the names
# from dt.contours after calling parse_dicom_files and never use its return
# value, so the names are collected the same way here.
all_contours = []
for pat in all_pat_ids:
    dt.parse_dicom_files(pat, mask_resolution='ct', mask_names_only=True)
    all_contours.extend(dt.contours)

# all_contours is now a flat list of contour names
unique_contours, counts = np.unique(all_contours, return_counts=True)

# Printing unique contours and their counts
for contour, count in zip(unique_contours, counts):
    print(f"{contour}, Count: {count}")

In [None]:
import nibabel as nib
import numpy as np

# Your NumPy array (replace with your actual data)
# Example volume (replace with real data): 64x64x32 random values
array_data = np.random.rand(64, 64, 32)

# Voxel spacing as [x, y, z]
voxel_spacing = [2.0, 2.0, 2.5]

# Identity affine whose upper-left 3x3 block holds the spacing on its diagonal
affine = np.eye(4)
affine[:3, :3] = np.diag(voxel_spacing)

# Wrap the array in a NIfTI image and write it to disk
nifti_image = nib.Nifti1Image(array_data, affine=affine)
nib.save(nifti_image, 'my_nifti_file_with_spacing.nii.gz')

In [None]:
from typing import Tuple

from batchgenerators.utilities.file_and_folder_operations import save_json, join

file_extension = '.nii.gz'
channel_names = {0: 'CT'}

# Derive the label map from LABELS so it cannot drift from the configured
# contours: background is 0, the i-th label (1-based) gets value i.
labels = {'background': 0,
          **{name: value for value, name in enumerate(LABELS, start=1)}}

# Count cases, not directory entries: ignore files with another extension and
# account for one file per channel per case.
image_files = [f for f in os.listdir(os.path.join(output_dir, 'imagesTr'))
               if f.endswith(file_extension)]
num_training_cases = len(image_files) // len(channel_names)

def generate_dataset_json(output_folder: str,
                          channel_names: dict,
                          labels: dict,
                          num_training_cases: int,
                          file_ending: str,
                          regions_class_order: Tuple[int, ...] = None,
                          dataset_name: str = None, reference: str = None, release: str = None, license: str = None,
                          description: str = None,
                          overwrite_image_reader_writer: str = None, **kwargs):
    """
    Generates a dataset.json file in the output folder.

    The `channel_names` and `labels` dictionaries passed in are NOT modified;
    normalized copies are written to the file.

    channel_names:
        Channel names must map the index to the name of the channel, example:
        {
            0: 'T1',
            1: 'CT'
        }
        Keys are converted to strings in the written file.
        Note that the channel names may influence the normalization scheme!! Learn more in the documentation.

    labels:
        This will tell nnU-Net what labels to expect. Important: This will also determine whether you use region-based training or not.
        Example regular labels:
        {
            'background': 0,
            'left atrium': 1,
            'some other label': 2
        }
        Example region-based training:
        {
            'background': 0,
            'whole tumor': (1, 2, 3),
            'tumor core': (2, 3),
            'enhancing tumor': 3
        }
        Values are converted to int (or tuples of int) in the written file.

        Remember that nnU-Net expects consecutive values for labels! nnU-Net also expects 0 to be background!

    num_training_cases: is used to double check all cases are there!

    file_ending: needed for finding the files correctly. IMPORTANT! File endings must match between images and
    segmentations!

    regions_class_order: required (asserted) when any label value is a tuple/list with more than one element.

    dataset_name, reference, release, license, description: self-explanatory and not used by nnU-Net. Just for
    completeness and as a reminder that these would be great!

    overwrite_image_reader_writer: If you need a special IO class for your dataset you can derive it from
    BaseReaderWriter, place it into nnunet.imageio and reference it here by name

    kwargs: whatever you put here will be placed in the dataset.json as well

    """
    has_regions: bool = any(isinstance(value, (tuple, list)) and len(value) > 1 for value in labels.values())
    if has_regions:
        assert regions_class_order is not None, f"You have defined regions but regions_class_order is not set. " \
                                                f"You need that."

    # channel names need strings as keys; build a copy instead of rewriting the caller's dict
    channel_names_out = {str(key): name for key, name in channel_names.items()}

    # labels need ints as values; again a copy, so the caller's dict keeps its original values
    labels_out = {
        name: tuple(int(i) for i in value) if isinstance(value, (tuple, list)) else int(value)
        for name, value in labels.items()
    }

    dataset_json = {
        'channel_names': channel_names_out,  # previously this was called 'modality'
        'labels': labels_out,
        'numTraining': num_training_cases,
        'file_ending': file_ending,
    }

    # Optional entries are written only when provided. Note the key for
    # `license` is spelled 'licence' in the file (kept as in the original).
    optional_entries = {
        'name': dataset_name,
        'reference': reference,
        'release': release,
        'licence': license,
        'description': description,
        'overwrite_image_reader_writer': overwrite_image_reader_writer,
        'regions_class_order': regions_class_order,
    }
    for key, value in optional_entries.items():
        if value is not None:
            dataset_json[key] = value

    dataset_json.update(kwargs)

    save_json(dataset_json, join(output_folder, 'dataset.json'), sort_keys=False)
    
# Write dataset.json into the preprocessed output folder.
generate_dataset_json(
    output_folder=output_dir,
    channel_names=channel_names,
    labels=labels,
    num_training_cases=num_training_cases,
    file_ending=file_extension,
)

The scheme introduced above results in the following folder structure. Given is an example for the first Dataset of the MSD: BrainTumour. This dataset has four input channels: FLAIR (0000), T1w (0001), T1gd (0002) and T2w (0003). Note that the imagesTs folder is optional and does not have to be present.

```
nnUNet_raw/Dataset001_BrainTumour/
├── dataset.json
├── imagesTr
│   ├── BRATS_001_0000.nii.gz
│   ├── BRATS_001_0001.nii.gz
│   ├── BRATS_001_0002.nii.gz
│   ├── BRATS_001_0003.nii.gz
│   ├── BRATS_002_0000.nii.gz
│   ├── BRATS_002_0001.nii.gz
│   ├── BRATS_002_0002.nii.gz
│   ├── BRATS_002_0003.nii.gz
│   ├── ...
├── imagesTs
│   ├── BRATS_485_0000.nii.gz
│   ├── BRATS_485_0001.nii.gz
│   ├── BRATS_485_0002.nii.gz
│   ├── BRATS_485_0003.nii.gz
│   ├── BRATS_486_0000.nii.gz
│   ├── BRATS_486_0001.nii.gz
│   ├── BRATS_486_0002.nii.gz
│   ├── BRATS_486_0003.nii.gz
│   ├── ...
└── labelsTr
    ├── BRATS_001.nii.gz
    ├── BRATS_002.nii.gz
    ├── ...
```

Example of data arrangement

```
nnUNet_raw/Dataset002_Heart/
├── dataset.json
├── imagesTr
│   ├── la_003_0000.nii.gz
│   ├── la_004_0000.nii.gz
│   ├── ...
├── imagesTs
│   ├── la_001_0000.nii.gz
│   ├── la_002_0000.nii.gz
│   ├── ...
└── labelsTr
    ├── la_003.nii.gz
    ├── la_004.nii.gz
    ├── ...
```


```json
{ 
 "channel_names": {  # formerly modalities
   "0": "T2", 
   "1": "ADC"
 }, 
 "labels": {  # THIS IS DIFFERENT NOW!
   "background": 0,
   "PZ": 1,
   "TZ": 2
 }, 
 "numTraining": 32, 
 "file_ending": ".nii.gz",
 "overwrite_image_reader_writer": "SimpleITKIO"  # optional! If not provided nnU-Net will automatically determine the ReaderWriter
 }
 ```
 The channel_names determine the normalization used by nnU-Net. If a channel is marked as 'CT', then a global normalization based on the intensities in the foreground pixels will be used. If it is something else, per-channel z-scoring will be used. Refer to the methods section in our paper for more details. nnU-Net v2 introduces a few more normalization schemes to choose from and allows you to define your own, see here for more information.

---
# Function to display CT and contours

In [None]:
# Viewer settings for this cell
PAT_ID = 14
DATA_DIRECTORY = "/home/ivazquez/Repos/nnUNet/raw_data/neck_cta/anonymized"
WINDOW_WIDTH = 400 # HU
WINDOW_CENTER = 40 # HU
INITIAL_SLICE = None
COLORMAP = 'gray'

# Load the CT and the full structure masks of the selected patient
dt = DicomToolbox(patient_data_directory = DATA_DIRECTORY)
dt.expected_data = ['ct', 'rtstruct']
dt.uniform_slice_thickness = False
dt.parse_dicom_files(PAT_ID, mask_resolution = 'ct', mask_names_only=False)

# Keep an untouched copy of the coordinates, then window the CT for display
orig_coords = copy.deepcopy(dt.ct.coordinates)
# ct, dt.ct.coordinates = resize_to_user_resolution(ct, dt.ct.coordinates, [1,1,1])
ct = apply_windowing(dt.ct.data, WINDOW_WIDTH, WINDOW_CENTER)
structures = list(dt.contours)

# Selection widget listing every structure, first one preselected
select_structures = Select(
    options=structures,
    value=structures[0],
    description='Structures:',
    rows=len(structures),
)

display(HBox([select_structures]))

In [None]:
import tkinter as tk
from tkinter import ttk, Listbox, Scrollbar
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg
import copy
from dicom_tools import DicomToolbox  # Ensure this module is available

class DicomViewer:
    """Tkinter window showing windowed CT slices with one selectable structure contour.

    The CT slice is always drawn; the contour of the structure selected in the
    listbox (if any) is overlaid in red. The slice is changed with the slider
    or the Left/Right arrow keys.

    Args:
        root: Tk root (or parent) widget.
        data_directory: directory handed to DicomToolbox.
        pat_id: patient id handed to DicomToolbox.parse_dicom_files.
        window_width: display window width.
        window_center: display window centre.
    """

    def __init__(self, root, data_directory, pat_id, window_width, window_center):
        self.root = root
        self.data_directory = data_directory
        self.pat_id = pat_id
        self.window_width = window_width
        self.window_center = window_center

        self.dt = DicomToolbox(patient_data_directory=data_directory)
        self.dt.expected_data = ['ct', 'rtstruct']
        self.dt.uniform_slice_thickness = False
        self.dt.parse_dicom_files(pat_id, mask_resolution='ct', mask_names_only=False)

        self.ct = self.dt.ct.data
        self.orig_coords = copy.deepcopy(self.dt.ct.coordinates)
        self.ct = self.apply_windowing(self.ct, window_width, window_center)
        self.structures = [c for c in self.dt.contours]

        self.create_widgets()
        self.update_plot()

    def create_widgets(self):
        """Build the structure list, the matplotlib canvas, the slice slider and key bindings."""
        # exportselection=False keeps the highlighted structure when another
        # widget takes over the selection.
        self.structure_listbox = Listbox(self.root, selectmode=tk.SINGLE, exportselection=False)
        for struct in self.structures:
            self.structure_listbox.insert(tk.END, struct)
        self.structure_listbox.bind('<<ListboxSelect>>', lambda event: self.update_plot())
        self.structure_listbox.pack(side=tk.LEFT, fill=tk.Y)

        scrollbar = Scrollbar(self.root)
        scrollbar.pack(side=tk.LEFT, fill=tk.Y)
        self.structure_listbox.config(yscrollcommand=scrollbar.set)
        scrollbar.config(command=self.structure_listbox.yview)

        self.figure, self.ax = plt.subplots(figsize=(7, 7))
        self.canvas = FigureCanvasTkAgg(self.figure, master=self.root)
        self.canvas.get_tk_widget().pack()

        self.slice_var = tk.IntVar(value=self.ct.shape[0] // 2)
        # Redraw when the slider value changes; the previous "<Motion>" binding
        # redrew on every mouse move over the slider, even without a change.
        self.slice_slider = ttk.Scale(self.root, from_=0, to=self.ct.shape[0] - 1, orient="horizontal",
                                      variable=self.slice_var,
                                      command=lambda _value: self.update_plot())
        self.slice_slider.pack(fill=tk.X)

        self.root.bind('<Left>', self.previous_slice)
        self.root.bind('<Right>', self.next_slice)

    def apply_windowing(self, image, window_width, window_center):
        """Clip `image` to the window and rescale it linearly to [0, 255].

        The window spans window_center -/+ (window_width // 2).
        """
        min_hu = window_center - window_width // 2
        max_hu = window_center + window_width // 2
        image = np.clip(image, min_hu, max_hu)
        image = (image - min_hu) / (max_hu - min_hu) * 255.0
        return image

    def update_plot(self, event=None):
        """Redraw the current slice and, when a structure is selected, its contour."""
        last_slice = self.ct.shape[0] - 1
        # Clamp to a valid integer index in case the slider reports a value outside the range.
        slice_idx = min(max(int(self.slice_var.get()), 0), last_slice)

        x, y = self.dt.ct.coordinates.x, self.dt.ct.coordinates.y
        ct_extent = [x.min(), x.max(), y.max(), y.min()]

        # Always draw the CT; previously nothing was shown until a structure was selected.
        self.ax.clear()
        self.ax.imshow(self.ct[slice_idx], cmap='gray', interpolation='none', extent=ct_extent)

        selected_idx = self.structure_listbox.curselection()
        if selected_idx:
            structure_name = self.structure_listbox.get(selected_idx[0])
            mask_slice = self.dt.contours[structure_name].data[slice_idx]
            # Nothing to outline on slices the structure does not reach.
            if np.any(mask_slice):
                self.ax.contour(x, y, mask_slice, levels=[0.5], colors='red')

        self.ax.axis('off')
        self.canvas.draw()

    def previous_slice(self, event):
        """Step one slice back (bound to the Left arrow key); no-op at the first slice."""
        current_slice = self.slice_var.get()
        if current_slice > 0:
            self.slice_var.set(current_slice - 1)
            self.update_plot()

    def next_slice(self, event):
        """Step one slice forward (bound to the Right arrow key); no-op at the last slice."""
        current_slice = self.slice_var.get()
        if current_slice < self.ct.shape[0] - 1:
            self.slice_var.set(current_slice + 1)
            self.update_plot()

if __name__ == "__main__":
    # Viewer settings
    data_directory = "/home/ivazquez/Documents/REPOS/nnUNet/raw_data/neck_cta/anonymized"
    pat_id = 1
    window_width = 400
    window_center = 40

    # Create the main window, attach the viewer and hand control to Tk
    root = tk.Tk()
    root.title("DICOM Viewer")
    viewer = DicomViewer(
        root=root,
        data_directory=data_directory,
        pat_id=pat_id,
        window_width=window_width,
        window_center=window_center,
    )
    root.mainloop()
