# DICOM to NifTI (Local to Remote)

In [1]:
#|default_exp dicom_to_nifti_remote

We can upload these DICOM files as NifTI files to a remote machine with a good GPU, for us to make deep learning experiments more efficiently.

Follow these steps to rent an instance to run this notebook:
1. Create a [Vastai](https://cloud.vast.ai) instance. I chose one with a 1x RTX 3090 24GB VRAM, enough storage for the whole dataset + some extra, and high upload speed (in the closest location possible).
2. Open a Jupyter terminal and create a directory for the dataset using `mkdir /workspace/data; mkdir /workspace/data/series`.
3. Get the instance's public IP and exposed SSH port and set the `RSNA_IAD_VASTAI_PUBLIC_IP` and `RSNA_IAD_VASTAI_SSH_PORT` variables in the `develop` script of this project's directory.
4. Create an SSH key pair using `ssh-keygen -t rsa -b 4096 -C "some-email"`, press enter on everything and add the public key in `$HOME/.ssh/id_rsa.pub` to the instance.
5. Rerun `source develop` and test copying one of the stored NifTI files from the previous notebook using `rsync -arz -v --progress -e "ssh -i ~/.ssh/id_rsa -p $RSNA_IAD_VASTAI_SSH_PORT" "path-to-nifti-file" root@$RSNA_IAD_VASTAI_PUBLIC_IP:/workspace/data/series` in the terminal. This will also ask you to confirm the connection to an unknown host for the first time, so it works automatically in the future.
6. Stop the Jupyter server, run `source develop`, and run the server again.

In [2]:
import sys
sys.path.append("../lib")

Let's load somes files:

In [3]:
#|export
from dicom_to_nifti import *
import os

In [4]:
base_path_dicom = os.environ["RSNA_IAD_DATA_DIR"]
series_path_dicom = f"{base_path_dicom}/series"
series_uid_l = os.listdir(series_path_dicom)
ds_l = dicom_serie_load(series_path_dicom, series_uid_l[0])
volume, ds_metadata_l = dicom_serie_process(ds_l)

base_path_nifti = f"{os.environ['RSNA_IAD_DATA_DIR']}/nifti"
series_path_nifti = f"{base_path_dicom}/series"
dicom_volume_to_nifti(volume, ds_metadata_l, series_uid_l[0], series_path_nifti)

We can now take the volume and metadata, store them as a NifTI temporarily on the local machine, and copy them to the remote instance:

In [5]:
#|export
import torch
import numpy as np
import nibabel

def dicom_volume_to_nifti_remote(volume, ds_metadata_l, serie_uid, path, send_file, remote_path):

    nifti_path = f"{path}/{serie_uid}.nii.gz"
    
    nifti = nibabel.nifti1.Nifti1Image(volume.detach().cpu().numpy(), affine=np.eye(4))
    
    # Define spacing
    spacing = dicom_serie_get_spacing(ds_metadata_l)
    nifti.header.set_zooms(spacing)
    
    nibabel.save(nifti, nifti_path)

    # copy to remote machine
    output = send_file(nifti_path, remote_path)
    if output.stderr != b"":
        print(output.stderr.decode("UTF-8"))

    os.remove(nifti_path)

We do need to provide a `send_file` callback to the previous function, which takes the `remote_path` as an argument. We will create a class for this remote GPU provider where the public IP and SSH port are defined at initialization and then used as needed:

In [10]:
#|export
import subprocess

class VastAI:

    def __init__(self, public_ip, ssh_port):
        self.public_ip = public_ip
        self.ssh_port = ssh_port

    def set_api_key(self, api_key):
        output = subprocess.run(["vastai", "set", "api-key", api_key], capture_output=True)
        return output

    def send_file(self, local_path, remote_path):
        output = subprocess.run(["rsync", "-arz", "-v", "-y",
                f"-e ssh -i {os.environ["HOME"]}/.ssh/id_rsa -p {self.ssh_port}", 
                local_path, 
                f'root@{self.public_ip}:/workspace/{remote_path}'], capture_output=True)

        if output.stderr == b"Welcome to vast.ai. If authentication fails, try again after a few seconds, and double check your ssh key.\nHave fun!\n":
            output.stderr = b""
        
        return output

In [12]:
vastai = VastAI(os.environ["RSNA_IAD_VASTAI_PUBLIC_IP"], 
                os.environ["RSNA_IAD_VASTAI_SSH_PORT"])

vastai.send_file(f"{series_path_nifti}/{series_uid_l[0]}.nii.gz", "data/series")

CompletedProcess(args=['rsync', '-arz', '-v', '-y', '-e ssh -i /home/afonsomm/.ssh/id_rsa -p 11757', '/home/afonsomm/Desktop/data/rsna_iad/series/1.2.826.0.1.3680043.8.498.10004044428023505108375152878107656647.nii.gz', 'root@79.160.189.79:/workspace/data/series'], returncode=0, stdout=b'sending incremental file list\n1.2.826.0.1.3680043.8.498.10004044428023505108375152878107656647.nii.gz\n\nsent 175 bytes  received 49,252 bytes  19,770.80 bytes/sec\ntotal size is 49,325,902  speedup is 997.95\n', stderr=b'')

Finally, we can create the temporary directory where the intermediate NifTIs will live until they are sent to the remote instance, and then call the actual function passing in the VastAI object `send_file` method as the callback:

In [14]:
# Set temporary directory
series_path_nifti_temp = f"{base_path_nifti}/temp"
if not os.path.exists(series_path_nifti_temp):
    os.mkdir(series_path_nifti_temp)

dicom_volume_to_nifti_remote(volume, ds_metadata_l, series_uid_l[0], series_path_nifti_temp, vastai.send_file, "data/series")

os.rmdir(series_path_nifti_temp)

In [15]:
#|export
def dicom_serie_to_nifti_remote(base_path_dicom, serie_uid, base_path_nifti, send_file, remote_path):
    ds_l = dicom_serie_load(base_path_dicom, serie_uid)
    volume, ds_metadata_l = dicom_serie_process(ds_l)
    dicom_volume_to_nifti_remote(volume, ds_metadata_l, serie_uid, base_path_nifti, send_file, remote_path)

In [16]:
series_path_nifti_temp = f"{base_path_nifti}/temp"
if not os.path.exists(series_path_nifti_temp):
    os.mkdir(series_path_nifti_temp)

dicom_serie_to_nifti_remote(series_path_dicom, series_uid_l[0], series_path_nifti, vastai.send_file, "data/series")

os.rmdir(series_path_nifti_temp)

Use the same approach to parallelize file IO as in the previous notebook:

In [17]:
#|export
from concurrent.futures import ProcessPoolExecutor, as_completed
import multiprocessing
from tqdm import tqdm

def dicom_series_to_niftis_remote(base_path_dicoms, series_uid, base_path_niftis, send_file, remote_path, max_workers):

    # Set temporary directory
    series_path_nifti_temp = f"{base_path_nifti}/temp"
    if not os.path.exists(series_path_nifti_temp):
        os.mkdir(series_path_nifti_temp)
    
    with ProcessPoolExecutor(max_workers=max_workers) as executor:
        futures = [executor.submit(dicom_serie_to_nifti_remote, base_path_dicoms, serie_uid, series_path_nifti_temp, send_file, remote_path) for serie_uid in series_uid]
        for future in tqdm(as_completed(futures), total=len(futures)):
            ...

    os.rmdir(series_path_nifti_temp)

In [18]:
import random

random.seed(0)
samples_series_uid_l = random.sample(series_uid_l, 5)
dicom_series_to_niftis_remote(series_path_dicom, samples_series_uid_l, series_path_nifti, vastai.send_file, "data/series", 4)

100%|█████████████████████████████████████████████████████████████| 5/5 [05:56<00:00, 71.21s/it]


In [19]:
# dicom_series_to_niftis_remote(series_path_dicom, series_uid_l, series_path_nifti, vastai.send_file, "data/series", 4)

Also send the csv with labels and demographics:

In [20]:
vastai.send_file(f"{os.environ['RSNA_IAD_DATA_DIR']}/train.csv", "data/")

CompletedProcess(args=['rsync', '-arz', '-v', '-y', '-e ssh -i /home/afonsomm/.ssh/id_rsa -p 11757', '/home/afonsomm/Desktop/data/rsna_iad/train.csv', 'root@79.160.189.79:/workspace/data/'], returncode=0, stdout=b'sending incremental file list\ntrain.csv\n\nsent 111,664 bytes  received 35 bytes  31,914.00 bytes/sec\ntotal size is 472,480  speedup is 4.23\n', stderr=b'')

In [21]:
from nbdev.export import nb_export

In [22]:
nb_export("2_dicom_to_nifti_remote.ipynb", "../lib")