
# DCDE 11/14/19 dry run

## Topics
  * prerequisites
  * recipes, templates
  * lessons learned
  * components we're using
  * participants
  * How accounts are created
  * Live demo


## Demo steps  

  - Prepare for demo
    - Log out all the things
    - Activate Globus endpoints
  - Demo begins
    - Log in to JupyterHub via CILogon
    - Talk about the single DCDE identity used across sites, and how the sites are set up for it
    - Talk about the 3 participating sites: DCDE set up w/ oauth_ssh, Globus, mix of Condor & Slurm
    - Load demo page
    - Introduce Relion: say we're using containers (Singularity) at each site
    - Show data set at bnl - say we've got it staged to other prerequisites
    - Talk about Parsl: we're leveraging it to run across a distributed, mixed environment
    - Run import(?) at BNL.
    - Sync data out to ORNL for (motioncorr or ctffind) -- something quick
    - Sync data back to BNL via Globus
    - Sync data out to ANL
    - Run autopick 3d refine(?) at ANL
    - Pull data back to BNL
    - Show pictures w/ nglview at BNL

*note: clean relion dataset is at gssh.lcrc.anl.gov:/home/dcowley/dcde-sc19-relion-data-clean.tgz.  There may be copies elsewhere...*

## List of steps/outputs

| Target site | Job step | Output type | Output treatment | Approx. time |
| --- | --- | --- | --- | --- |
| ORNL | autopick | | | |
| ORNL | extract | | | |
| ? | ctffind | | | |
| ? | autopick | | | |
| ANL | 3d refine? (view w/ NGL) | | | |

## Globus endpoints and data dirs


|  name |   Other  name   | UUID  | Directory |
|  --- | ---   | ---  | --- |
| ORNL DTN | `CADES-OR` |  `57230a10-7ba2-11e7-8c3b-22000b9923ef` | `/nfs/data/dcde-store/scratch/sc19-demo`  |
| EMSL DTN | `mscdtn.emsl.pnl.gov` |  `e133a52e-6d04-11e5-ba46-22000b92c6ec` | `/dtemp/mscfops/d3c724/sc19-demo` |
| BNL DTN | `globus02.sdcc.bnl.gov` |   `23f78cc8-41e0-11e9-a618-0a54e005f950` | `/hpcgpfs01/scratch/dcde1000006/sc19-data` |
| LCRC DTN | `bmgt3.lcrc.anl.gov` | `57b72e31-9f22-11e8-96e1-0a6d4e044368` |  `/blues/gpfs/home/dcowley/sc19-demo` |
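The UUIDs and directories in this table are repeated later as the `ANL_EP`/`BNL_EP`/... constants and as literal paths in every transfer cell, which makes it easy for one copy to drift. A minimal sketch of keeping them in one place, assuming the table above is the source of truth; `SITES` and `site_path()` are hypothetical names, and the values are copied straight from the table.

```python
import posixpath

# Hypothetical registry built from the "Globus endpoints and data dirs" table.
SITES = {
    "ORNL": {"name": "CADES-OR",
             "uuid": "57230a10-7ba2-11e7-8c3b-22000b9923ef",
             "dir": "/nfs/data/dcde-store/scratch/sc19-demo"},
    "EMSL": {"name": "mscdtn.emsl.pnl.gov",
             "uuid": "e133a52e-6d04-11e5-ba46-22000b92c6ec",
             "dir": "/dtemp/mscfops/d3c724/sc19-demo"},
    "BNL": {"name": "globus02.sdcc.bnl.gov",
            "uuid": "23f78cc8-41e0-11e9-a618-0a54e005f950",
            "dir": "/hpcgpfs01/scratch/dcde1000006/sc19-data"},
    "LCRC": {"name": "bmgt3.lcrc.anl.gov",  # ANL's LCRC data transfer node
             "uuid": "57b72e31-9f22-11e8-96e1-0a6d4e044368",
             "dir": "/blues/gpfs/home/dcowley/sc19-demo"},
}


def site_path(site, *parts):
    """Join path components onto a site's demo data directory.

    posixpath is used because all of these are remote POSIX paths,
    regardless of the OS the notebook runs on.
    """
    return posixpath.join(SITES[site]["dir"], *parts)
```

For example, `site_path("BNL", "Import", "job001")` gives the BNL import job directory, and `SITES["ORNL"]["uuid"]` could replace the `ORNL_EP` literal.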



###  Investigate (see sc19-screenply-cruft.md):

  * Did I run MotionCor on Cascade w/ GPGPUs? I think I did.
    * Is unblur in the singularity container?
  * Will relion_display work in any way?
  * Can we have some canned pictures?  Capture shots from X display or Chimera or something


In [None]:
import sys
print(sys.path)

**DCDE Demo Site Configs:**

In [1]:


import parsl
import os
from parsl.config import Config


from parsl.channels import OAuthSSHChannel
from parsl.providers import CondorProvider
from parsl.providers import SlurmProvider
from parsl.launchers import SrunLauncher
from parsl.executors import HighThroughputExecutor
from parsl.addresses import address_by_hostname
from parsl.app.app import bash_app
from parsl.app.app import python_app



anl_config = Config(
    app_cache=True,
    checkpoint_files=None,
    checkpoint_mode=None,
    checkpoint_period=None,
    data_management_max_threads=10,
    executors=[HighThroughputExecutor(
        address='130.199.185.13',
        cores_per_worker=1.0,
        heartbeat_period=30,
        heartbeat_threshold=120,
        interchange_port_range=(50000, 51000),
        label='anl-slurm',
        launch_cmd='process_worker_pool.py {debug} {max_workers} -p {prefetch_capacity} -c {cores_per_worker} -m {mem_per_worker} --poll {poll_period} --task_url={task_url} --result_url={result_url} --logdir={logdir} --block_id={{block_id}} --hb_period={heartbeat_period} --hb_threshold={heartbeat_threshold} ',
        managed=True,
        max_workers=1,
        #mem_per_worker=None,
        poll_period=10,
        prefetch_capacity=0,
        interchange_address='10.70.128.9', # address the workers use to reach the interchange (head node)
        provider=SlurmProvider(
            'debug',
            channel=OAuthSSHChannel(
                'gssh.lcrc.anl.gov',
                envs={},
                port=2222,
                script_dir='/home/dcowley/anl-parsl-scripts',
                username='dcowley'
            ),
            cmd_timeout=10,
            exclusive=True,
            init_blocks=1,
            # launcher=SingleNodeLauncher(),
            max_blocks=1,
            min_blocks=1,
            move_files=True,
            nodes_per_block=1,
            parallelism=0.0,
            scheduler_options='#SBATCH -A dcde\n#SBATCH -t 0:20:00\n#SBATCH -N 1\n#SBATCH --ntasks-per-node=36\n#SBATCH -J relion-autopick\n#SBATCH -p bdwall\n#SBATCH -D /blues/gpfs/home/dcowley/sc19-demo\n#SBATCH -o relion-autopick.%j.out\n#SBATCH -e relion-autopick.%j.err',
            walltime='00:10:00',
            #worker_init='source /home/dcde1000001/dcdesetup.sh'
            worker_init='source /lcrc/project/DCDE/setup.sh;  source activate /lcrc/project/DCDE/envs/dcdeRX; export I_MPI_FABRICS=shm:tmi'
        ),
        storage_access=[],
        suppress_failure=False,
        worker_debug=True,
        worker_logdir_root='/home/dcowley/parsl_scripts/logs',
        worker_port_range=(50000, 51000),
        #worker_ports=None,
        working_dir='/home/dcowley/parsl_scripts'
    )],
    lazy_errors=True,
    monitoring=None,
    retries=0,
    run_dir='runinfo',
    strategy='simple',
    usage_tracking=False
)

bnl_config = Config(
    app_cache=True,
    checkpoint_files=None,
    checkpoint_mode=None,
    checkpoint_period=None,
    data_management_max_threads=10,
    executors=[HighThroughputExecutor(
        #address='127.0.0.1',
        address='130.199.185.13',
        cores_per_worker=1,
        heartbeat_period=30,
        heartbeat_threshold=120,
        interchange_port_range=(50000, 51000),
        label='bnl-condor',
        launch_cmd='process_worker_pool.py {debug} {max_workers} -p {prefetch_capacity} -c {cores_per_worker} -m {mem_per_worker} --poll {poll_period} --task_url={task_url} --result_url={result_url} --logdir={logdir} --block_id={{block_id}} --hb_period={heartbeat_period} --hb_threshold={heartbeat_threshold} ',
        mem_per_worker=4,
        managed=True,
        max_workers=1,
        poll_period=10,
        prefetch_capacity=0,
        interchange_address='130.199.185.9', # address the workers use to reach the interchange (head node)
        provider=CondorProvider(
            channel=OAuthSSHChannel(
                'spce01.sdcc.bnl.gov',
                envs={},
                port=2222,
                script_dir='/sdcc/u/dcde1000006/parsl_scripts',
                username='dcde1000006'
            ),
            environment={},
            init_blocks=1,
            # launcher=SingleNodeLauncher(),
            max_blocks=1,
            min_blocks=1,
            nodes_per_block=1,
            #parallelism=1,
            parallelism=0,
            project='',
            #Trying this Requirements directive per Dong's instructions:
            #requirements='regexp("^sp[oa]", machine)',
            scheduler_options='accounting_group = group_sdcc.main \nRequirements = (regexp("^sp[oa]", machine))',
            transfer_input_files=[],
            walltime='00:30:00',
            #worker_init='source /hpcgpfs01/work/dcde/setup.sh; source activate dcdemaster20191008'
            worker_init='source /hpcgpfs01/work/dcde/setup.sh; source activate dcdeRX'
        ),
        storage_access=[],
        suppress_failure=False,
        worker_debug=True,
        worker_logdir_root='/sdcc/u/dcde1000006/parsl_scripts/logs',
        worker_port_range=(50000, 51000),
        #worker_port_range=(5000, 5100),   # per John H's message 8/29/19
        worker_ports=None,
        working_dir='/sdcc/u/dcde1000006/parsl_scripts'
    )],
    lazy_errors=True,
    monitoring=None,
    retries=0,
    run_dir='runinfo',
    strategy='simple',
    usage_tracking=False
)

ornl_config = Config(
    app_cache=True,
    checkpoint_files=None,
    checkpoint_mode=None,
    checkpoint_period=None,
    data_management_max_threads=10,
    executors=[HighThroughputExecutor(
        address='130.199.185.13',
        cores_per_worker=1.0,
        heartbeat_period=30,
        heartbeat_threshold=120,
        interchange_port_range=(50000, 51000),
        label='ornl-slurm',
        launch_cmd='process_worker_pool.py {debug} {max_workers} -p {prefetch_capacity} -c {cores_per_worker} -m {mem_per_worker} --poll {poll_period} --task_url={task_url} --result_url={result_url} --logdir={logdir} --block_id={{block_id}} --hb_period={heartbeat_period} --hb_threshold={heartbeat_threshold} ',
        managed=True,
        max_workers=1,
        #mem_per_worker=None,
        poll_period=10,
        prefetch_capacity=0,
        interchange_address='128.219.185.39', # address the workers use to reach the interchange (head node)
        provider=SlurmProvider(
            'debug',
            channel=OAuthSSHChannel(
                'dcde-ext.ornl.gov',
                envs={},
                port=2222,
                #script_dir='/home/dcde1000006/ornl-parsl-scripts',
                script_dir='/nfs/scratch/dcde1000006/ornl-parsl-scripts',
                username='dcde1000006'
            ),
            cmd_timeout=10,
            exclusive=True,
            init_blocks=1,
            # launcher=SingleNodeLauncher(),
            max_blocks=1,
            min_blocks=1,
            move_files=True,
            nodes_per_block=1,
            parallelism=0.0,
            scheduler_options='#SBATCH -D /nfs/scratch/sc19-demo\n#SBATCH -o relion-autopick.%j.out\n#SBATCH -e relion-autopick.%j.err',
            walltime='00:10:00',
            worker_init='source /nfs/scratch/dcde1000012/RX.sh'
        ),
        storage_access=[],
        suppress_failure=False,
        worker_debug=True,
        worker_logdir_root='/nfs/scratch/dcde1000006/parsl_scripts/logs',
        worker_port_range=(50000, 51000),
        #worker_ports=None,
        working_dir='/nfs/scratch/dcde1000006/parsl_scripts'
    )],
    lazy_errors=True,
    monitoring=None,
    retries=0,
    run_dir='runinfo',
    strategy='simple',
    usage_tracking=False
)

ANL_EP = '57b72e31-9f22-11e8-96e1-0a6d4e044368'
BNL_EP = '23f78cc8-41e0-11e9-a618-0a54e005f950'
EMSL_EP = 'e133a52e-6d04-11e5-ba46-22000b92c6ec'
ORNL_EP = '57230a10-7ba2-11e7-8c3b-22000b9923ef'
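The three configs above share most of their `HighThroughputExecutor` and `Config` settings verbatim and differ mainly in label, interchange address, working directory and provider. A sketch of factoring out the shared part as plain keyword dictionaries, so only the per-site differences stay in each config. `htex_kwargs()`, `LAUNCH_CMD` and `CONFIG_DEFAULTS` are hypothetical names; the parameter names are copied from the cells above and match the 2019-era Parsl they were written for. Newer Parsl releases may have removed or renamed some of them, so check against the installed version before reusing this.

```python
import posixpath

# launch_cmd copied verbatim from the three configs above.
LAUNCH_CMD = ('process_worker_pool.py {debug} {max_workers} -p {prefetch_capacity} '
              '-c {cores_per_worker} -m {mem_per_worker} --poll {poll_period} '
              '--task_url={task_url} --result_url={result_url} --logdir={logdir} '
              '--block_id={{block_id}} --hb_period={heartbeat_period} '
              '--hb_threshold={heartbeat_threshold} ')

# Config() settings that are identical in anl_config, bnl_config and ornl_config.
CONFIG_DEFAULTS = dict(
    app_cache=True,
    checkpoint_files=None,
    checkpoint_mode=None,
    checkpoint_period=None,
    data_management_max_threads=10,
    lazy_errors=True,
    monitoring=None,
    retries=0,
    run_dir='runinfo',
    strategy='simple',
    usage_tracking=False,
)


def htex_kwargs(label, interchange_address, working_dir, **overrides):
    """Return HighThroughputExecutor keyword arguments shared by all sites.

    Per-site values are passed in; anything in `overrides` (for example
    mem_per_worker=4 at BNL) replaces or extends the shared defaults.
    """
    kwargs = dict(
        address='130.199.185.13',            # notebook host, as in all three configs
        cores_per_worker=1,
        heartbeat_period=30,
        heartbeat_threshold=120,
        interchange_port_range=(50000, 51000),
        launch_cmd=LAUNCH_CMD,
        managed=True,
        max_workers=1,
        poll_period=10,
        prefetch_capacity=0,
        storage_access=[],
        suppress_failure=False,
        worker_debug=True,
        worker_port_range=(50000, 51000),
        label=label,
        interchange_address=interchange_address,
        working_dir=working_dir,
        # Every site above keeps worker logs in <working_dir>/logs.
        worker_logdir_root=posixpath.join(working_dir, 'logs'),
    )
    kwargs.update(overrides)
    return kwargs
```

With the same imports as the config cell, the BNL executor could then be written as `HighThroughputExecutor(provider=CondorProvider(...), **htex_kwargs('bnl-condor', '130.199.185.9', '/sdcc/u/dcde1000006/parsl_scripts', mem_per_worker=4))` and wrapped in `Config(executors=[...], **CONFIG_DEFAULTS)`.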


**Set up Globus Auth**

In [2]:
import subprocess
import sys   # needed for sys.exit() below; the earlier `import sys` cell may not have been run
import json
from globus_sdk import (NativeAppAuthClient, TransferClient,
                        RefreshTokenAuthorizer, TransferData)
from globus_sdk.exc import GlobusAPIError

authout = subprocess.run(['/usr/local/anaconda3/bin/parsl-globus-auth'], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
print (authout.stdout)
print (authout.stderr)
# Set up a Globus TransferClient for directory transfers, reusing the refresh tokens already obtained for Parsl.
# Note: these transfers go directly through the Globus SDK; they are NOT Parsl-managed transfers.


def load_tokens_from_file(filepath):
    """Load a set of saved tokens."""
    with open(filepath, 'r') as f:
        tokens = json.load(f)

    return tokens

def save_tokens_to_file(filepath, tokens):
    """Save a set of tokens for later use."""
    with open(filepath, 'w') as f:
        json.dump(tokens, f)


def update_tokens_file_on_refresh(token_response):
    """
    Callback function passed into the RefreshTokenAuthorizer.
    Will be invoked any time a new access token is fetched.
    """
    save_tokens_to_file(globus_tokens, token_response.by_resource_server)

dcde_parsl_client_id = '8b8060fd-610e-4a74-885e-1051c71ad473'

globus_tokens='/home/dcde1000006/.parsl/.globus.json'

# First authorize using those refresh tokens:

try:
    tokens = load_tokens_from_file(globus_tokens)

except (OSError, ValueError):  # token file missing or not valid JSON
    print("Valid refresh tokens not found in {}.  Unable to authorize to Globus.  Exiting!".format(globus_tokens))
    sys.exit(-1)


transfer_tokens = tokens['transfer.api.globus.org']

try:
    auth_client = NativeAppAuthClient(client_id=dcde_parsl_client_id)
except Exception:
    print ("ERROR: Globus NativeAppAuthClient() call failed!  Unable to obtain a Globus authorizer!")
    sys.exit(-1)

authorizer = RefreshTokenAuthorizer(
    transfer_tokens['refresh_token'],
    auth_client,
    access_token=transfer_tokens['access_token'],
    expires_at=transfer_tokens['expires_at_seconds'],
    on_refresh=update_tokens_file_on_refresh)

try:
    tc = TransferClient(authorizer=authorizer)
except Exception:
    print ("ERROR: TransferClient() call failed!  Unable to call the Globus transfer interface with the provided auth info!")
    sys.exit(-1)





b'Parsl Globus command-line authoriser\nIf authorisation to Globus is necessary, the library will prompt you now.\nOtherwise it will do nothing\nAuthorization complete\n'
b''
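The auth cell reads `tokens['transfer.api.globus.org']` and then the `access_token`, `refresh_token` and `expires_at_seconds` fields, and a failure there only surfaces as a `KeyError` or a generic "Exiting!" message. A minimal sketch of checking the token file up front, assuming it has the layout the cell reads (one entry per resource server, as written by `by_resource_server` in the refresh callback); `check_transfer_tokens()` is a hypothetical helper, not part of Parsl or the Globus SDK.

```python
import json
import time

REQUIRED_KEYS = ("access_token", "refresh_token", "expires_at_seconds")


def check_transfer_tokens(path, resource_server="transfer.api.globus.org", now=None):
    """Return (tokens, seconds_until_access_token_expiry) or raise ValueError.

    A negative remaining time is not fatal: RefreshTokenAuthorizer fetches a
    new access token as long as the refresh token is still valid.
    """
    try:
        with open(path) as f:
            all_tokens = json.load(f)
    except (OSError, ValueError) as exc:  # missing file or malformed JSON
        raise ValueError("cannot read token file {}: {}".format(path, exc)) from exc
    if not isinstance(all_tokens, dict) or resource_server not in all_tokens:
        raise ValueError("no tokens for {} in {}".format(resource_server, path))
    tokens = all_tokens[resource_server]
    missing = [key for key in REQUIRED_KEYS if key not in tokens]
    if missing:
        raise ValueError("token entry is missing: {}".format(", ".join(missing)))
    now = time.time() if now is None else now
    return tokens, tokens["expires_at_seconds"] - now
```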


Now we should have an authorized transfer client, so we can set up one or many transfers. Remember that each TransferData object has a single source/destination endpoint pair, and we build up its list of files/dirs with add_item().
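The transfer cells below all follow the same pattern: build a `TransferData`, add one recursive item, submit, then loop on `task_wait()`. `task_wait()` returns `True` once the task is no longer active, which includes failure, so printing "completed!" at that point can hide a failed transfer; `get_task(task_id)["status"]` distinguishes `SUCCEEDED` from `FAILED`. A sketch of the pattern as one function, assuming the `tc` client from the auth cell; `sync_directory()` is a hypothetical name, and the `transfer_data_cls` parameter exists only so the function can be exercised without Globus.

```python
def sync_directory(tc, src_ep, dst_ep, src_dir, dst_dir, label,
                   sync_level="mtime", timeout=1200, polling_interval=10,
                   transfer_data_cls=None):
    """Recursively sync src_dir to dst_dir with Globus and wait for the result.

    Returns (task_id, status), where status is the task's final Globus
    status string (e.g. "SUCCEEDED" or "FAILED").
    """
    if transfer_data_cls is None:
        from globus_sdk import TransferData as transfer_data_cls
    tdata = transfer_data_cls(tc, src_ep, dst_ep, label=label, sync_level=sync_level)
    tdata.add_item(src_dir, dst_dir, recursive=True)
    task_id = tc.submit_transfer(tdata)["task_id"]
    print("task_id =", task_id)
    # task_wait() returns False when `timeout` expires with the task still active.
    while not tc.task_wait(task_id, timeout=timeout, polling_interval=polling_interval):
        print(".", end="", flush=True)
    status = tc.get_task(task_id)["status"]
    print("\n{} finished with status {}".format(task_id, status))
    return task_id, status
```

The BNL-to-ORNL cell would then reduce to `sync_directory(tc, BNL_EP, ORNL_EP, '/hpcgpfs01/scratch/dcde1000006/sc19-data', '/nfs/data/dcde-store/scratch/sc19-demo', "DCDE Relion transfer BNL to ORNL")`.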

Now check that the Parsl configs are loaded for remote execution:

## The DCDE

(Talk about the DCDE environment: a single identity used across multiple sites. Compute and data at 3 sites: ANL, BNL, ORNL. Talk about the use case: observational science with computing requirements, specifically cryo-electron microscopy. We have a body of data and need to run various computing steps on it to go from raw data to 3D reconstructions of proteins.)

## The application & demonstration

(Talk about Relion, which processes the cryo-EM data. We will run the application in a Singularity container at multiple compute sites, using Globus to sync the data across sites. The workflow is controlled by the Parsl Python library, which we use to run distributed HPC-style jobs.)

**Demo step: Show data set at BNL**

In [None]:
!ls -lht /hpcgpfs01/scratch/dcde1000006/sc19-data

**Demo step: Run import at BNL**  *Note: the short BNL job I have right now is relion_postprocess*

In [3]:
parsl.clear()

#parsl.set_stream_logger()
parsl.load(bnl_config)
# Note: clear(), load(), dfk() are in DataFlowKernelLoader (dflow.py)
bnl_dfk = parsl.dfk()

@bash_app
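def relion_import(job_dir=None, stdout=None, stderr=None, mock=False):
    """
    Build the bash script that writes Import/job001/movies.star at BNL.

    Parameters
    ----------
    mock : bool
       When True, return a script that only prints the generated command
       text instead of executing it.
    """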
    cmd_line = '''#!/bin/bash -l

export DATAROOT=/hpcgpfs01/scratch/dcde1000006/sc19-data
export RELION_SIMG=/sdcc/u/dcde1000006/relion_singv26.simg

export MOVIESTAR=${{DATAROOT}}/Import/job001/movies.star
export INSTAR=${{DATAROOT}}/CtfFind/job003/micrographs_ctf.star
export REFSTAR=${{DATAROOT}}/Select/job007/class_averages.star
export PICKDIR=${{DATAROOT}}/AutoPick/job010/

cd ${{DATAROOT}}
echo -n "working directory: "
pwd
set -v

singularity exec  -B /hpcgpfs01:/hpcgpfs01 ${{RELION_SIMG}} relion_star_loopheader rlnMicrographMovieName > ${{MOVIESTAR}}
singularity exec  -B /hpcgpfs01:/hpcgpfs01 ${{RELION_SIMG}} ls Micrographs/*.mrcs >> ${{MOVIESTAR}}

echo "Wrote file ${{MOVISTAR}}:"; echo
cat ${{MOVIESTAR}}

    '''
    if mock:
        return '''tmp_file=$(mktemp);
cat<<EOF > $tmp_file
{}
EOF
cat $tmp_file
        '''.format(cmd_line)
    else:
        return cmd_line


relion_stdout=os.path.join(bnl_config.executors[0].working_dir, 'relion-bnl-import.out')
relion_stderr=os.path.join( bnl_config.executors[0].working_dir, 'relion-bnl-import.err')

local_logdir='/hpcgpfs01/scratch/dcde1000006/sc19-data/parsl-outputs'
local_logfile=os.path.join(local_logdir, 'relion-bnl-import.out')

# Remove stale logs from a previous run (missing files are fine).
for stale_file in (relion_stdout, relion_stderr, local_logfile):
    try:
        os.remove(stale_file)
    except OSError:
        pass


print ('job setup: stdout = {}\nstderr = {}'.format(relion_stdout,relion_stderr))
# parsl.set_stream_logger()

x = relion_import(job_dir=bnl_config.executors[0].working_dir, stdout=relion_stdout, stderr=relion_stderr, mock = False)
print('relion_import() invoked, now waiting...')
x.result()
print('relion_import() invoked has finished, output should print now:')

# x.result() blocks until the app finishes (and re-raises its exception if it failed),
# so the remote stdout file is complete here: pull it back and print it.
bnl_dfk.executors['bnl-condor'].provider.channel.pull_file(relion_stdout, local_logdir)
with open(local_logfile, 'r') as f:
    print(f.read())

# Try to shut down if we're done
if x.done():
    print('parsl done() call returned True.  Trying to shut down executor...')
    bnl_dfk.executors['bnl-condor'].shutdown()
else:
    print("Oops!  parsl done() call returned False!  For some reason we don't think we're done!")


job setup: stdout = /sdcc/u/dcde1000006/parsl_scripts/relion-bnl-import.out
stderr = /sdcc/u/dcde1000006/parsl_scripts/relion-bnl-import.err
relion_import() invoked, now waiting...
relion_import() invoked has finished, output should print now:
working directory: /hpcgpfs01/scratch/dcde1000006/sc19-data
Wrote file :

data_
loop_
_rlnMicrographMovieName
Micrographs/Falcon_2012_06_12-14_33_35_0_movie.mrcs
Micrographs/Falcon_2012_06_12-14_57_34_0_movie.mrcs
Micrographs/Falcon_2012_06_12-15_14_01_0_movie.mrcs
Micrographs/Falcon_2012_06_12-15_41_22_0_movie.mrcs
Micrographs/Falcon_2012_06_12-15_53_09_0_movie.mrcs
Micrographs/Falcon_2012_06_12-15_56_10_0_movie.mrcs
Micrographs/Falcon_2012_06_12-16_26_22_0_movie.mrcs
Micrographs/Falcon_2012_06_12-16_44_07_0_movie.mrcs
Micrographs/Falcon_2012_06_12-16_55_40_0_movie.mrcs
Micrographs/Falcon_2012_06_12-16_59_12_0_movie.mrcs
Micrographs/Falcon_2012_06_12-17_02_43_0_movie.mrcs
Micrographs/Falcon_2012_06_12-17_14_17_0_movie.mrcs
Micrographs/Falcon_201
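The `mock=True` branch in the apps above wraps the script in an unquoted heredoc (`<<EOF`). With an unquoted delimiter bash still expands `${VAR}` and `$(...)` inside the heredoc body, so the "mock" printout shows variables such as `${DATAROOT}` replaced by whatever they hold in the worker's environment (usually nothing, since the `export` lines are only printed, not run). Quoting the delimiter prints the script literally. A sketch, assuming, as the doubled `{{ }}` braces in these scripts indicate, that this Parsl version runs `str.format()` over the string a `bash_app` returns; `mock_wrap()` is a hypothetical helper.

```python
MOCK_DELIMITER = "RELION_MOCK_EOF"  # unlikely to appear as a line of its own in a script


def mock_wrap(cmd_line):
    """Return a bash snippet that prints cmd_line verbatim instead of running it.

    The quoted delimiter disables $VAR, ${VAR} and $(...) expansion inside the
    heredoc. The wrapper text contains no braces, so a later str.format() pass
    only touches the escaped {{ }} braces inside cmd_line itself.
    """
    return ("cat <<'" + MOCK_DELIMITER + "'\n"
            + cmd_line.rstrip("\n") + "\n"
            + MOCK_DELIMITER + "\n")
```

Inside `relion_import` (and the other apps) the mock branch would then be `if mock: return mock_wrap(cmd_line)`. Using `cat` directly on the heredoc also drops the temporary file the original branch creates and never removes.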

In [None]:
!ls -lt /hpcgpfs01/scratch/dcde1000006/sc19-data/Import/job001
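The import step writes `movies.star` as a one-column STAR loop (`data_`, `loop_`, `_rlnMicrographMovieName`, then one movie path per line), as printed in the job output above. Since that file is local to BNL, it can be checked directly from the notebook. A minimal sketch of reading such a loop; `read_star_loop()` is a hypothetical helper that handles only the simple unquoted, whitespace-separated layout shown here, not the full STAR format.

```python
def read_star_loop(lines):
    """Parse the first loop_ block of a simple STAR file.

    Returns (column_names, rows), where each row is a dict keyed by column
    name. Quoted values, multiple loops and key/value blocks are not handled.
    """
    columns, rows = [], []
    in_loop = False
    for raw in lines:
        line = raw.strip()
        if not line or line.startswith("#"):
            continue
        if line.startswith("data_") or line == "loop_":
            if in_loop:
                break                     # a second block or loop: stop here
            in_loop = line == "loop_"
            continue
        if not in_loop:
            continue                      # ignore anything before the first loop_
        if line.startswith("_"):
            if rows:
                break                     # a label after data rows starts something new
            columns.append(line.split()[0])   # drop an optional "#1" column index
        else:
            rows.append(dict(zip(columns, line.split())))
    return columns, rows
```

For example, opening `/hpcgpfs01/scratch/dcde1000006/sc19-data/Import/job001/movies.star` and passing the file object to `read_star_loop()` gives the number of imported movies as `len(rows)`.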

|  name |   Other  name   | UUID  | Directory |
|  --- | ---   | ---  | --- |
| ORNL DTN | `CADES-OR` |  `57230a10-7ba2-11e7-8c3b-22000b9923ef` | `/nfs/data/dcde-store/scratch/sc19-demo`  |
| EMSL DTN | `mscdtn.emsl.pnl.gov` |  `e133a52e-6d04-11e5-ba46-22000b92c6ec` | `/dtemp/mscfops/d3c724/sc19-demo` |
| BNL DTN | `globus02.sdcc.bnl.gov` |   `23f78cc8-41e0-11e9-a618-0a54e005f950` | `/hpcgpfs01/scratch/dcde1000006/sc19-data` |
| LCRC DTN | `bmgt3.lcrc.anl.gov` | `57b72e31-9f22-11e8-96e1-0a6d4e044368` |  `/blues/gpfs/home/dcowley/sc19-demo` |


**Demo step: Sync data from BNL to ORNL via Globus**

In [4]:
srcep = BNL_EP
destep = ORNL_EP
srcdir = '/hpcgpfs01/scratch/dcde1000006/sc19-data'
destdir =  '/nfs/data/dcde-store/scratch/sc19-demo'

xferlabel = "DCDE Relion transfer BNL to ORNL"

tdata = TransferData(tc, srcep, destep,
                     label=xferlabel,
                     sync_level="mtime")


tdata.add_item( srcdir, destdir, recursive = True)
    
transfer_result = tc.submit_transfer(tdata)

print("task_id =", transfer_result["task_id"])


while not tc.task_wait(transfer_result['task_id'], timeout=1200, polling_interval=10):
    print(".", end="")
print("\n{} completed!".format(transfer_result['task_id']))

# This won't work since directory is remote
#os.listdir(path=destdir)

task_id = 9ba2fac0-0989-11ea-be9a-02fcc9cdd752

9ba2fac0-0989-11ea-be9a-02fcc9cdd752 completed!
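The commented-out `os.listdir(path=destdir)` in the cell above cannot work because the destination lives on another site's filesystem. The Globus SDK's `TransferClient.operation_ls()` lists a directory on an endpoint through the Transfer service instead. A small sketch, assuming the `tc` client from the auth cell; `list_remote_dir()` is a hypothetical wrapper, and it relies on each listing entry carrying `type`, `name` and `size` fields.

```python
def list_remote_dir(tc, endpoint_id, path):
    """Return sorted (type, name, size) tuples for one directory on a Globus endpoint."""
    listing = tc.operation_ls(endpoint_id, path=path)
    return sorted((entry["type"], entry["name"], entry.get("size", 0))
                  for entry in listing)
```

For the ORNL copy, `for kind, name, size in list_remote_dir(tc, ORNL_EP, '/nfs/data/dcde-store/scratch/sc19-demo'): print(kind, name, size)` would show what arrived. Sorting by type first groups directories before files.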




**Demo step: Sync data from BNL to ANL via Globus**

In [5]:
srcep = BNL_EP
destep = ANL_EP
srcdir = '/hpcgpfs01/scratch/dcde1000006/sc19-data'
destdir =  '/blues/gpfs/home/dcowley/sc19-demo'

xferlabel = "DCDE Relion transfer BNL to ANL"



tdata = TransferData(tc, srcep, destep,
                     label=xferlabel,
                     sync_level="mtime")


tdata.add_item( srcdir, destdir, recursive = True)
    
transfer_result = tc.submit_transfer(tdata)

print("task_id =", transfer_result["task_id"])


while not tc.task_wait(transfer_result['task_id'], timeout=1200, polling_interval=10):
    print(".", end="")
print("\n{} completed!".format(transfer_result['task_id']))


task_id = 1b668f00-098b-11ea-be9a-02fcc9cdd752

1b668f00-098b-11ea-be9a-02fcc9cdd752 completed!
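The BNL-to-ORNL and BNL-to-ANL transfers above run one after the other, each cell blocking in `task_wait()` before the next is submitted. Because Globus runs the tasks on its own service, both could be submitted first and then waited on, so the two copies overlap. A sketch, assuming `TransferData` objects built exactly as in the two cells above and the same `tc`; `submit_then_wait()` is a hypothetical helper.

```python
def submit_then_wait(tc, transfers, timeout=1200, polling_interval=10):
    """Submit every prepared TransferData first, then wait for each task.

    Returns {task_id: final_status}. Submitting everything before waiting
    lets the Globus service run the transfers concurrently.
    """
    task_ids = [tc.submit_transfer(tdata)["task_id"] for tdata in transfers]
    statuses = {}
    for task_id in task_ids:
        while not tc.task_wait(task_id, timeout=timeout,
                               polling_interval=polling_interval):
            print(".", end="", flush=True)
        statuses[task_id] = tc.get_task(task_id)["status"]
    return statuses
```

With `tdata_ornl` and `tdata_anl` prepared as above (hypothetical variable names), `submit_then_wait(tc, [tdata_ornl, tdata_anl])` waits roughly as long as the slower of the two transfers rather than their sum.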


**Test step: run autopick at ANL**

In [6]:
parsl.clear()

parsl.load(anl_config)
anl_dfk = parsl.dfk()
#print(anl_dfk.executors)

@bash_app
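def relion_autopick(job_dir=None, stdout=None, stderr=None, mock=True):
    """
    Build the bash script that runs relion_autopick at ANL.

    Parameters
    ----------
    mock : bool
       When True, return a script that only prints the generated command
       text instead of executing it.
    """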
    cmd_line = '''#!/bin/bash -l

export I_MPI_FABRICS=shm:tmi

export DATAROOT=/blues/gpfs/home/dcowley/relion-bootstrap
export RELION_SIMG=/lcrc/project/DCDE/relion/relion_singv26.simg

export INSTAR=${{DATAROOT}}/CtfFind/job003/micrographs_ctf.star
export REFSTAR=${{DATAROOT}}/Select/job007/class_averages.star
export PICKDIR=${{DATAROOT}}/AutoPick/job010/

echo -n "working directory: "
pwd
module load singularity/2.6.0
set -v

singularity exec  -B /blues/gpfs/home:/blues/gpfs/home ${{RELION_SIMG}} relion_autopick --i ${{INSTAR}} --ref ${{REFSTAR}} --odir ${{PICKDIR}} --pickname autopick --invert  --ctf  --ang 5 --shrink 0 --lowpass 20 --particle_diameter 200 --threshold 0.4 --min_distance 110 --max_stddev_noise 1.1
    '''
    if mock:
        return '''tmp_file=$(mktemp);
cat<<EOF > $tmp_file
{}
EOF
cat $tmp_file
        '''.format(cmd_line)
    else:
        return cmd_line

relion_stdout=os.path.join(anl_config.executors[0].working_dir, 'relion-anl-autopick.out')
relion_stderr=os.path.join( anl_config.executors[0].working_dir, 'relion-anl-autopick.err')

#local_logdir='/blues/gpfs/home/dcowley/sc19-data/parsl-outputs'
# This is local to BNL!
local_logdir= '/hpcgpfs01/scratch/dcde1000006/sc19-data/parsl-outputs'

local_logfile=os.path.join(local_logdir, 'relion-anl-autopick.out')

# Remove stale logs from a previous run (missing files are fine).
for stale_file in (relion_stdout, relion_stderr, local_logfile):
    try:
        os.remove(stale_file)
    except OSError:
        pass

print ('job setup: stdout = {}\nstderr = {}'.format(relion_stdout,relion_stderr))
# parsl.set_stream_logger()
# Call Relion and wait for results

x = relion_autopick(stdout=relion_stdout, stderr=relion_stderr, mock = False)
print('relion_autopick() invoked, now waiting...')
x.result()
print('relion_autopick() returned, output should print below:')

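# x.result() blocks until the app finishes (and re-raises its exception if it failed),
# so the remote stdout file is complete here: pull it back to BNL and print it.
anl_dfk.executors['anl-slurm'].provider.channel.pull_file(relion_stdout, local_logdir)
with open(local_logfile, 'r') as f:
    print(f.read())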


job setup: stdout = /home/dcowley/parsl_scripts/relion-anl-autopick.out
stderr = /home/dcowley/parsl_scripts/relion-anl-autopick.err
relion_autopick() invoked, now waiting...
relion_autopick() returned, output should print below:
working directory: /blues/gpfs/home/dcowley/sc19-demo
 + Using (micrograph) pixel size from input STAR file of 3.54 Angstroms
 + Run autopicking on the following micrographs: 
    * MotionCorr/job002/Micrographs/Falcon_2012_06_12-14_33_35_0.mrc
    * MotionCorr/job002/Micrographs/Falcon_2012_06_12-14_57_34_0.mrc
    * MotionCorr/job002/Micrographs/Falcon_2012_06_12-15_14_01_0.mrc
    * MotionCorr/job002/Micrographs/Falcon_2012_06_12-15_41_22_0.mrc
    * MotionCorr/job002/Micrographs/Falcon_2012_06_12-15_53_09_0.mrc
    * MotionCorr/job002/Micrographs/Falcon_2012_06_12-15_56_10_0.mrc
    * MotionCorr/job002/Micrographs/Falcon_2012_06_12-16_26_22_0.mrc
    * MotionCorr/job002/Micrographs/Falcon_2012_06_12-16_44_07_0.mrc
    * MotionCorr/job002/Micrographs/Falcon
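The BNL, ANL and ORNL cells repeat the same sequence: remove stale logs, call the app, block on `result()`, pull the remote stdout file back with the provider channel's `pull_file(remote_path, local_dir)`, and print it. A sketch consolidating that sequence, using only the calls the cells already make; `run_remote_step()` is a hypothetical helper, and it assumes, as the cells do when they build `local_logfile`, that `pull_file` stores the file in `local_dir` under its original basename.

```python
import os


def run_remote_step(app, channel, remote_stdout, remote_stderr, local_logdir, **app_kwargs):
    """Run one bash_app step and return the text of its stdout file."""
    local_log = os.path.join(local_logdir, os.path.basename(remote_stdout))
    # As in the cells above, these removals act on the notebook host's filesystem.
    for stale in (remote_stdout, remote_stderr, local_log):
        try:
            os.remove(stale)
        except OSError:
            pass  # nothing left over from a previous run
    future = app(stdout=remote_stdout, stderr=remote_stderr, **app_kwargs)
    future.result()  # blocks until the app finishes; re-raises if it failed
    channel.pull_file(remote_stdout, local_logdir)
    with open(local_log) as f:
        return f.read()
```

The ANL step above would become `print(run_remote_step(relion_autopick, anl_dfk.executors['anl-slurm'].provider.channel, relion_stdout, relion_stderr, local_logdir, mock=False))`.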

In [None]:
parsl.set_stream_logger()
parsl.wait_for_current_tasks()
parsl.clear()

In [None]:
if x.done():
    parsl.clear()

**Demo step: Run extract & autopick steps at ORNL**

In [None]:
parsl.clear()

#parsl.set_stream_logger()
parsl.load(ornl_config)
ornl_dfk = parsl.dfk()

@bash_app
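def relion_autopick_extract_ornl(job_dir=None, stdout=None, stderr=None, mock=True):
    """
    Build the bash script that runs relion_autopick and then particle
    extraction (relion_preprocess) at ORNL.

    Parameters
    ----------
    mock : bool
       When True, return a script that only prints the generated command
       text instead of executing it.
    """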
    cmd_line = '''#!/bin/bash -l

export DATAROOT=/nfs/data/dcde-store/scratch/sc19-demo
export RELION_SIMG=/nfs/sw/relion/relion_singv26.simg

export INSTAR=${{DATAROOT}}/CtfFind/job003/micrographs_ctf.star
export REFSTAR=${{DATAROOT}}/Select/job007/class_averages.star
export PICKDIR=${{DATAROOT}}/AutoPick/job010/
export PARTSTAR=${{DATAROOT}}/Extract/job011/particles.star
export PARTDIR=${{DATAROOT}}/job011/

echo -n "working directory: "
pwd
set -v

# output directory /nfs/data/dcde-store/scratch/sc19-demo/parsl-outputs

singularity exec -B ${{DATAROOT}}:${{DATAROOT}} ${{RELION_SIMG}} relion_autopick --i ${{INSTAR}} --ref ${{REFSTAR}} --odir ${{PICKDIR}} --pickname autopick --invert  --ctf  --ang 5 --shrink 0 --lowpass 20 --particle_diameter 200 --threshold 0.4 --min_distance 110 --max_stddev_noise 1.1 # --gpu "0"

echo ${{INSTAR}} > AutoPick/job010/coords_suffix_autopick.star

singularity exec -B ${{DATAROOT}}:${{DATAROOT}}  ${{RELION_SIMG}} relion_preprocess --i ${{INSTAR}} --coord_dir ${{PICKDIR}} --coord_suffix _autopick.star --part_star ${{PARTSTAR}} --part_dir ${{PARTDIR}} --extract --extract_size 100 --norm --bg_radius 30 --white_dust -1 --black_dust -1 --invert_contrast

    '''
    if mock:
        return '''tmp_file=$(mktemp);
cat<<EOF > $tmp_file
{}
EOF
cat $tmp_file
        '''.format(cmd_line)
    else:
        return cmd_line



relion_stdout=os.path.join(ornl_config.executors[0].working_dir, 'relion-ornl-extract.out')
relion_stderr=os.path.join(ornl_config.executors[0].working_dir, 'relion-ornl-extract.err')

# This is local to BNL (where the notebook runs), as in the ANL step above.
local_logdir='/hpcgpfs01/scratch/dcde1000006/sc19-data/parsl-outputs'
local_logfile=os.path.join(local_logdir, 'relion-ornl-extract.out')

# Remove stale logs from a previous run (missing files are fine).
for stale_file in (relion_stdout, relion_stderr, local_logfile):
    try:
        os.remove(stale_file)
    except OSError:
        pass


print ('job setup: \nstdout = {}\nstderr = {}'.format(relion_stdout,relion_stderr))
# parsl.set_stream_logger()
# Call Relion and wait for results

x = relion_autopick_extract_ornl(stdout=relion_stdout, stderr=relion_stderr, mock = True)
print('relion_autopick_extract_ornl() invoked, now waiting...')
x.result()
print('relion_autopick_extract_ornl() returned, should print output below:')

# The stdout file lives on the ORNL side, so pull it back to BNL before reading it.
ornl_dfk.executors['ornl-slurm'].provider.channel.pull_file(relion_stdout, local_logdir)
with open(local_logfile, 'r') as f:
    print(f.read())


job setup: 
stdout = /nfs/scratch/dcde1000006/parsl_scripts/relion-ornl-extract.out
stderr = /nfs/scratch/dcde1000006/parsl_scripts/relion-ornl-extract.err
relion_autopick_extract_ornl() invoked, now waiting...


In [None]:
exit()

**Demo step: Sync new data back from ORNL to BNL**

In [None]:
srcep = ORNL_EP
destep = BNL_EP
srcdir =  '/nfs/data/dcde-store/scratch/sc19-demo'
destdir = '/hpcgpfs01/scratch/dcde1000006/sc19-data'

xferlabel = "DCDE Relion transfer ORNL to BNL"

tdata = TransferData(tc, srcep, destep,
                     label=xferlabel,
                     sync_level="mtime")


tdata.add_item( srcdir, destdir, recursive = True)
    
transfer_result = tc.submit_transfer(tdata)

print("task_id =", transfer_result["task_id"])


while not tc.task_wait(transfer_result['task_id'], timeout=1200, polling_interval=10):
    print(".", end="")
print("\n{} completed!".format(transfer_result['task_id']))

In [None]:
exit()

**Demo step: Sync new data back from ORNL to BNL, sync BNL to ANL**

**Demo step: (Skipping ahead) run 3d refinement (parallel?) at ANL**

**Demo step: Sync new data back from ANL to BNL**

**Demo step: Show 3d reconstruction in nglview**

# End of Demo

## Move me: Sync fresh data set from ANL to BNL

In [None]:

tdata = TransferData(tc, ANL_EP, BNL_EP,
                     label="DCDE Relion transfer",
                     sync_level="size")

tdata.add_item('/blues/gpfs/home/dcowley/dcde-sc19-relion-data-clean.tgz',
            '/hpcgpfs01/scratch/dcde1000006/dcde-sc19-relion-data-clean.tgz')

transfer_result = tc.submit_transfer(tdata)

print("task_id =", transfer_result["task_id"])


while not tc.task_wait(transfer_result['task_id'], timeout=1200, polling_interval=10):
    print(".", end="")
print("\n{} completed!".format(transfer_result['task_id']))

os.listdir(path='/hpcgpfs01/scratch/dcde1000006/')

In [None]:
!ls -l /hpcgpfs01/scratch/dcde1000006/
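The clean data set arrives at BNL as a single `.tgz` (see the note near the top about `dcde-sc19-relion-data-clean.tgz`). If it is meant to be unpacked there to reset the demo directory, which the notebook does not show, a cautious approach is to refuse archive members whose paths would land outside the target directory. A sketch under that assumption; `safe_extract()` is a hypothetical helper, its check covers member names only (not symlink targets), and recent Python versions also offer `extractall(filter="data")` for similar protection.

```python
import os
import tarfile


def safe_extract(archive, dest):
    """Extract a .tgz into dest and return its sorted top-level entry names.

    Raises ValueError before extracting anything if a member name would
    resolve outside dest (e.g. absolute paths or "../" components).
    """
    dest = os.path.realpath(dest)
    with tarfile.open(archive, "r:gz") as tar:
        members = tar.getmembers()
        for member in members:
            target = os.path.realpath(os.path.join(dest, member.name))
            if os.path.commonpath([dest, target]) != dest:
                raise ValueError("unsafe path in archive: " + member.name)
        tar.extractall(dest, members=members)
    return sorted({m.name.split("/")[0] for m in members})
```

Listing the top-level names first is a cheap way to confirm the archive unpacks into the expected directory before pointing the demo steps at it.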