# Dials V3 funcX Flow


In [None]:
import os
import sys
import time
import json
import shortuuid

from funcx.sdk.client import FuncXClient
from fair_research_login import NativeClient
from globus_automate_client import (create_flows_client, graphviz_format, 
                                    state_colors_for_log, create_action_client, 
                                    create_flows_client)
from globus_automate_client.token_management import CLIENT_ID

# Authenticate

Authenticate with the funcX and Globus Automate clients.

Note: You will still need to grant access to the flow later on in this notebook.

In [None]:
# funcX client used below to register the containers and functions.
fxc = FuncXClient()

In [None]:
# Globus Automate client used below to deploy and run the flow.
flows_client = create_flows_client(CLIENT_ID)

Get an auth token for HTTPS downloads from the petrel#globuslabs endpoint.

In [None]:
# Native-app login requesting the HTTPS scope of the petrel#globuslabs endpoint.
client = NativeClient(client_id='7414f0b4-7d05-4bb6-bb00-076fa3f17cf5')
tokens = client.login(requested_scopes=['https://auth.globus.org/scopes/56ceac29-e98a-440a-a594-b41e7a084b62/all'])
auth_token = tokens["petrel_https_server"]['access_token']
# Forwarded to the funcX download functions through the flow input ("headers").
headers = {'Authorization': f'Bearer {auth_token}'}
# Never print the bearer token itself: notebook output is saved with the file
# and easily shared, which would leak a live credential.
print('Obtained HTTPS access token for petrel_https_server')

# Test Setup
## Edit settings in this section only!

In [None]:
## Location of the containers inside the resource
test_container_path = "/home/rvescovi/.funcx/containers/test.simg"
dials_container_path = "/home/rvescovi/.funcx/containers/dials_v3.simg"

# Per-resource settings, copied into the flow input further down:
#   endpoint       -> "funcx_ep": runs the DIALS steps (create phil, stills, plot)
#   local_endpoint -> "funcx_local_ep": runs the download and unpack steps
#   data_dir       -> where the dataset is downloaded and unpacked
#   proc_dir       -> parent of the per-run processing directory

# funcX endpoint configuration for Theta
theta_conf = {'endpoint': '8f2f2eab-90d2-45ba-a771-b96e6d530cad',
              'local_endpoint': '8f2f2eab-90d2-45ba-a771-b96e6d530cad',
              'data_dir': '/projects/APSDataAnalysis/Braid/data/SSX',
              'proc_dir': '/projects/APSDataAnalysis/Braid/process/'}

# funcX endpoint configuration for Midway
# NOTE(review): data_dir is the same /projects path used on Theta -- confirm it
# exists on Midway.
midway_conf = {'endpoint': 'd86e31f7-71b2-4e42-8a54-7bc9d5e79df9',
               'local_endpoint': 'de6a8104-e53e-4dbd-82f1-2e9a09462a31',
               'data_dir': '/projects/APSDataAnalysis/Braid/data/SSX',
               'proc_dir': '/project2/chard/ryan/Braid/process/'}

## Choose which resource to run.
conf = theta_conf

# funcX setup
Register the containers and functions for the flow.

In [None]:
# Register the Singularity images with funcX; the returned container ids are
# passed as container_uuid when registering the functions below.
# An initial container to use (download, unpack and phil-creation functions)
init_cont_id = fxc.register_container(location=test_container_path, container_type='singularity')
# And the container we will download during the flow: dials_container_path is
# also the "container_path" that the DownloadContainer step writes to.
dials_cont_id = fxc.register_container(location=dials_container_path, container_type='singularity')

In [None]:
def download_cont(data):
    """Download the container image to the endpoint unless it is already there.

    Parameters
    ----------
    data : dict
        Flow input. Reads ``server_url`` and ``container_name`` (joined to form
        the download URL), ``container_path`` (destination on the endpoint) and
        ``headers`` (HTTP headers, e.g. the bearer token; only needed when a
        download actually happens).

    Returns
    -------
    str
        ``'done'`` once the container file exists, or
        ``"No container found on server"`` when ``server_url`` or
        ``container_name`` is missing or empty.

    Raises
    ------
    requests.HTTPError
        If the server answers with an error status.
    """
    # Imports stay inside the function so it is self-contained when it runs
    # on the funcX endpoint.
    import os

    server_url = data.get('server_url', "")
    container_name = data.get('container_name', "")
    # Both parts of the URL are required.
    if not (server_url and container_name):
        return "No container found on server"

    container_path = data['container_path']
    if os.path.isfile(container_path):
        return 'done'

    # Only needed when we actually have to download something.
    import requests

    container_url = os.path.join(server_url, container_name)
    # Stream into a temporary name, then rename: a multi-GB image is never held
    # in memory in one piece, and an interrupted download is not mistaken for a
    # complete file by the isfile() check on the next run.
    partial_path = container_path + '.part'
    with requests.get(container_url, headers=data['headers'], stream=True) as r:
        r.raise_for_status()
        with open(partial_path, 'wb') as fp:
            for chunk in r.iter_content(chunk_size=1024 * 1024):
                fp.write(chunk)
    os.replace(partial_path, container_path)
    return 'done'

# Runs inside the initial container; the id reaches the flow as "download_cont_fxid".
download_cont_fxid = fxc.register_function(download_cont, container_uuid=init_cont_id)

In [None]:
def download_data(data):
    """Download the dataset archive into ``data_dir`` unless it is already there.

    Parameters
    ----------
    data : dict
        Flow input. Reads ``data_dir`` (created if missing), ``dataset_url``
        (the archive is saved under the URL's last path component) and
        ``headers`` (HTTP headers, e.g. the bearer token; only needed when a
        download actually happens).

    Returns
    -------
    str
        ``'done'`` once the archive is present in ``data_dir``.

    Raises
    ------
    requests.HTTPError
        If the server answers with an error status.
    """
    # Imports stay inside the function so it is self-contained when it runs
    # on the funcX endpoint.
    import os

    data_dir = data['data_dir']
    dataset_url = data['dataset_url']

    # exist_ok: no check-then-create race if another task makes it first.
    os.makedirs(data_dir, exist_ok=True)

    dataset_name = dataset_url.split("/")[-1]
    dataset_path = os.path.join(data_dir, dataset_name)
    if os.path.isfile(dataset_path):
        return 'done'

    # Only needed when we actually have to download something.
    import requests

    # Stream into a temporary name, then rename: the archive is never held in
    # memory in one piece, and an interrupted download is not mistaken for a
    # complete file by the isfile() check on the next run.
    partial_path = dataset_path + '.part'
    with requests.get(dataset_url, headers=data['headers'], stream=True) as r:
        r.raise_for_status()
        with open(partial_path, 'wb') as fp:
            for chunk in r.iter_content(chunk_size=1024 * 1024):
                fp.write(chunk)
    os.replace(partial_path, dataset_path)
    return 'done'

# Runs inside the initial container; the id reaches the flow as "download_data_fxid".
download_data_fxid = fxc.register_function(download_data, container_uuid=init_cont_id)

In [None]:
def unwrap_data(data):
    """Extract the downloaded dataset archive into a folder inside ``data_dir``.

    Parameters
    ----------
    data : dict
        Flow input. Reads ``data_dir`` and ``dataset_url``; the archive is
        expected at ``data_dir/<last path component of dataset_url>``.

    Returns
    -------
    str
        Path of the extraction folder (the archive name up to its first dot,
        e.g. ``levin_29.tar.xz`` -> ``<data_dir>/levin_29``), or
        ``'file does not exist!'`` when the archive is missing.
    """
    import os
    import tarfile

    data_dir = data['data_dir']
    dataset_name = data['dataset_url'].split("/")[-1]
    dataset_path = os.path.join(data_dir, dataset_name)

    # Check first so a missing archive does not leave an empty folder behind.
    if not os.path.isfile(dataset_path):
        return 'file does not exist!'

    folder_name = dataset_name.split(".")[0]
    folder_path = os.path.join(data_dir, folder_name)
    # exist_ok keeps reruns on the same endpoint from failing with FileExistsError.
    os.makedirs(folder_path, exist_ok=True)

    # NOTE(review): extractall trusts member paths inside the archive. That is
    # acceptable for our own server's data; for untrusted archives use
    # tarfile's extraction filters (Python 3.12+).
    with tarfile.open(dataset_path) as archive:
        archive.extractall(folder_path)
    return folder_path
  
# Runs inside the initial container; the id reaches the flow as "unwrap_data_fxid".
unwrap_data_fxid = fxc.register_function(unwrap_data, container_uuid=init_cont_id)

In [None]:
def funcx_create_phil(data):
    """Create a DIALS ``.phil`` parameter file for one run and return its path.

    The run number is the second-to-last ``_``-separated field of
    ``input_files`` (e.g. ``.../Levin_29_{00001..00050}.cbf`` -> ``29``).
    Experiment settings come from ``beamline_run<run>.json`` next to the input
    files: ``user_input.unit_cell`` (unless ``data['unit_cell']`` is given),
    ``user_input.space_group`` and ``beamline_input.det_distance``.

    Parameters
    ----------
    data : dict
        Flow input. Uses ``proc_dir``, ``input_files``, ``nproc``, ``beamx``,
        ``beamy`` and optionally ``suffix``, ``unit_cell`` and ``mask`` (file
        name relative to the data directory, default ``mask.pickle``).

    Returns
    -------
    str
        Path of the written phil file, ``<proc_dir>/process_<run>[_<suffix>].phil``.

    Raises
    ------
    RuntimeError
        If the beamline JSON is missing, unreadable or lacks a required key.
    """
    import json
    import os
    from string import Template

    proc_dir = data['proc_dir']
    data_dir = os.path.split(data['input_files'])[0]
    run_num = data['input_files'].split("_")[-2]

    if 'suffix' in data:
        phil_name = f"{proc_dir}/process_{run_num}_{data['suffix']}.phil"
    else:
        phil_name = f"{proc_dir}/process_{run_num}.phil"

    unit_cell = data.get('unit_cell', None)

    ## opening existing files
    beamline_json = os.path.join(data_dir, f"beamline_run{run_num}.json")
    mask = os.path.join(data_dir, data.get('mask', 'mask.pickle'))

    # Every value read here is required by the template below, so fail with a
    # clear message instead of an unbound-variable error later on.
    try:
        with open(beamline_json, 'r') as fp:
            beamline_data = json.load(fp)

        if not unit_cell:
            unit_cell = beamline_data['user_input']['unit_cell']

        space_group = beamline_data['user_input']['space_group']
        # Negated because it is used as the z component of the detector origin.
        det_distance = float(beamline_data['beamline_input']['det_distance']) * -1.0
    except (OSError, KeyError, TypeError, ValueError) as err:
        raise RuntimeError(
            f"Could not read beamline settings from {beamline_json}: {err!r}") from err

    # Commas in the unit cell are written to the phil file as spaces.
    unit_cell = unit_cell.replace(",", " ")

    template_data = {'det_distance': det_distance,
                     'unit_cell': unit_cell,
                     'nproc': data['nproc'],
                     'space_group': space_group,
                     'beamx': data['beamx'],
                     'beamy': data['beamy'],
                     'mask': mask}

    template_phil = Template("""spotfinder.lookup.mask=$mask
integration.lookup.mask=$mask
spotfinder.filter.min_spot_size=2
significance_filter.enable=True
#significance_filter.isigi_cutoff=1.0
mp.nproc = $nproc
mp.method=multiprocessing
refinement.parameterisation.detector.fix=none
geometry {
  detector {
      panel {
                fast_axis = 0.9999673162585729, -0.0034449798523932267, -0.007314268824966957
                slow_axis = -0.0034447744696749034, -0.99999406591948, 4.0677756813531234e-05
                origin    = $beamx, $beamy, $det_distance
                }
            }
         }
indexing {
  known_symmetry {
    space_group = $space_group
    unit_cell = $unit_cell
  }
  stills.indexer=stills
  stills.method_list=fft1d
  multiple_lattice_search.max_lattices=3
}""")
    phil_data = template_phil.substitute(template_data)

    # makedirs: the per-run proc_dir's parent may not exist yet, and a
    # concurrent task may create it first.
    os.makedirs(proc_dir, exist_ok=True)

    with open(phil_name, 'w') as fp:
        fp.write(phil_data)
    return phil_name

# Runs inside the initial container; the id reaches the flow as "create_phil_fxid".
create_phil_fxid = fxc.register_function(funcx_create_phil, container_uuid=init_cont_id)

In [None]:
def funcx_stills_process(data):
    """Run ``dials.stills_process`` on the input images with the run's phil file.

    Parameters
    ----------
    data : dict
        Flow input. Uses ``proc_dir`` (working directory, where the phil file
        from ``funcx_create_phil`` lives), ``input_files`` (may contain a bash
        brace range such as ``{00001..00050}``), ``input_range`` (its upper
        bound names the log file) and optionally ``suffix`` and ``timeout``
        (seconds for the DIALS command; missing or 0 means no limit).

    Returns
    -------
    str
        ``str()`` of the captured stdout bytes. The DIALS output itself is
        redirected by the shell to ``log-<end>.txt`` in ``proc_dir``.

    Raises
    ------
    subprocess.TimeoutExpired
        If a positive ``timeout`` is given and the command exceeds it.
    """
    import os
    import subprocess

    proc_dir = data['proc_dir']
    input_files = data['input_files']
    run_num = input_files.split("_")[-2]

    if 'suffix' in data:
        phil_name = f"{proc_dir}/process_{run_num}_{data['suffix']}.phil"
    else:
        phil_name = f"{proc_dir}/process_{run_num}.phil"

    file_end = data['input_range'].split("..")[-1]

    # A missing or 0 timeout means "wait as long as it takes".
    timeout = data.get("timeout") or None

    # input_files is deliberately unquoted so bash expands its brace range.
    cmd = f'source /usr/local/dials-dev20210305/dials_env.sh && dials.stills_process {phil_name} {input_files} > log-{file_end}.txt'

    # Need to guarantee the worker is at the correct location: the relative
    # log path in cmd resolves against the current directory.
    os.chdir(proc_dir)
    res = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                         shell=True, executable='/bin/bash', timeout=timeout)

    return str(res.stdout)


# Runs inside the downloaded DIALS container (the function sources its
# dials_env.sh); the id reaches the flow as "stills_fxid".
stills_fxid = fxc.register_function(funcx_stills_process, container_uuid=dials_cont_id)

In [None]:
def funcx_plot_ssx(data):
    """Plot a lattice-count heat map for the scan and run the unit-cell histogram.

    Counts the ``int-*.pickle`` files in ``proc_dir`` per image index (the last
    ``_`` field of each file name), lays the counts out on the scan grid from
    ``beamline_run<run>.json`` (``user_input.x_num_steps`` by
    ``user_input.y_num_steps``, alternate rows reversed) and saves the heat map
    into ``proc_dir``. It then writes the list of int files into
    ``<proc_dir>/<exp>_images/<exp>_ints.txt`` and runs
    ``dials.unit_cell_histogram`` from that directory.

    Parameters
    ----------
    data : dict
        Flow input. Uses ``proc_dir``, ``input_files`` and ``input_range``.

    Returns
    -------
    str
        File name of the heat-map PNG (located in ``proc_dir``).
    """
    import os
    import json
    import glob
    import subprocess
    import numpy as np
    import matplotlib.pyplot as plt

    proc_dir = data['proc_dir']
    input_files = data['input_files']
    data_dir = os.path.split(input_files)[0]
    run_num = input_files.split("_")[-2]

    ## opening existing files
    beamline_json = os.path.join(data_dir, f"beamline_run{run_num}.json")
    with open(beamline_json, 'r') as fp:
        beamline_data = json.load(fp)

    xdim = int(beamline_data['user_input']['x_num_steps'])
    ydim = int(beamline_data['user_input']['y_num_steps'])

    # Get the list of int files produced by stills_process
    int_files = glob.glob(os.path.join(proc_dir, 'int-*.pickle'))

    lattice_counts = np.zeros(xdim * ydim)
    for int_file in int_files:
        # Drop the ".pickle" suffix properly (str.rstrip would strip a
        # character set, not a suffix) and take the trailing image index.
        stem = os.path.splitext(os.path.basename(int_file))[0]
        index = int(stem.split('_')[-1])
        lattice_counts[index] += 1

    lattice_counts = lattice_counts.reshape((ydim, xdim))
    # reverse the order of alternating rows
    lattice_counts[1::2, :] = lattice_counts[1::2, ::-1]

    plot_name = f'1int-sinc-{data["input_range"]}.png'

    fig = plt.figure(figsize=(xdim / 10., ydim / 10.))
    try:
        plt.axes([0, 0, 1, 1])  # Make the plot occupy the whole canvas
        plt.axis('off')
        plt.imshow(lattice_counts, cmap='hot', interpolation=None, vmax=4)
        # Save explicitly into proc_dir instead of relying on the worker's cwd.
        fig.savefig(os.path.join(proc_dir, plot_name))
    finally:
        # pyplot keeps every open figure alive until it is closed.
        plt.close(fig)

    exp_name = os.path.basename(input_files).split("_")[0]

    # create an images directory
    image_dir = f"{proc_dir}/{exp_name}_images"
    os.makedirs(image_dir, exist_ok=True)

    with open(f"{image_dir}/{exp_name}_ints.txt", 'w+') as fp:
        fp.write("\n".join(int_files))

    os.chdir(image_dir)

    # NOTE(review): proc_dir is an absolute path in the flow input, so
    # "../{proc_dir}_processing" likely does not point at the stills_process
    # output; confirm where the integrated experiments are written.
    cmd = ("source /usr/local/dials-dev20210305/dials_env.sh && "
           f"dials.unit_cell_histogram ../{proc_dir}_processing/*integrated_experiments.json")

    subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                   shell=True, executable='/bin/bash')

    return plot_name

# Runs inside the downloaded DIALS container; the id reaches the flow as "plot_fxid".
plot_ssx_fxid = fxc.register_function(funcx_plot_ssx, container_uuid=dials_cont_id)

# Create the flow

In [None]:
def _build_flow_definition():
    """Build the Globus Automate definition of the six-step DIALS flow.

    Every state is an Action on the funcX Automate action provider that runs
    one registered function with the whole flow input (``$.input``) as its
    payload. The steps differ only in their comment, which endpoint key and
    function-id key they read from the input, and where their result is
    stored (``$.Exec<n>Result``). States are chained in table order; the last
    one ends the flow.
    """
    action_url = "https://api.funcx.org/automate"
    action_scope = "https://auth.globus.org/scopes/facd7ccc-c5f4-42aa-916b-a0e270e2c2a9/all"

    # (state name, comment, endpoint input key, function-id input key)
    steps = [
        ("DownloadContainer", "Download the container", "funcx_local_ep", "download_cont_fxid"),
        ("DownloadData", "Download the data", "funcx_local_ep", "download_data_fxid"),
        ("UnwrapData", "Unwrap Data", "funcx_local_ep", "unwrap_data_fxid"),
        ("DialsCreatePhil", "Create Dials Phil", "funcx_ep", "create_phil_fxid"),
        ("DialsStills", "Dials Stills Function", "funcx_ep", "stills_fxid"),
        ("PlotSSX", "Dials Plot Function", "funcx_ep", "plot_fxid"),
    ]
    successors = [name for name, *_ in steps[1:]] + [None]

    states = {}
    for number, ((name, comment, endpoint_key, func_key), successor) in enumerate(
            zip(steps, successors), start=1):
        state = {
            "Comment": comment,
            "Type": "Action",
            "ActionUrl": action_url,
            "ActionScope": action_scope,
            "Parameters": {
                "tasks": [{
                    "endpoint.$": f"$.input.{endpoint_key}",
                    "func.$": f"$.input.{func_key}",
                    "payload.$": "$.input",
                }]
            },
            "ResultPath": f"$.Exec{number}Result",
            "WaitTime": 600,
        }
        if successor is None:
            state["End"] = True
        else:
            state["Next"] = successor
        states[name] = state

    return {
        "Comment": "Dials V3 Flow",
        "StartAt": steps[0][0],
        "States": states,
    }


flow_definition = _build_flow_definition()

In [None]:
# Deploy the definition to Globus Automate; flow_id and flow_scope are used to
# run the flow below.
flow = flows_client.deploy_flow(flow_definition, title="Dials V3 Example Flow")
flow_id = flow['id']
flow_scope = flow['globus_auth_scope']
print(f'Newly created flow with id:\n{flow_id}')

# Define input for the flow

The input to the flow needs to specify what data to process, where it is located, and where to put it for analysis. The flow also requires the funcX function endpoint ids to use.

In [None]:
# Name of this run's processing folder (created under conf['proc_dir']):
# the experiment name plus a short random id, so every run gets its own folder.
experiment_name = 'braid_dials_v3'
run_name = f"{experiment_name}_{shortuuid.uuid()}"

print(run_name)

In [None]:
# Specify the range of files to process.
# This is a bash brace range: funcx_stills_process passes input_files to bash
# unquoted so it is expanded there, and input_range (the text between the
# braces) provides the upper bound used in that step's log-file name.
proc_range = "{00001..00050}"

# Every state in the flow passes the whole "$.input" dict as its funcX payload,
# so all settings used by any step live under "input".
flow_input = {
    "input": {
        #HTTPS-Download Container variables
        "server_url":"https://45a53408-c797-11e6-9c33-22000a1e3b52.e.globus.org/Braid/containers",
        "container_name": "dials_v3_example.simg",
        "container_path": dials_container_path, ##defined in the funcx container configuration
        "headers": headers,
        
        #SingleFile-Download Variables
        "dataset_url": "https://45a53408-c797-11e6-9c33-22000a1e3b52.e.globus.org/Braid/data/ssx_example/levin_29.tar.xz",
        "data_dir": conf['data_dir'],

        #Processing variables (one folder per run)
        "proc_dir": os.path.join(conf['proc_dir'],run_name),

        #Dials specific variables.
        # levin_29/ is the folder unwrap_data extracts levin_29.tar.xz into.
        "input_files": f"{conf['data_dir']}/levin_29/Levin_29_{proc_range}.cbf", 
        "input_range": proc_range[1:-1],
        "nproc": 10,
        "beamx": "-214.400",
        "beamy": "218.200",

        #Dials funcX functions
        "create_phil_fxid": create_phil_fxid,
        "stills_fxid": stills_fxid,
        "plot_fxid": plot_ssx_fxid,

        #Utility funcX functions
        "download_cont_fxid": download_cont_fxid,
        "download_data_fxid": download_data_fxid,
        "unwrap_data_fxid": unwrap_data_fxid,

        
        # funcX endpoints: DIALS steps use funcx_ep, download/unpack steps use funcx_local_ep
        "funcx_ep": conf['endpoint'],
        "funcx_local_ep": conf['local_endpoint'],
    }
}

# Run the flow

This will require you to authenticate and to grant the flow permission to use funcX on your behalf.

The flow should take a couple of minutes.

In [None]:
# Start the flow, then poll its status every 10 seconds while it is ACTIVE.
flow_action = flows_client.run_flow(flow_id, flow_scope, flow_input)
print(flow_action)
flow_action_id = flow_action['action_id']
flow_status = flow_action['status']
print(f'Flow action started with id: {flow_action_id}')
while flow_status == 'ACTIVE':
    time.sleep(10)
    flow_action = flows_client.flow_action_status(flow_id, flow_scope, flow_action_id)
    flow_status = flow_action['status']
    print(f'Flow status: {flow_status}')
# Any status other than ACTIVE ends the loop; flow_action holds the last response.