In [None]:
# Load IPython's autoreload extension. Mode 2 reloads imported modules before
# each cell is executed, so edits to the helper packages are picked up without
# restarting the kernel.
%load_ext autoreload
%autoreload 2

In [None]:
import sys


def add_to_syspath(path: str):
    """Append ``path`` to ``sys.path`` unless an equal entry is already present.

    Re-running the cell therefore does not pile up duplicate entries.
    """
    if path not in sys.path:
        sys.path.append(path)


# Register project-local directories (absolute paths on the author's machine).
# slurm tools
add_to_syspath('/disk/soft/bia_software/slurm_tools')
# personal notebook / python helper directories
# (presumably where `bia`, `imageio` and `batch_tool` are found -- TODO confirm)
add_to_syspath('/home/matthias/jupyter')
add_to_syspath('/home/matthias/jupyter/bia')
add_to_syspath('/home/matthias/python')

In [None]:
import os
import glob


In [None]:
from bia.tools import visualization
import imageio
from batch_tool import batchjob, batchjob_helper



In [None]:

def write_out_file(out_file, data, affine):
    """Write ``data`` to ``out_file`` via ``imageio.write_nifti``.

    The folder part of ``out_file`` is created first when it does not exist.

    Args:
        out_file: Destination path. A bare file name (no directory part) is
            accepted and written relative to the current working directory.
        data: Image data, passed unchanged to ``imageio.write_nifti``.
        affine: Affine, passed unchanged to ``imageio.write_nifti``.

    Note:
        ``imageio`` must provide ``write_nifti`` -- presumably a project-local
        module rather than the PyPI package of the same name (TODO confirm).
    """
    out_path = os.path.dirname(out_file)
    # A bare file name has an empty directory part; os.makedirs('') would
    # raise, so the folder handling is skipped in that case.
    if out_path:
        if os.path.exists(out_path):
            print('path exists')
        else:
            print('create path: ', out_path)
            # exist_ok: a concurrent process may create the folder between
            # the check above and this call.
            os.makedirs(out_path, exist_ok=True)

    # Header fields handed to the writer.
    header = {"cal_min": 0, "cal_max": 1}

    imageio.write_nifti(out_file, data, affine=affine, header=header, overwrite=True)

def read_in_file(filename):
    """Load ``filename`` through ``imageio.read_nifti``.

    Returns:
        tuple: ``(img, affine, header)`` as unpacked from the reader's result.
    """
    image, affine_matrix, nifti_header = imageio.read_nifti(filename)
    return (image, affine_matrix, nifti_header)


def postprocess(img):
    """Placeholder for the per-image post-processing step.

    Not implemented yet: ``img`` is ignored and ``None`` is returned.
    """
    return None


In [None]:

from random import randint
from time import sleep


# Parameter dict handed to every ``bj.process_files`` call in this notebook.
# Later cells set 'verbose' and 'cnt'; the worker functions additionally read
# 'batch_id' and 'd_shared', which are never set in this notebook -- presumably
# injected by batchjob (TODO confirm).
param = {}


@batchjob.suppress_output
def process_file(in_file, out_file, param, lock):
    """Demo batch worker: log the call, then fail or succeed at random.

    Args:
        in_file: Input path; only printed here.
        out_file: Output path; unused while the real processing is disabled.
        param: Parameter dict; ``param['batch_id']`` is read for the log lines.
        lock: Context manager held while the first log line is printed.

    Returns:
        str: A log-style line ending in a random integer from 0 to 3.

    Raises:
        Exception: ``'test: batch failed'`` when the random draw is 0.
    """
    line = 'process file (PID={0}/BID={1}): {2}'

    with lock:
        print(line.format(os.getpid(), param['batch_id'], 'lock aquired'))
    print(line.format(os.getpid(), param['batch_id'], in_file))

    # Real processing, currently disabled:
    # img, affine, header = read_in_file(in_file)
    # out_img_post = postprocess(img)
    # write_out_file(out_file, out_img_post, affine)

    # test exceptions
    coin = randint(0, 1)
    if coin == 0:
        raise Exception('test: batch failed')
    #sleep(randint(0,3))
    return line.format(os.getpid(), param['batch_id'], randint(0, 3))


In [None]:

# nifti converted
data_folder = '/disk/matthias/hokto/hokto_data/DATA'
# Alternative pattern (every NIfTI file). Kept commented out because it was
# immediately overwritten by the assignment below and never took effect.
#pattern = "**/*.nii.gz"
pattern = "**/out/*_model3D.nii.gz"

# The output root is the same folder as the input root.
out_folder = data_folder

extension = '.nii.gz'
out_extension = '.nii'
fstub = '_post'
subpath = ''
#subpath = 'out'


# How extension / fstub / subpath / out_extension shape the output names is
# defined by batchjob_helper.get_filenames (not visible in this notebook).
in_files, out_files = batchjob_helper.get_filenames(
    data_folder, pattern, out_folder, extension, fstub, subpath, out_extension)

batchjob_helper.print_first(in_files, out_files)


In [None]:
# Create the batch runner and push every (in_file, out_file) pair through
# process_file. process_file raises at random by design, so failed batches
# are expected in this demo.
bj = batchjob()
param['verbose'] = False
# NOTE(review): timethis presumably reports the run time of the wrapped call -- confirm.
@batchjob.timethis
def run():
    """Run ``process_file`` over all discovered files with the shared ``param``."""
    bj.process_files(process_file, in_files, out_files, param)
run()

In [None]:

# Show the outcome of the preceding process_files run (format defined by batchjob).
bj.print_result()


In [None]:

from functools import reduce

# example: calculates the overall length of the string output
def process_result(results: list):
    """Sum ``len()`` over all entries of ``results`` that are not ``None``.

    Args:
        results: Per-batch return values. ``None`` entries are skipped
            (presumably they mark failed batches -- TODO confirm).

    Returns:
        int: Total length of the remaining entries; ``0`` for an empty list.
    """
    # Identity test instead of ``!= None``: ``!=`` dispatches to the entry's
    # own ``__ne__``, which may give a wrong or non-bool answer (e.g. for
    # array-like results).
    return sum(len(entry) for entry in results if entry is not None)


# Aggregate via the batch runner, passing the reducer as callback ...
r1 = bj.process_result(fn=process_result)

# ... and by fetching the raw result list and reducing it directly.
r2 = process_result(bj.get_result())

# NOTE(review): presumably both values are equal -- confirm against batchjob.process_result.
print(r1, r2)

In [None]:

def group_batches_by_folder(infiles, outfiles):
    """Group the file lists into per-folder batches.

    Returns one ``[inputs, outputs]`` pair per item yielded by
    ``batchjob_helper.get_batch_per_folder``; the first element of each
    yielded item is dropped.
    """
    batches = []
    for _, folder_inputs, folder_outputs in batchjob_helper.get_batch_per_folder(infiles, outfiles):
        batches.append([folder_inputs, folder_outputs])
    return batches


In [None]:
param['verbose'] = True

# Use the helper's debug batch function instead of process_file for these runs.
process_file_batch = batchjob_helper.debug_process_file_batch

# Grouped run: group_batches_by_folder bundles the files into per-folder batches
bj.process_files(process_file_batch, in_files, out_files, param,
                 max_workers=4, group_batches=group_batches_by_folder)
# Ungrouped run: no group_batches callback is passed (in -> out)
bj.process_files(process_file_batch, in_files, out_files, param, max_workers=4)


In [None]:
import multiprocessing

def all_pids(value):
    """Pool task: wait one second, then report the PID of the executing process.

    ``value`` is only the work item handed in by the caller; it is not used.
    """
    sleep(1)
    worker_pid = os.getpid()
    return worker_pid


@batchjob.suppress_output
def process_file_with_parallel(in_file, out_file, param, lock):
    """Demo worker: run a nested process pool and bump a shared counter.

    Args:
        in_file: Unused in this demo.
        out_file: Unused in this demo.
        param: Must contain ``'d_shared'``, a dict-like object shared between
            workers (presumably supplied by batchjob -- TODO confirm).
        lock: Context manager guarding access to ``d_shared``.

    Returns:
        list: ``[pid, counter value after this worker's increment, pool pids]``.
    """
    # The context manager shuts the pool down once the map has finished; the
    # original never closed it and leaked its worker processes on every call.
    with multiprocessing.Pool() as pool_obj:
        # get pids from pool
        answer = pool_obj.map(all_pids, range(0, 5))
    print(answer)

    # use shared memory
    d_shared: dict = param['d_shared']
    # use lock to access shared memory
    with lock:
        if d_shared.get('cnt') is None:
            d_shared['cnt'] = 1
        else:
            d_shared['cnt'] += 1
        # Read back while still holding the lock so the reported value is the
        # one produced by this worker's own increment.
        cnt = d_shared['cnt']

    return [os.getpid(), cnt, answer]

param['verbose'] = False
# NOTE(review): the worker increments param['d_shared']['cnt'], not
# param['cnt']; nothing in this notebook updates this key, so the print below
# presumably always shows 0 -- confirm which counter was meant.
param['cnt'] = 0

# max_workers=32; every process_file_with_parallel call additionally opens its
# own multiprocessing.Pool().
bj.process_files(process_file_with_parallel, in_files,
                 out_files, param, max_workers=32)

bj.print_result()

print(param['cnt'])

bj.print_rusage()
