# Flow

A workflow object. It manages its own logging and execution, or it can be passed on to a scheduler that takes care of both.

# Prelims

In [1]:
# Black Codeformatter
%load_ext lab_black

## Imports

In [2]:
import time

import affe
from affe.io import (
    mimic_fs,
    tree_path,
    get_root_directory,
    get_flow_directory,
    abspath,
    get_children,
    get_children_paths,
    get_subtree,
    rename_directory,
    insert_subdirectory,
    get_code_string,
    get_filename,
    get_filepath,
    dump_object,
    check_existence_of_directory,
)

from affe.utils import flatten_dict, keychain

# Basic Idea

## Implementation

In [3]:
from affe.flow import Flow, load_flow

## Test

In [4]:
# Fetch the root directory tree and the directory tree for a flow named "flow".
root_dir = get_root_directory()
exp_dir = get_flow_directory(keyword="flow")
root_dir, exp_dir  # NOTE(review): not the cell's last expression, so it is never displayed

# Insert the flow tree below the "out" entry; the result is the combined
# filesystem map (key -> parent key) that the rest of the notebook uses.
my_fs = insert_subdirectory(root_dir, parent="out", child=exp_dir,)
my_fs

{'root': '/home/zissou/repos/affe',
 'cli': 'root',
 'data': 'root',
 'out': 'root',
 'scripts': 'root',
 'out.flow.config': 'out.flow',
 'out.flow.logs': 'out.flow',
 'out.flow.results': 'out.flow',
 'out.flow.timings': 'out.flow',
 'out.flow.tmp': 'out.flow',
 'out.flow': 'out'}

In [5]:
# Minimal config: dummy_flow reads the filesystem map via config["io"]["fs"].
dummy_config = dict(io=dict(fs=my_fs))
dummy_config

{'io': {'fs': {'root': '/home/zissou/repos/affe',
   'cli': 'root',
   'data': 'root',
   'out': 'root',
   'scripts': 'root',
   'out.flow.config': 'out.flow',
   'out.flow.logs': 'out.flow',
   'out.flow.results': 'out.flow',
   'out.flow.timings': 'out.flow',
   'out.flow.tmp': 'out.flow',
   'out.flow': 'out'}}}

In [6]:
def dummy_imports():
    """Import the modules and helpers the dummy flow uses, then print a confirmation.

    This function is handed to ``Flow`` through its ``imports`` argument.
    When a flow is run with imports, the confirmation message appears before
    the flow's own output.

    NOTE(review): how ``Flow`` consumes this function (calling it versus
    extracting its source) is defined in ``affe.flow`` and not visible here;
    keep the body to plain import statements until that is confirmed.
    """
    import affe
    from affe.io import (
        mimic_fs,
        tree_path,
        get_root_directory,
        get_flow_directory,
        abspath,
        get_children,
        get_children_paths,
        get_subtree,
        rename_directory,
        insert_subdirectory,
        get_code_string,
        get_filename,
        get_filepath,
        dump_object,
        check_existence_of_directory,
    )

    from affe.utils import flatten_dict, keychain

    import time

    # Visible marker that the import step actually ran.
    print("Imports succesful")

    return


def dummy_flow(config):
    """Toy workload: greet, write a small result file, pause, report success.

    Parameters
    ----------
    config : dict
        Configuration whose ``"io"`` entry holds the filesystem map under ``"fs"``.

    Returns
    -------
    bool
        Always ``True``.
    """
    print("Hello world")

    results_key = "out.flow.results"
    tree = config.get("io").get("fs")

    # Resolve the output file, then run the directory check before writing.
    target = abspath(tree, results_key, filename="demo.json")
    check_existence_of_directory(tree, results_key)

    dump_object({"elia": "cool"}, target)

    # Short pause so the run has a measurable duration.
    pause_s = 2
    time.sleep(pause_s)
    print("{} secs passed".format(pause_s))

    return True

In [7]:
# Resolve the log file inside the flow's logs directory and run the directory
# check on that location before handing the path to the Flow.
logs_directory_key = "out.flow.logs"
log_filepath = abspath(my_fs, logs_directory_key, "logfile")
check_existence_of_directory(my_fs, logs_directory_key)


# Bundle config, import hook, workload, timeout and log location into a Flow.
# timeout_s is presumably in seconds, going by the parameter name.
f = Flow(
    config=dummy_config,
    imports=dummy_imports,
    flow=dummy_flow,
    timeout_s=10,
    log_filepath=log_filepath,
)

In [8]:
f.run()

Hello world
2 secs passed


True

In [9]:
f.run_with_log()

In [10]:
f.dump()

In [11]:
flow_fn = f.flow_filepath
flow_fn

'flowfile.pkl'

In [12]:
fl = load_flow(flow_fn)

In [13]:
fl.run()

Hello world
2 secs passed


True

In [14]:
fl.run_with_imports()

Imports succesful
Hello world
2 secs passed


In [15]:
fl.run_with_log()

# Script

A flow can output a shell command that runs the flow itself. To do so, the flow dumps itself to a file, and a standard CLI script loads that file and executes it.

## Implementation

In [16]:
from affe.cli import get_flow_cli

In [17]:
script = get_flow_cli()
script

'/home/zissou/repos/affe/src/affe/cli/flow_cli.py'

In [18]:
get_flow_cli(abs=False)

'../../src/affe/cli/flow_cli.py'

In [19]:
flow_fn

'flowfile.pkl'

## Test

In [20]:
import os

here = os.getcwd()
here

'/home/zissou/repos/affe/note/impl'

In [21]:
%%bash -s "$here" "$script" "$flow_fn"

# Positional arguments supplied by the magic line above:
#   $1 = notebook working directory, $2 = flow CLI script, $3 = dumped flow file.
# Every expansion is quoted so paths containing spaces or glob characters
# reach the commands intact.
# Abort if the directory change fails, instead of running the flow elsewhere.
cd "$1" || exit 1
source activate affe
pwd
python "$2" -f "$3"

/home/zissou/repos/affe/note/impl
Imports succesful
Hello world
2 secs passed

    I am running the general-purpose flow script.
    



# Template Metaprogramming

In order to be able to programmatically pass on the imports, some minor metaprogramming is necessary.

## Tests

In [22]:
def dummy_imports():
    """Import the modules and helpers the dummy flow uses, then print a confirmation.

    This function is handed to ``Flow`` through its ``imports`` argument and
    is what ``run_imports`` triggers on the resulting flow.

    NOTE(review): how ``Flow`` consumes this function (calling it versus
    extracting its source) is defined in ``affe.flow`` and not visible here;
    keep the body to plain import statements until that is confirmed.
    """
    import affe
    from affe.io import (
        mimic_fs,
        tree_path,
        get_root_directory,
        get_flow_directory,
        abspath,
        get_children,
        get_children_paths,
        get_subtree,
        rename_directory,
        insert_subdirectory,
        get_code_string,
        get_filename,
        get_filepath,
        dump_object,
        check_existence_of_directory,
    )

    from affe.utils import flatten_dict, keychain

    import time

    # Visible marker that the import step actually ran.
    print("Imports succesful.")

    return


def dummy_flow(config):
    """Toy workload for the script-based run: write results, pause, succeed.

    Parameters
    ----------
    config : dict
        Configuration whose ``"io"`` entry holds the filesystem map under ``"fs"``.

    Returns
    -------
    bool
        Always ``True``.
    """
    print("Hello world")

    results_key = "out.flow.results"
    tree = config.get("io").get("fs")

    # Resolve the output file, then run the directory check before writing.
    target = abspath(tree, results_key, filename="demo.json")
    check_existence_of_directory(tree, results_key)

    dump_object({"elia": "cool", "script": "bigsucces"}, target)

    # Short pause so the run has a measurable duration.
    pause_s = 2
    time.sleep(pause_s)
    print("{} secs passed".format(pause_s))

    return True


# Use a separate log file for the script-based run, in the same logs
# directory, and run the directory check on that location first.
logs_directory_key = "out.flow.logs"
log_filepath = abspath(my_fs, logs_directory_key, "logfile-with-script")
check_existence_of_directory(my_fs, logs_directory_key)


# Second flow: same config, built from the dummy_imports / dummy_flow
# definitions directly above.
f2 = Flow(
    config=dummy_config,
    imports=dummy_imports,
    flow=dummy_flow,
    timeout_s=10,
    log_filepath=log_filepath,
)

In [23]:
f2.log_filepath

'/home/zissou/repos/affe/out/flow/logs/logfile-with-script'

In [24]:
f2.run_imports()

Imports succesful.


amazing, lgtm

In [25]:
f2.dump("yolo.pkl")

In [26]:
f3 = load_flow("yolo.pkl")

In [27]:
f3.run_with_imports()

Imports succesful.
Hello world
2 secs passed


In [28]:
f3.get_cli_command()

['/home/zissou/miniconda3/envs/affe/bin/python',
 '/home/zissou/repos/affe/src/affe/cli/flow_cli.py',
 '-f',
 'yolo.pkl']

In [29]:
import sys

sys.executable

'/home/zissou/miniconda3/envs/affe/bin/python'

In [30]:
f3.run_via_shell_with_log()

In [33]:
c = f3.get_cli_command_with_logs(return_list=False)
print(c)

/home/zissou/miniconda3/envs/affe/bin/python /home/zissou/repos/affe/src/affe/cli/flow_cli_with_monitors.py -f yolo.pkl -l /home/zissou/repos/affe/out/flow/logs/logfile-with-script -t 10


In [37]:
%%bash -s "$here" "$c"

# $2 is the complete command line held in `c`. It is expanded unquoted so the
# shell word-splits it into the program and its arguments; quoting it would
# make the shell treat the whole string as a single program name.
# $1 (the notebook directory) is passed in but not used by this cell.
$2


    Done running the general-purpose monitored flow script.
    



In [32]:
%debug

ERROR:root:No traceback has been produced, nothing to debug.
