## Asynchronous parallel processing of FVS keyfiles

In [1]:
import os
import glob
import subprocess
import ipyparallel as ipp
import shutil
from tqdm import tqdm_notebook
import time

A function to execute FVS that will be mapped to all keyfiles.

In [2]:
def run_fvs(keyfile):
    r"""Run FVS on one keyfile and delete the resulting ``.trl`` file.

    The executable is picked from the first five characters of the keyfile's
    file name, so ``fvsOP_stand12_rx1_off0.key`` is run with
    ``C:\FVSbin\fvsOP.exe``. The exit code of FVS is not inspected.

    This function is meant to be shipped to ipyparallel engines, so it relies
    only on ``os`` and ``subprocess`` being importable there.

    Parameters
    ----------
    keyfile : str
        Path to the FVS keyword file to process.

    Returns
    -------
    str
        ``keyfile``, unchanged, so the caller can tell which run finished.
    """
    base_dir, key_name = os.path.split(keyfile)
    fvs_exe = 'C:\\FVSbin\\' + key_name[:5] + '.exe'
    subprocess.call([fvs_exe, '--keywordfile=' + keyfile])  # run fvs

    # everything before the first '.' in the file name
    base_name = key_name.split('.')[0]

    # clean-up the outputs
    # move the .out and .key file
#     path = os.path.join(base_dir, 'organon_tests_completed','keyfiles')
#     if not os.path.exists(path): 
#         os.makedirs(path)
#     shutil.move(keyfile, os.path.join(base_dir,'organon_tests_completed','keyfiles'))
#     path = os.path.join(base_dir, 'organon_tests_completed','outfiles')
#     if not os.path.exists(path):
#         os.makedirs(path)
#     shutil.move(os.path.join(base_dir,base_name+'.out'), os.path.join(base_dir,'organon_tests_completed','outfiles'))

    # delete the other files
    trl_file = os.path.join(base_dir, base_name + '.trl')
    try:
        os.remove(trl_file)
    except FileNotFoundError:
        # Best-effort clean-up: FVS may not have written a .trl file (failed
        # run), or another run of the same keyfile may already have removed
        # it. Neither case should make the whole task fail.
        pass
    return keyfile

Run the following command in a command prompt to start up a cluster of workers:

`>> activate my_env # or other environment name`

`(my_env)>> ipcluster start -n 4 # or other number of cores`

In [3]:
# create a hub to control the workers
# `c` is the client handle from which the views further down are built;
# this needs the cluster started with `ipcluster start` to be running.
c = ipp.Client()
# Show the ids of the engines the client can see (one per started worker).
c.ids

[0, 1, 2, 3]

In [4]:
# if you want to run a single keyfile, use this
# subprocess.call(['C:\\FVSbin\\FVSop.exe', '--keywordfile=C:\\GitHub\\FSC_Case_Studies\\keyfiles_to_run\\fvsOP_stand12_rx1_off0.key'])

# as another example, for serial (not parallel) processing of some keyfiles without cleaning up output files:
# for keyfile in to_run:
#     run_fvs(keyfile)
    #subprocess.call(['C:\\FVSbin\\FVSpn.exe', '--keywordfile='+keyfile])

Create a direct view of the workers and a load-balanced view for submitting jobs

In [4]:
# Two views onto the same set of engines:
#   dv - addresses every engine (c[:]); used here to run the imports everywhere
#   v  - load-balanced; used later to submit the FVS runs
dv = c[:] # direct view
v = c.load_balanced_view() # load-balanced view

# import packages to all workers
# run_fvs uses `os` and `subprocess` (and `shutil` in its commented-out
# clean-up), so these modules must be available on the engines too.
with dv.sync_imports():
    import subprocess
    import shutil
    import os

importing subprocess on engine(s)
importing shutil on engine(s)
importing os on engine(s)


Execute an asynchronous batch of FVS runs for all the keyfiles

In [17]:
# Collect every *.key file in the 'keyfiles_to_run' folder (resolved against
# the current working directory) and report how many were found.
run_dir = os.path.abspath('keyfiles_to_run')
key_pattern = os.path.join(run_dir, '*.key')
to_run = glob.glob(key_pattern)
n_keyfiles = len(to_run)
print('{:,}'.format(n_keyfiles), 'keyfiles found.')

12 keyfiles found.


In [None]:
# Optional: submit every keyfile N_REPEATS times (a larger batch, e.g. for timing).
# Duplicates are dropped first (keeping the original order), so re-running
# this cell gives the same list instead of growing it tenfold each time.
N_REPEATS = 10
to_run = list(dict.fromkeys(to_run)) * N_REPEATS

In [18]:
# start asynchronous batch with load-balanced view
res = v.map_async(run_fvs, to_run)
print('Started batch processing.')

# Poll the async result twice a second and advance the bar by the number of
# runs that finished since the previous poll.
runs_done = res.progress
with tqdm_notebook(total=len(res), initial=runs_done, desc='FVS Run Progress', unit='keyfile') as pbar:
    while not res.ready():
        new_progress = res.progress - runs_done
        runs_done += new_progress
        pbar.update(new_progress)
        time.sleep(0.5)
    # The loop exits as soon as the batch is ready, so runs that finished
    # during the last sleep have not been counted yet; add them here so the
    # bar ends at its true final value.
    new_progress = res.progress - runs_done
    runs_done += new_progress
    pbar.update(new_progress)

Started batch processing.


HBox(children=(IntProgress(value=0, description='FVS Run Progress', max=12), HTML(value='')))






In [None]:
def timefmt(seconds):
    """Format a duration in seconds as hours, minutes and seconds.

    The duration is rounded to the nearest tenth of a second *before* it is
    split up, so the seconds field can never be displayed as ``60.0``
    (e.g. 59.96 s is reported as 1 minute, 0.0 seconds).

    Parameters
    ----------
    seconds : int or float
        Duration in seconds.

    Returns
    -------
    str
        Text of the form ``'H hours, M minutes, S.S seconds'``.
    """
    # round first so that carrying into minutes/hours happens correctly
    seconds = round(seconds, 1)
    m, s = divmod(seconds, 60)
    h, m = divmod(m, 60)
    return '{:.0f} hours, {:.0f} minutes, {:.1f} seconds'.format(h, m, s)

In [None]:
# Summarise the finished batch: elapsed (wall-clock) time, the summed time of
# the individual runs, their ratio, and both averaged over the completed runs.
wall_seconds = res.wall_time
serial_seconds = res.serial_time
n_runs = res.progress

print('Human time spent:', timefmt(wall_seconds))
print('Computer time spent:', timefmt(serial_seconds))
print('Async speedup:', '{:.2f}x'.format(serial_seconds / wall_seconds))
print('Human time per FVS run:', '{:.2f} seconds'.format(wall_seconds / n_runs))
print('Computer time per FVS run:', '{:.2f} seconds'.format(serial_seconds / n_runs))

In [None]:
# To check whether the full set of jobs has completed (returns True/False),
# uncomment:
# res.ready()

# Cancels the batch (wait for fvs executables to complete)
# NOTE(review): presumably only runs that have not started yet are cancelled,
# while FVS processes already running on a worker finish normally - confirm
# against the ipyparallel documentation.
res.abort()

In [None]:
# shut down the parallel workers
# c.shutdown(hub=True)