In [1]:
import datetime
import json
import os
import shutil
import time

from base64 import b64decode

from IPython import display
from tqdm.auto import tqdm
import nbclient.exceptions

import papermill as pm

In [2]:
task_prefix = "papermill_tomopy_{}"

# write results to $SCRATCH
curdir = os.getcwd()
scratchdir = os.path.expandvars("$SCRATCH")
# expandvars leaves the text untouched when the variable is unset; fail clearly
# instead of trying to create a directory literally named "$SCRATCH/..."
if scratchdir == "$SCRATCH":
    raise RuntimeError("The SCRATCH environment variable is not set; cannot choose a working directory")

# one output directory per session, stamped with the start time
sessiondir = os.path.join(scratchdir, task_prefix.format(datetime.datetime.now().isoformat()))
os.makedirs(sessiondir, exist_ok=True)

# copy our input data directory to $SCRATCH for execution
src = '/global/cfs/cdirs/als/users/parkinson/SLS_Feb2019/disk1/RTV_18A_air_760torr_08_fast'
inputdir = os.path.join(scratchdir, "RTV_18A_air_760torr_08_fast")

# copy the data only once; an existing inputdir is reused as-is
# NOTE(review): a copy interrupted by a previous run would also be reused — delete it manually if suspect
if not os.path.exists(inputdir):
    shutil.copytree(src, inputdir)

# set the parameters we want to pass to the notebook
params = dict(
    filename="RTV_18A_air_760torr_08_fast.h5",
    inputPath=inputdir,
    outputPath=sessiondir,
    chunk_proj=1,
    chunk_sino=1,
    ncore=1,
    filetype='sls'
)

# this is the notebook we will be running through papermill, which calls the reconstruction code
in_nb = os.path.join(curdir, 'tomopy_recon_template.ipynb')

In [3]:
# connect to our Dask cluster running inside of a Cori job, or a local Dask cluster if no job
from dask.distributed import Client, as_completed

localcluster = False
if os.path.exists("dask_client"):
    try:
        # the "dask_client" file contains the path of the running cluster's scheduler file
        with open("dask_client", 'r') as f:
            scheduler_file = f.read().strip()

        dask_client = Client(scheduler_file=scheduler_file)
    except Exception as e:
        print("Unable to use existing dask_client file to connect to a Dask cluster!")
        print(e)
        localcluster = True
else:
    localcluster = True

if localcluster:
    # No Dask cluster present, we will start a local cluster
    from dask.distributed import LocalCluster

    cluster = LocalCluster(n_workers=4, threads_per_worker=2)
    # attach to the cluster just created; a bare Client() would start a second, separate cluster
    dask_client = Client(cluster)

dask_client

0,1
Client  Scheduler: tcp://10.128.0.25:8786  Dashboard: /user/mhenders/cori-shared-node-cpu/proxy/8787/status,Cluster  Workers: 32  Cores: 64  Memory: 0 B


In [4]:
# process all chunks
# timepoints range from 0 to 105
first_timepoint = 0
last_timepoint = 10
submits = []
# future -> the exact parameter dict it was submitted with, so a failed task
# can be resubmitted for the right timepoint
future_params = {}

# save the input parameters out to a file with the output notebooks and data
input_params_file = os.path.join(sessiondir, task_prefix.format("input_parameters.json"))
with open(input_params_file, 'w') as f:
    json.dump(params, f, indent=4)

# submit all papermill tasks to Dask
num_points = last_timepoint - first_timepoint
with tqdm(total=num_points, desc="Tasks submitted", unit="task") as submits_pbar:
    for timepoint in range(first_timepoint, last_timepoint):
        # each task gets its own copy: mutating one shared dict while earlier
        # submissions may not yet be serialized would race
        task_params = dict(params, timepoint=timepoint)
        out_nb = os.path.join(sessiondir, '{}.ipynb'.format(task_prefix.format(timepoint)))

        # pure=False: executing a notebook has side effects, so never let Dask
        # deduplicate it against an existing (possibly errored) future
        future = dask_client.submit(
            pm.execute_notebook,
            in_nb,
            out_nb,
            task_params,
            start_timeout=60,
            progress_bar=False,
            pure=False)
        submits.append(future)
        future_params[future] = task_params
        submits_pbar.update(1)
        # brief pause between submissions — presumably to stagger kernel start-up; tune as needed
        time.sleep(0.25)

print("{}: {} tasks submitted, {} timepoints".format(params['filename'], len(submits), num_points))

last_exc = None
completed = []
failed = []
# wait for all the tasks to complete; failures are queued (with their own params) for resubmission
with tqdm(total=len(submits), desc="Tasks completed", unit="task") as completed_pbar:
    for future in as_completed(submits):
        try:
            completed.append(future.result())
        except nbclient.exceptions.DeadKernelError as e:
            last_exc = e
            failed.append(future_params[future])
        except Exception as e:
            print("timepoint {} failed: {}".format(future_params[future]['timepoint'], e))
            last_exc = e
            failed.append(future_params[future])
        finally:
            completed_pbar.update(1)

HBox(children=(FloatProgress(value=0.0, description='Tasks submitted', max=10.0, style=ProgressStyle(descripti…


RTV_18A_air_760torr_08_fast.h5: 10 tasks submitted, 10 timepoints


HBox(children=(FloatProgress(value=0.0, description='Tasks completed', max=10.0, style=ProgressStyle(descripti…




In [5]:
# cleanup any remaining tasks, if present: resubmit failures until they succeed
# or the retry budget is exhausted
max_retries = 3  # maximum number of resubmission rounds

retry_round = 0
resubmits = []
while failed and retry_round < max_retries:
    retry_round += 1
    display.clear_output(wait=True)

    # start every round fresh so futures and params stay paired
    resubmits = []
    resubmit_params = {}
    with tqdm(total=len(failed), desc="Tasks resubmitted", unit="task") as resubmits_pbar:
        for task_params in failed:
            out_nb = os.path.join(sessiondir, '{}.ipynb'.format(task_prefix.format(task_params['timepoint'])))

            # pure=False forces a new task key; otherwise Dask can hand back the
            # already-errored future for identical arguments
            future = dask_client.submit(
                pm.execute_notebook, in_nb, out_nb, task_params,
                start_timeout=60, progress_bar=False, pure=False)
            resubmits.append(future)
            resubmit_params[future] = task_params
            resubmits_pbar.update(1)

    still_failed = []
    with tqdm(total=len(resubmits), desc="Cleanup Tasks completed", unit="task") as cleanup_pbar:
        for future in as_completed(resubmits):
            try:
                completed.append(future.result())
            except Exception as e:
                last_exc = e
                still_failed.append(resubmit_params[future])
            finally:
                cleanup_pbar.update(1)

    failed = still_failed

if failed:
    print("{} timepoint(s) still failing after {} retry round(s): {}".format(
        len(failed), retry_round, sorted(p['timepoint'] for p in failed)))

In [6]:
# Pick the reconstruction directory to preview. The chooser starts in the
# session directory and is pre-pointed at rec/<dataset name without extension>.
from ipypathchooser import PathChooser

dataset_stem = os.path.splitext(params['filename'])[0]
slices_path = PathChooser(default_directory=sessiondir)
slices_path.chosen_path = os.path.join(sessiondir, "rec", dataset_stem)

In [7]:
# Preview the reconstructed slices in the directory chosen above.
from ipysliceviewer import SliceViewer

s = SliceViewer(default_directory=slices_path.chosen_path)
display.display(s)

SliceViewer(children=(PathChooser(current_directory='/global/cscratch1/sd/mhenders/papermill_tomopy_2020-11-03…

In [8]:
# Disconnect from Dask. If this notebook started its own LocalCluster
# (`cluster`, created in the connection cell), shut it down as well so its
# worker processes do not outlive the session.
if localcluster:
    dask_client.shutdown()
    dask_client.close()
    cluster.close()
else:
    dask_client.close()