# Parsl for workflow definitions

This repository contains a Parsl implementation of the Phyloflow workflow. The code developed in Parsl is located inside the */parsl* directory. To follow the translation process see [documentation.ipynb](./documentation.ipynb). 

# Workflow Context

## Phyloflow

Phyloflow is a phylogenetic tree calculation tool packaged with Docker and WDL. For more information on Phyloflow, see [github.com/ncsa/phyloflow](https://github.com/ncsa/phyloflow).

<img src="./Images/phyloflow-workflow.png" height="360">

---
## <img src="./Images/wdl-logo.png" height="50">

WDL (Workflow Description Language) is a language for specifying data processing workflows. It is used extensively in scientific research, mainly in the medical and bioinformatics fields.

A typical WDL script consists of a series of tasks that are called within a workflow definition. Data flows between tasks through file dependencies. As an example, here is the pyclone-vi task:

<font size="1">

```
task pyclone_vi_clustering {
    input {
        File mutations_tsv
    }

    command {
        sh /code/pyclone_vi_entrypoint.sh ${mutations_tsv}
    }

    output {
        File response = stdout()
        File err_response = stderr()
        File cluster_assignment = 'cluster_assignment.tsv'
    }

    runtime {
        docker: 'public.ecr.aws/k1t6h9x8/phyloflow/pyclone_vi:latest'
    }
}
```

A task specifies the following sections:
* Input: These are files commonly generated by other tasks
* Command: A series of bash statements executed when input dependencies are met
* Output: A definition of all the output files that could be used as dependencies by other tasks
* Runtime: Typically a docker image with all the dependencies needed for the task to run


The WDL code for the whole workflow is detailed in [/workflows/phyloflow_standalone.wdl](./workflows/phyloflow_standalone.wdl). This file specifies five tasks: vcf_transform, pyclone_vi, cluster_transform, spruce_phylogeny and aggregate_json, which are subsequently called inside the phyloflow workflow.

---
## <img src="./Images/parsl-logo.png" height="50">

*Parsl extends parallelism in Python beyond a single computer*

In the context of workflow definitions for scientific research, Parsl aims to combine the flexibility of a complete programming language such as Python with the ease of use that WDL offers for defining parallelizable workflows.

Parsl extends Python with the @bash_app and @python_app function decorators. Calling a decorated function does not run it right away. Instead, it returns an AppFuture, which is a promise that the app will execute once its dependencies are met. Apps receive an `inputs` list and produce an `outputs` list of DataFutures, which usually represent files. Apps and the AppFutures they return are the building blocks of Parsl, just as tasks are in WDL. In Parsl, you compose a workflow by passing the outputs of one app as the inputs of another.
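Running real Parsl apps requires a loaded configuration. As a dependency-free illustration only, the sketch below mimics the same composition pattern with the standard library's `concurrent.futures`: each step returns a future, and the future of one step is handed to the next. The `toy_transform` and `toy_cluster` functions and the file names are hypothetical. There is one important difference from Parsl. Here the downstream step blocks a worker thread on `.result()`, whereas Parsl waits until the dependency is resolved before it launches the app.

```python
from concurrent.futures import Future, ThreadPoolExecutor


def toy_transform(path: str) -> str:
    # Hypothetical step: pretend to convert a VCF into a TSV and return its path
    return path.replace(".vcf", ".tsv")


def toy_cluster(tsv_future: Future) -> str:
    # Blocks until the upstream future finishes (Parsl avoids this by
    # launching the app only when its dependencies are done)
    tsv_path = tsv_future.result()
    return tsv_path.replace(".tsv", "_clusters.tsv")


def run_chain(vcf_path: str) -> str:
    with ThreadPoolExecutor(max_workers=2) as executor:
        tsv_future = executor.submit(toy_transform, vcf_path)
        # The future itself, not its value, is passed downstream
        cluster_future = executor.submit(toy_cluster, tsv_future)
        return cluster_future.result()


print(run_chain("sample.vcf"))  # sample_clusters.tsv
```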

---

# Code Implementation

## Imports

```python
import sys
sys.path.insert(0,'./parsl/')
```

```python
import json
import os

from appfuture_manager import AppFutureManager
from filesystem_util import (LOGS_DIR, ROOT, RUNS_DIR, format_files,
                             get_stdfiles)
from testing import *
from workflow_tasks import *

import parsl
from parsl import bash_app, python_app
from parsl.config import Config
from parsl.dataflow.futures import AppFuture
from parsl.executors import ThreadPoolExecutor
```

## Translating WDL Tasks

Each task of the WDL workflow was translated into three Python functions: a Parsl app, a get_inputs function, and a run function.

### App functions

The app function is in charge of running the same code that was originally inside the command section of the WDL task description.

```python
@bash_app
def pyclone_vi(inputs=[], outputs=[], 
               stdout=None, stderr=None):
    return f'''
        conda run -n pyclone-vi pyclone-vi fit --in-file {inputs[0]} --out-file {outputs[0]}
        conda run -n pyclone-vi pyclone-vi write-results-file --in-file {outputs[0]} --out-file {outputs[1]}
        '''
```
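A `@bash_app` only returns a command string, so that string can be built and inspected without Parsl. The hypothetical helper below is not part of the repository. It reproduces the two pyclone-vi calls and passes every argument through `shlex.join`. That function quotes each argument for the shell, so the command would stay valid even if a run directory contained spaces, which the plain f-string above does not guard against.

```python
import shlex


def pyclone_vi_command(in_file: str, fit_file: str, results_file: str) -> str:
    """Build the two pyclone-vi calls made by the pyclone_vi app (hypothetical helper)."""
    fit = shlex.join([
        "conda", "run", "-n", "pyclone-vi", "pyclone-vi", "fit",
        "--in-file", in_file, "--out-file", fit_file,
    ])
    write = shlex.join([
        "conda", "run", "-n", "pyclone-vi", "pyclone-vi", "write-results-file",
        "--in-file", fit_file, "--out-file", results_file,
    ])
    # One command per line, like the string returned by the app
    return f"{fit}\n{write}"


print(pyclone_vi_command("input.tsv", "runs/my run/cluster_fit.hdf5",
                         "runs/my run/cluster_assignment.tsv"))
```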

### Get inputs functions

The get_inputs function receives the AppFutures of the earlier workflow steps that the app depends on. It extracts the outputs it needs (DataFutures) and groups them into a list.

```python
def get_inputs_pyclone_vi(vcf_future:AppFuture):
    inputs = [
        vcf_future.outputs[2]
    ]
    return inputs
```
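A get_inputs function only indexes into `.outputs`. It can therefore be exercised without running Parsl, by passing it any object that has an `outputs` list. In the sketch below, the `StubFuture` class and the file names (and their order) are hypothetical. A real `AppFuture.outputs` holds DataFutures rather than plain strings.

```python
from dataclasses import dataclass, field


@dataclass
class StubFuture:
    # Minimal stand-in exposing the only attribute get_inputs uses
    outputs: list = field(default_factory=list)


def get_inputs_pyclone_vi(vcf_future) -> list:
    # Same logic as above: take the third output of the vcf_transform future
    return [vcf_future.outputs[2]]


# Placeholder file names in an assumed order
vcf_stub = StubFuture(outputs=["headers.json", "mutations.json",
                               "pyclone_vi_formatted.tsv"])
print(get_inputs_pyclone_vi(vcf_stub))  # ['pyclone_vi_formatted.tsv']
```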

### Run functions

Each run function receives the inputs and the rundir where all outputs will be saved. It defines the output file paths and then calls the app function.


```python
def run_pyclone_vi(inputs:list, rundir:str) -> AppFuture:
    outputs = [
        'cluster_fit.hdf5',
        'cluster_assignment.tsv'
    ]
    outputs = format_files(rundir, outputs)
    stdout, stderr = get_stdfiles(rundir)
    pyclone_future = pyclone_vi(inputs=inputs, outputs=outputs,
                                stdout=stdout, stderr=stderr)
    return pyclone_future
```
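`format_files` and `get_stdfiles` come from `filesystem_util.py`, whose code is not shown here. The run trees later in this document show a `stdout.txt` and a `stderr.txt` in every task folder. Based on that, a plausible minimal version might look like the sketch below. The `_sketch` suffix keeps these functions from shadowing the real imports. The real helpers may differ, for example by returning Parsl `File` objects instead of plain strings.

```python
import os


def format_files_sketch(rundir: str, filenames: list[str]) -> list[str]:
    # Hypothetical: turn bare file names into paths inside the run directory
    return [os.path.join(rundir, name) for name in filenames]


def get_stdfiles_sketch(rundir: str) -> tuple[str, str]:
    # Hypothetical: one stdout/stderr pair per task folder
    return (os.path.join(rundir, "stdout.txt"),
            os.path.join(rundir, "stderr.txt"))


print(format_files_sketch("runs/demo/future_2_run_pyclone_vi",
                          ["cluster_fit.hdf5", "cluster_assignment.tsv"]))
print(get_stdfiles_sketch("runs/demo/future_2_run_pyclone_vi"))
```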

The python code for all the tasks is inside [/parsl/workflow_tasks.py](./parsl/workflow_tasks.py). 

Run functions rely solely on input files, not on AppFutures generated by other apps. This modular design makes it possible to test a single app with test files without running all the previous steps of the workflow. It also makes it possible to compose new workflows starting from any task. This proved useful when integrating with the OpenAI function calling API.

## OpenAI Integration

A series of adapter functions was created to execute the Parsl apps from string inputs, which are either file paths or AppFuture ids. The adapter functions for all the tasks are defined inside [/parsl/function_calls.py](./parsl/function_calls.py).

```python
def fcall_execute(run_function, inputs):
    future_id = AppFutureManager.new_future_id(run_function)
    future_dir = generate_subdir(AppFutureManager.DIR, future_id)
    future = run_function(inputs, future_dir)
    print(future)
    AppFutureManager.index(future_id, future)
    return future_id

def fcall_from_files(run_function, inputs:list[str]):
    inputs = format_files(ROOT, inputs)
    return fcall_execute(run_function, inputs)

def fcall_pyclone_vi_from_files(pyclone_vi_formatted:str):
    inputs = [pyclone_vi_formatted]
    return fcall_from_files(run_pyclone_vi, inputs)

def fcall_pyclone_vi_from_futures(vcf_future_id:str):
    vcf_future = AppFutureManager.query(vcf_future_id)
    inputs = get_inputs_pyclone_vi(vcf_future)
    return fcall_execute(run_pyclone_vi, inputs)
```
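The OpenAI function calling API describes each callable function by a name, a description and a JSON Schema for its parameters. The model then answers with the chosen function name and its arguments encoded as a JSON string. Because the adapters above take only plain string keyword arguments, such an answer can be routed to them directly. The sketch below shows one way to do that. The function description, the `dispatch` helper and the dummy adapter are hypothetical and are not taken from `function_calls.py`. The dummy replaces `fcall_pyclone_vi_from_files` so that the example runs on its own.

```python
import json

# Hypothetical function description in the OpenAI function calling format
PYCLONE_FROM_FILES_FUNCTION = {
    "name": "fcall_pyclone_vi_from_files",
    "description": "Run pyclone-vi on a formatted TSV file and return a future id.",
    "parameters": {
        "type": "object",
        "properties": {
            "pyclone_vi_formatted": {
                "type": "string",
                "description": "Path of the pyclone-vi formatted TSV file.",
            }
        },
        "required": ["pyclone_vi_formatted"],
    },
}


def dispatch(function_call: dict, registry: dict) -> str:
    # The model answers with a function name and its arguments as a JSON string
    adapter = registry[function_call["name"]]
    kwargs = json.loads(function_call["arguments"])
    return adapter(**kwargs)


def fake_pyclone_adapter(pyclone_vi_formatted: str) -> str:
    # Dummy in place of fcall_pyclone_vi_from_files, which would launch the app
    return f"fake-id:{pyclone_vi_formatted}"


registry = {PYCLONE_FROM_FILES_FUNCTION["name"]: fake_pyclone_adapter}
model_call = {
    "name": "fcall_pyclone_vi_from_files",
    "arguments": json.dumps({"pyclone_vi_formatted": "example.tsv"}),
}
print(dispatch(model_call, registry))  # fake-id:example.tsv
```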

## Testing

The unit tests of all the run functions are inside [/parsl/testing.py](./parsl/testing.py), where the test files are taken from the */example_data* directory. 

An example of a test function looks like this:

```python
def test_pyclone_vi():
    fcall_pyclone_vi_from_files(
        pyclone_vi_formatted=test_files['pyclone_vi_formatted']
    )
```

## Filesystem Managing

WDL automatically generates a folder structure for each workflow run, including a directory for every task in the workflow. In Parsl, by contrast, you have to create the folder structure explicitly to organize the outputs of your workflow. This gives the developer more flexibility, at the cost of requiring more knowledge of the file system. The utility functions for creating the folder structure are defined inside [/parsl/filesystem_util.py](./parsl/filesystem_util.py).

The results of each run go into a subdirectory of the *runs* directory. The AppFutureManager class defined inside [/parsl/appfuture_manager.py](./parsl/appfuture_manager.py) has three responsibilities. It creates these subdirectories, generates a unique identifier for each executed app, and maps each identifier to its corresponding AppFuture object.
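The source of `AppFutureManager` is not reproduced in this document. The notebook uses the names `new_dir`, `new_future_id`, `index`, `query` and `DIR`, and the run trees show folders such as `future_1_run_vcf_transform` inside timestamped run folders. Going only by those, a minimal class with the same responsibilities might look like the hypothetical reconstruction below. The real class may store its state differently. `generate_subdir_sketch` assumes that `generate_subdir` simply creates a folder. The names carry a `Sketch` suffix so they do not shadow the real imports.

```python
import itertools
import os
from datetime import datetime


class AppFutureManagerSketch:
    """Hypothetical stand-in for AppFutureManager: ids, run folders, lookup."""

    RUNS_DIR = "runs"  # assumed parent folder for all runs
    DIR = None         # folder of the current run
    _futures = {}      # future id -> future object
    _counter = itertools.count(1)

    @classmethod
    def new_dir(cls) -> str:
        # One timestamped folder per workflow run, e.g. runs/2023-07-25_13:29:55
        stamp = datetime.now().strftime("%Y-%m-%d_%H:%M:%S")
        cls.DIR = os.path.join(cls.RUNS_DIR, stamp)
        os.makedirs(cls.DIR, exist_ok=True)
        # Restart numbering so each run begins with future_1
        cls._futures = {}
        cls._counter = itertools.count(1)
        return cls.DIR

    @classmethod
    def new_future_id(cls, run_function) -> str:
        # e.g. "future_2_run_pyclone_vi"
        return f"future_{next(cls._counter)}_{run_function.__name__}"

    @classmethod
    def index(cls, future_id: str, future) -> None:
        cls._futures[future_id] = future

    @classmethod
    def query(cls, future_id: str):
        return cls._futures[future_id]


def generate_subdir_sketch(parent: str, name: str) -> str:
    # Assumed behaviour of generate_subdir: create and return parent/name
    path = os.path.join(parent, name)
    os.makedirs(path, exist_ok=True)
    return path
```

`fcall_execute` follows this sequence: `new_future_id`, then `generate_subdir`, then `index`. The `_from_futures` adapters use `query` to turn ids back into futures.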

```python
import os
import re

def last_run_tree():
    # Pick the most recently modified run folder (os.listdir() order is arbitrary)
    run_dirs = [os.path.join(RUNS_DIR, name) for name in os.listdir(RUNS_DIR)]
    last_dir = max(run_dirs, key=os.path.getmtime)
    tree_output = ! tree {last_dir}
    tree_output = '\n'.join(list(tree_output))
    # Strip the ANSI color codes emitted by `tree`
    tree_output = re.sub(r'\x1b?\[[0-9;]*m', '', tree_output)
    print(tree_output)
```

```python
last_run_tree()
```

```
/home/alejo/Documents/NCSA/phyloflow_parsl/parsl/runs/example
├── future_1_run_vcf_transform
│   ├── headers.json
│   ├── mutations.json
│   ├── pyclone_samples
│   │   └── A25.tsv
│   ├── pyclone_vi_formatted.tsv
│   ├── stderr.txt
│   └── stdout.txt
├── future_2_run_vcf_transform
│   ├── headers.json
│   ├── pyclone_samples
│   ├── stderr.txt
│   └── stdout.txt
├── future_3_run_cluster_transform
│   ├── spruce_formatted.tsv
│   ├── stderr.txt
│   └── stdout.txt
├── future_4_run_spruce_tree
│   ├── spruce.cliques
│   ├── spruce.merged.res
│   ├── spruce.res
│   ├── spruce.res.gz
│   ├── spruce.res.json
│   ├── spruce.res.txt
│   ├── stderr.txt
│   └── stdout.txt
└── future_5_run_aggregate_json
    ├── aggregated.json
    ├── stderr.txt
    └── stdout.txt

8 directories, 23 files
```


## Full Workflow

The workflow function is only responsible for passing the outputs of each run function, referenced by their future ids, as inputs to the next ones.

```python
def fcall_full_workflow(vep_vcf:str):
    vcf_future_id = fcall_vcf_transform_from_files(
        vep_vcf=vep_vcf
    )
    pyclone_future_id = fcall_pyclone_vi_from_futures(
        vcf_future_id=vcf_future_id
    )
    cluster_future_id = fcall_cluster_transform_from_futures(
        vcf_future_id=vcf_future_id,
        pyclone_future_id=pyclone_future_id
    )
    spruce_future_id = fcall_spruce_tree_from_futures(
        cluster_future_id=cluster_future_id
    )
    aggregate_future_id = fcall_aggregate_json_from_futures(
        vep_vcf=vep_vcf,
        pyclone_future_id=pyclone_future_id,
        spruce_future_id=spruce_future_id
    )
    return aggregate_future_id
```
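The calls in `fcall_full_workflow` wire up a small directed acyclic graph of task dependencies. The sketch below models only that dependency structure; it does not show how Parsl is invoked, since Parsl does this scheduling itself. It uses the standard library's `graphlib.TopologicalSorter` (Python 3.9+) to encode the graph. It then groups the tasks into stages, where every task in a stage has all of its dependencies satisfied. The task names mirror the run functions, and the raw `vep_vcf` input is left out.

```python
from graphlib import TopologicalSorter

# Each task mapped to the tasks whose outputs it consumes (from fcall_full_workflow)
WORKFLOW_DEPS = {
    "vcf_transform": set(),
    "pyclone_vi": {"vcf_transform"},
    "cluster_transform": {"vcf_transform", "pyclone_vi"},
    "spruce_tree": {"cluster_transform"},
    "aggregate_json": {"pyclone_vi", "spruce_tree"},
}


def execution_stages(deps: dict) -> list[list[str]]:
    """Group tasks into stages; tasks in the same stage could run concurrently."""
    sorter = TopologicalSorter(deps)
    sorter.prepare()
    stages = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())
        stages.append(ready)
        sorter.done(*ready)
    return stages


print(execution_stages(WORKFLOW_DEPS))
# [['vcf_transform'], ['pyclone_vi'], ['cluster_transform'], ['spruce_tree'], ['aggregate_json']]
```

For a single input file the graph is a chain, so each stage holds only one task. The concurrency in this project therefore comes from running several workflows side by side, as in the parallel extension later in this document.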

Finally, the test workflow function calls the workflow on an example data file and waits for the output.

```python
def test_full_workflow():
    future_id = fcall_full_workflow(
        vep_vcf=test_files['vep_vcf']
    )
    AppFutureManager.query(future_id).result()
```

## Running the code

Parsl requires loading a configuration that specifies the available computing resources and how they are distributed among Parsl apps. In this case, a ThreadPoolExecutor is used with a maximum of 4 threads. The run_dir parameter sets the directory where Parsl saves all its *log* information.

```python
def load_config():
    config = Config(
        executors=[
            ThreadPoolExecutor(
                label='threads',
                max_threads=4
            )
        ],
        run_dir=LOGS_DIR
    )
    parsl.load(config)

load_config() # Can be called only once
```

```python
AppFutureManager.new_dir()
test_full_workflow()
```

```
<AppFuture at 0x7ff3cc581c90 state=pending>
<AppFuture at 0x7ff3cc4d79d0 state=pending>
<AppFuture at 0x7ff3cc504090 state=pending>
<AppFuture at 0x7ff3cc504f10 state=pending>
<AppFuture at 0x7ff3cc506c10 state=pending>
```


```python
last_run_tree()
```

```
/home/alejo/Documents/NCSA/phyloflow_parsl/parsl/runs/2023-07-25_13:29:55
├── future_1_run_vcf_transform
│   ├── headers.json
│   ├── mutations.json
│   ├── pyclone_samples
│   │   └── A25.tsv
│   ├── pyclone_vi_formatted.tsv
│   ├── stderr.txt
│   └── stdout.txt
├── future_2_run_pyclone_vi
│   ├── cluster_assignment.tsv
│   ├── cluster_fit.hdf5
│   ├── stderr.txt
│   └── stdout.txt
├── future_3_run_cluster_transform
│   ├── spruce_formatted.tsv
│   ├── stderr.txt
│   └── stdout.txt
├── future_4_run_spruce_tree
│   ├── spruce.cliques
│   ├── spruce.merged.res
│   ├── spruce.res
│   ├── spruce.res.gz
│   ├── spruce.res.json
│   ├── spruce.res.txt
│   ├── stderr.txt
│   └── stdout.txt
└── future_5_run_aggregate_json
    ├── aggregated.json
    ├── stderr.txt
    └── stdout.txt

7 directories, 24 files
```


## Extending the functionality

The original workflow functionality was extended to process multiple input files. The output files of all workflows are combined into a single JSON list by one final @python_app called aggregate_workflows.

```python
@python_app
def aggregate_workflows(inputs=[], outputs=[]):
    # Import inside the app so it also works on executors with separate workers
    import json
    output_json = []
    for file in inputs:
        with open(file) as f:
            output_json.append(json.load(f))
    with open(outputs[0], 'w') as output_file:
        json.dump(output_json, output_file)

def get_inputs_aggregate_workflows(aggregate_futures:list[AppFuture]):
    return [future.outputs[0] for future in aggregate_futures]

def run_aggregate_workflows(inputs:list, rundir):
    outputs = [
        'aggregated_workflows.json'
    ]
    outputs = format_files(rundir, outputs)
    aggregate_workflows_future = aggregate_workflows(inputs=inputs, outputs=outputs)
    return aggregate_workflows_future
```

The final parallel workflow implementation is as follows:

```python
def fcall_parallel_workflows(vep_vcf_files:list[str]):
    future_ids = []
    for vep_vcf in vep_vcf_files:
        future_id = fcall_full_workflow(
            vep_vcf=vep_vcf
        )
        future_ids.append(future_id)

    futures = [AppFutureManager.query(fid) for fid in future_ids]
    inputs = get_inputs_aggregate_workflows(futures)
    return fcall_execute(run_aggregate_workflows, inputs)
```
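`fcall_parallel_workflows` follows a fan-out/fan-in pattern. Independent pipelines are launched first, and then a single aggregation step consumes all of their results. The sketch below models the same shape with the standard library's `concurrent.futures`. `fake_pipeline` is a hypothetical stand-in for one full Phyloflow run. The aggregation mirrors `aggregate_workflows` by collecting each result into one JSON list.

```python
import json
from concurrent.futures import ThreadPoolExecutor


def fake_pipeline(vep_vcf: str) -> dict:
    # Hypothetical stand-in for one full workflow run on a single input file
    return {"input": vep_vcf, "status": "done"}


def parallel_workflows(vep_vcf_files: list[str], max_workers: int = 4) -> str:
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Fan out: one independent pipeline per input file
        futures = [executor.submit(fake_pipeline, f) for f in vep_vcf_files]
        # Fan in: collect every result (in submission order) into one list
        results = [future.result() for future in futures]
    return json.dumps(results)


print(parallel_workflows(["a.vcf", "b.vcf", "c.vcf"]))
```

One difference from the Parsl version: there, the fan-in step does not block a thread. `aggregate_workflows` receives DataFutures, and Parsl starts it only once all of them are ready.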

For demonstration purposes, the test function launches 3 workflows over the same input file.

```python
def test_parallel_workflows():
    future_id = fcall_parallel_workflows(
        vep_vcf_files=[test_files['vep_vcf']]*3
    )
    AppFutureManager.query(future_id).result()
```

```python
AppFutureManager.new_dir()
test_parallel_workflows()
```

```
<AppFuture at 0x7ff3cc507f10 state=pending>
<AppFuture at 0x7ff3cc51e410 state=pending>
<AppFuture at 0x7ff3cc51c7d0 state=pending>
<AppFuture at 0x7ff3cc51d690 state=pending>
<AppFuture at 0x7ff3cc51edd0 state=pending>
<AppFuture at 0x7ff3cc3300d0 state=pending>
<AppFuture at 0x7ff3cc330910 state=pending>
<AppFuture at 0x7ff3cc331410 state=pending>
<AppFuture at 0x7ff3cc332310 state=pending>
<AppFuture at 0x7ff3cc333690 state=pending>
<AppFuture at 0x7ff3cc340e50 state=pending>
<AppFuture at 0x7ff3cc341f90 state=pending>
<AppFuture at 0x7ff3cc342e50 state=pending>
<AppFuture at 0x7ff3cc343690 state=pending>
<AppFuture at 0x7ff3cc34d390 state=pending>
<AppFuture at 0x7ff3cc34e210 state=pending>
```


# Docker Container

<img src="./Images/docker.png" height="100">

WDL is designed to run every task in an independent container, so the original Phyloflow used a different Docker image for each task. Parsl does not work this way. As a workaround, multiple conda environments were created within the same Docker container, and each Parsl app executes its code within the corresponding environment.

Attached to the project is the dockerfile ([/parsl/docker/dockerfile](./parsl/docker/dockerfile)) that contains the definition to build a docker image with all the dependencies that the workflow needs, including the conda environments. The file [/parsl/docker/docker_commands.sh](./parsl/docker/docker_commands.sh) contains docker commands to build the image and run the container. 

The container has been tested on Linux and macOS.

```python
# Convert notebook to readme
! jupyter nbconvert documentation.ipynb --to markdown --output README.md
```

```
[NbConvertApp] Converting notebook documentation.ipynb to markdown
[NbConvertApp] Writing 15772 bytes to README.md
```
