In [27]:
import SimpleITK as sitk
import nibabel as nib
import os
import time
import numpy as np
import sys
import warnings
import multiprocessing as mp
import ctypes
from vprof import runner

warnings.filterwarnings('ignore')
sys.path.append("/notebooks/DeepNeuroAN/")
from deepneuroan.data_generator import DataGenerator

In [7]:
# Root folder of the generated training volumes and path stem of the template.
data_dir = "/scratch/ltetrel/neuromod/derivatives/deepneuroan/training/generated_data/"
template_filepath = os.path.join(data_dir, "template_on_grid")

# Collect every sample stem (path without extension) that has a companion ".txt" file.
# The extension is stripped from the file name only, so a "." in a directory name
# cannot truncate the path.
list_files_tmp = set()
for root, _, files in os.walk(data_dir):
    for file in files:
        filepath = os.path.join(root, file.split('.')[0])
        if os.path.exists(filepath + ".txt"):
            list_files_tmp.add(filepath)
# A set of strings has no stable iteration order between interpreter runs; sort it so
# the list handed to the generators (seed=0) is identical on every run.
list_files = sorted(list_files_tmp)

bs = 128  # batch size shared by the generators and the profiling functions
r = 1     # number of repetitions used by the profiling functions
params_gen = dict(list_files=list_files, template_file=template_filepath, batch_size=bs, seed=0)

train_gen = DataGenerator(partition="train", **params_gen)
valid_gen = DataGenerator(partition="valid", **params_gen)
test_gen = DataGenerator(partition="test", **params_gen)

NameError: name 'os' is not defined

In [None]:
def init_worker(s_imgs_):
    """Pool initializer: publish the shared-array handle as a module global.

    Each worker process keeps the handle in the global ``s_imgs`` so that
    ``compute_pool_s`` can rebuild a NumPy view from it. The handle is the
    tuple produced by ``create_shared_array``: (shared buffer, shape, dtype).
    """
    global s_imgs
    s_imgs = s_imgs_
    
    
def shared_to_numpy(shared_arr, shape, npdtype):
    """Return a NumPy view of a shared memory buffer, without copying.

    Args:
        shared_arr: object exposing the buffer protocol (e.g. ``mp.RawArray``).
        shape: target shape of the returned array.
        npdtype: anything accepted by ``np.dtype``.

    Returns:
        An array of the given dtype and shape backed by ``shared_arr``; writes
        to the array are writes to the underlying buffer.
    """
    dtype = np.dtype(npdtype)
    # np.prod instead of the np.product alias, which no longer exists in NumPy 2.x.
    count = int(np.prod(shape))
    return np.frombuffer(shared_arr, dtype=dtype, count=count).reshape(shape)

def create_shared_array(shape, npdtype):
    """Allocate a process-shareable array and a NumPy view onto it.

    Returns:
        ``(handle, view)`` where ``handle`` is the tuple
        ``(shared buffer, shape, npdtype)`` that can be unpacked straight into
        ``shared_to_numpy``, and ``view`` is a NumPy array over the same buffer.

    The buffer is an ``mp.RawArray`` created from an element count, which the
    multiprocessing documentation describes as initially zeroed.
    """
    n_items = int(np.prod(shape))
    # Translate the NumPy dtype into the matching ctypes type for RawArray.
    raw = mp.RawArray(np.ctypeslib.as_ctypes_type(npdtype), n_items)
    view = shared_to_numpy(raw, shape, npdtype)
    handle = (raw, shape, npdtype)
    return handle, view

def compute_mult(i, imgs):
    """Read sample ``i`` as a float32 volume and store it in ``imgs[i]``.

    ``list_files`` holds path stems without extension, so the ".nii.gz"
    suffix is appended before reading (as the sitk/nibabel benchmark cell does).
    """
    img = sitk.GetArrayFromImage(sitk.ReadImage(list_files[i] + ".nii.gz", sitk.sitkFloat32))
    imgs[i,] = img

def compute_pool_s(i):
    """Pool task: read sample ``i`` and write it into the shared array.

    The shared-array handle is taken from the global ``s_imgs`` set by
    ``init_worker``. ``list_files`` holds path stems without extension, so the
    ".nii.gz" suffix is appended before reading.
    """
    imgs = shared_to_numpy(*s_imgs)
    img = sitk.GetArrayFromImage(sitk.ReadImage(list_files[i] + ".nii.gz", sitk.sitkFloat32))
    imgs[i,] = img
    
def compute_process_s(i, s_imgs):
    """Process target: read sample ``i`` and write it into the shared array.

    Args:
        i: index into ``list_files`` and into the first axis of the shared array.
        s_imgs: handle tuple that is unpacked into ``shared_to_numpy``.

    ``list_files`` holds path stems without extension, so the ".nii.gz"
    suffix is appended before reading.
    """
    imgs = shared_to_numpy(*s_imgs)
    img = sitk.GetArrayFromImage(sitk.ReadImage(list_files[i] + ".nii.gz", sitk.sitkFloat32))
    imgs[i,] = img
    
def compute_process_q(queue, i):
    """Process target: read sample ``i`` and put the array on ``queue``.

    ``list_files`` holds path stems without extension, so the ".nii.gz"
    suffix is appended before reading.
    """
    img = sitk.GetArrayFromImage(sitk.ReadImage(list_files[i] + ".nii.gz", sitk.sitkFloat32))
    queue.put(img)

In [None]:
def profile_serial():
    """Time loading ``bs`` samples one after another, repeated ``r`` times.

    Every repetition allocates a fresh float32 batch buffer, fills it through
    ``compute_mult`` and prints the batch mean. The total wall time and the
    time per sample are printed at the end.
    """
    volume_shape = (220, 220, 220)
    started = time.time()
    for _ in range(r):
        print("starting serial")
        imgs = np.zeros((bs,) + volume_shape, dtype=np.float32)

        for sample_idx in range(bs):
            compute_mult(sample_idx, imgs)
        print(np.mean(imgs))
        print("finishing serial")

    elapsed = time.time() - started
    print("*** Total %1.3f s ***"%(elapsed))
    print("%1.4f s per sample"%(elapsed/(r*bs)))

# https://gist.github.com/rossant/7a46c18601a2577ac527f958dd4e452f   
# https://stackoverflow.com/questions/7894791/use-numpy-array-in-shared-memory-for-multiprocessing
# https://calcul.math.cnrs.fr/attachments/spip/Documents/Ecoles/2013/python/Multiprocessing.pdf
def profile_pool_s():
    """Time loading ``bs`` samples with an ``mp.Pool`` writing into shared memory.

    Repeated ``r`` times. Each repetition creates a new shared array, starts a
    pool with one worker per CPU (the handle is handed to workers through
    ``init_worker``) and submits one ``compute_pool_s`` task per sample.
    The batch mean is printed before and after loading, then the total wall
    time and time per sample.

    Raises:
        Any exception raised inside a worker task is re-raised here, so a
        failed read cannot pass unnoticed as a (fast) successful run.
    """
    tic = time.time()
    for _ in range(r):
        print("starting shared mp.Pool()")
        shape = (bs, 220, 220, 220,)
        npdtype = np.float32
        s_imgs, imgs = create_shared_array(shape, npdtype)
        print(np.mean(imgs))

        # The context manager releases the worker processes even if a task fails.
        with mp.Pool(mp.cpu_count(), initializer=init_worker, initargs=(s_imgs,)) as pool:
            pending = [pool.apply_async(compute_pool_s, args=(i,)) for i in range(bs)]
            # get() blocks until the task is done and re-raises worker exceptions.
            for result in pending:
                result.get()
        print(np.mean(imgs))
        print("finishing shared mp.Pool()")

    ElpsTime = time.time() - tic
    print("*** Total %1.3f s ***"%(ElpsTime))
    print("%1.4f s per sample"%(ElpsTime/(r*bs)))
    
def profile_process_s():
    """Time loading ``bs`` samples with ``mp.Process`` workers and shared memory.

    Repeated ``r`` times. Each repetition creates a new shared array and loads
    the samples in batches of at most ``mp.cpu_count()`` processes, one process
    per sample index, waiting for a batch to finish before starting the next.
    The batch mean is printed before and after loading, then the total wall
    time and time per sample.

    Raises:
        RuntimeError: if any worker process exits with a non-zero exit code.
    """
    # Needs to try shared memory with mp.Process()
    # https://gist.github.com/fginter/c6206f244d164bd9d4df

    tic = time.time()
    for _ in range(r):
        print("starting mp.Process()")
        shape = (bs, 220, 220, 220,)
        npdtype = np.float32
        s_imgs, imgs = create_shared_array(shape, npdtype)
        print(np.mean(imgs))
        lim_cores = mp.cpu_count()

        # Step through the sample indices in chunks of lim_cores; the last chunk
        # may be shorter so that no sample is left out when bs % lim_cores != 0.
        for start in range(0, bs, lim_cores):
            stop = min(start + lim_cores, bs)
            # Fresh list per chunk, one process per distinct sample index.
            processes = [mp.Process(target=compute_process_s, args=(i, s_imgs,))
                         for i in range(start, stop)]
            for process in processes:
                process.start()

            for process in processes:
                process.join()

            failed = [start + k for k, process in enumerate(processes) if process.exitcode != 0]
            if failed:
                raise RuntimeError("worker process failed for sample index(es) %s" % failed)

        print(np.mean(imgs))
        print("finishing mp.Process()")

    ElpsTime = time.time() - tic
    print("*** Total %1.3f s ***"%(ElpsTime))
    print("%1.4f s per sample"%(ElpsTime/(r*bs)))
    
def profile_process_queue():
    """Time loading ``bs`` samples with one ``mp.Process`` per sample and a queue.

    Repeated ``r`` times. Every worker runs ``compute_process_q`` and pushes
    its array onto a shared ``mp.Queue``; the parent copies the arrays into
    the batch buffer in the order they arrive. The batch mean is printed
    before and after loading, then the total wall time and time per sample.
    """
    started = time.time()
    for _ in range(r):
        print("starting queued mp.Process()")
        imgs = np.zeros((bs, 220, 220, 220), dtype=np.float32)
        queue = mp.Queue()
        workers = []
        for idx in range(bs):
            workers.append(mp.Process(target=compute_process_q, args=(queue, idx,)))
        print(np.mean(imgs))

        for worker in workers:
            worker.start()

        # Take exactly bs results off the queue before joining the workers.
        received = 0
        while received < bs:
            imgs[received,] = queue.get()
            received += 1

        for worker in workers:
            worker.join()
        print(np.mean(imgs))
        print("finishing queued mp.Process()")

    elapsed = time.time() - started
    print("*** Total %1.3f s ***"%(elapsed))
    print("%1.4f s per sample"%(elapsed/(r*bs)))

In [28]:
# Number of CPUs reported by multiprocessing on this machine; the profiling
# functions use the same call to size the pool and the process batches.
mp.cpu_count()

80

In [None]:
# Run the shared-memory mp.Process() variant under vprof, targeting localhost:8000.
# NOTE(review): 'cmhp' is presumably the set of vprof modes to collect
# (flame graph, memory, heatmap, profiler) — confirm against the vprof docs.
# The commented calls below run each variant directly, without the profiler.
runner.run(profile_process_s, 'cmhp', host='localhost', port=8000)
# profile_serial()
# profile_process_queue()
# profile_pool_s()
# profile_process_s()


In [None]:
r = 10    # number of repetitions, also the number of files kept for the benchmark
bs = 128  # batch size
data_dir = "/notebooks/neuromod/derivatives/deepneuroan/training/generated_data"
template_filepath = os.path.join(data_dir, "template_on_grid")

# Collect every sample stem (path without extension) that has a companion ".txt" file.
# The extension is stripped from the file name only, so a "." in a directory name
# cannot truncate the path.
list_files_tmp = set()
for root, _, files in os.walk(data_dir):
    for file in files:
        filepath = os.path.join(root, file.split('.')[0])
        if os.path.exists(filepath + ".txt"):
            list_files_tmp.add(filepath)
# Sort before slicing: a set of strings has no stable order between interpreter
# runs, so an unsorted [:r] would benchmark a different subset each time.
list_files = sorted(list_files_tmp)
list_files = list_files[:r]

def normalize_img(img):
    """Return ``img`` shifted to zero mean and divided by its standard deviation.

    Mean and standard deviation are taken over all elements of ``img``.
    """
    centered = img - np.mean(img)
    spread = np.std(img)
    return centered / spread

# Batch-sized buffer; only the first len(list_files) slots are written below.
big_img = np.zeros((bs, 220, 220, 220))

# Each timed section reads every file of list_files once per repetition, so the
# per-sample time is the total divided by this count (not by r*bs).
n_reads = max(r * len(list_files), 1)

# simple itk
tic = time.time()
for idx in range(r):
    print(idx)
    for i, file in enumerate(list_files):
        img = sitk.GetArrayFromImage(sitk.ReadImage(file + ".nii.gz", sitk.sitkFloat32))
        big_img[i,] = normalize_img(img)
ElpsTime = time.time() - tic
print("*** Total sitk %1.2f s ***"%(ElpsTime))
print("%1.4f s per sample"%(ElpsTime/n_reads))

# nibabel
tic = time.time()
for idx in range(r):
    print(idx)
    for i, file in enumerate(list_files):
        # get_fdata() replaces the deprecated get_data(); float32 matches the
        # pixel type requested from SimpleITK above.
        img = nib.load(file + ".nii.gz").get_fdata(dtype=np.float32)
        big_img[i,] = normalize_img(img)
ElpsTime = time.time() - tic
print("*** Total nibabel %1.2f s ***"%(ElpsTime))
print("%1.4f s per sample"%(ElpsTime/n_reads))

In [None]:
def profile_data(train_gen, bs):
    """Time fetching the first ``r`` batches from ``train_gen``.

    Args:
        train_gen: object indexable by batch number.
        bs: batch size, used only to report the time per sample.

    Prints each batch index as it is fetched, then the total wall time and
    the time per sample.
    """
    started = time.time()
    for batch_idx in range(r):
        print(batch_idx)
        # The batch is fetched only for its cost; the result is discarded.
        train_gen[batch_idx]
    elapsed = time.time() - started
    print("*** Total %1.3f s ***"%(elapsed))
    print("%1.4f s per sample"%(elapsed/(r*bs)))