# Flow Control

In [1]:
# Following code is needed to preconfigure this notebook
import datetime
import sys
import os
sys.path.insert(0, os.path.abspath('../../..'))

import pyflow as pf

# Placeholder scratch location ('/path/to/scratch') -- presumably meant to be
# replaced with a real writable directory before anything is deployed to disk.
scratchdir = os.path.join('/', 'path', 'to', 'scratch')
# Base directory for per-suite script files (CourseSuite appends the suite name).
filesdir = os.path.join(scratchdir, 'files')
# Directory handed to CourseSuite as its 'home' setting.
outdir = os.path.join(scratchdir, 'out')


class CourseSuite(pf.Suite):
    """
    This CourseSuite object will be used throughout the course to provide sensible
    defaults without verbosity
    """
    def __init__(self, name, **kwargs):
        # Course-wide defaults for every suite built in this notebook.
        defaults = dict(
            host=pf.LocalHost(),
            files=os.path.join(filesdir, name),
            home=outdir,
            defstatus=pf.state.suspended,
        )

        # Caller-supplied keywords win over the default stored under the same key;
        # unknown keywords are forwarded untouched to pf.Suite.
        settings = dict(defaults, **kwargs)

        super().__init__(name, **settings)


class MyTask(pf.Task):
    
    """Counts to the double of a number, first half using a for loop then a while loop"""
    
    def __init__(self, name, default_value=0, **kwargs):
        """
        Build a counting task.

        name          -- task name; it is also echoed by the generated script.
        default_value -- number stored as HALF; LIMIT is set to twice this value.
        **kwargs      -- extra entries merged into the variables passed to pf.Task;
                         an entry named HALF or LIMIT replaces the computed one.
        """
        
        variables = {
            'HALF': default_value,
            'LIMIT': 2*default_value,
        }
        variables.update(**kwargs)
        
        labels = {
            'counter_label': 'count to {}'.format(2*default_value)
        }
        
        # The for loop prints 1..HALF, the while loop prints HALF+1..LIMIT.
        # The while condition is inclusive (-le) so that LIMIT itself is printed,
        # matching the "count to <2*default_value>" label. Arithmetic uses the
        # POSIX $(( )) form rather than the obsolete bash-only $[ ] form.
        script = [
            'echo "This is a counting task named {}"'.format(name),
            'for i in $(seq 1 $HALF); do echo "count $i/$LIMIT"; done',
            'i=$(($HALF+1)); while [ $i -le $LIMIT ]; do echo "count $i/$LIMIT" ; i=$(($i+1)); done'
        ]
        
        # HALF / LIMIT (and any extra kwargs) are handed to pf.Task as keyword
        # arguments -- presumably pyflow turns them into variables that the
        # script can read as $HALF / $LIMIT; confirm against the pyflow docs.
        super().__init__(name,
                         script=script,
                         labels=labels,
                         **variables)

## Triggers

In [2]:
# Minimal suite: one family holding a variable and two tasks.
with pf.Suite('test') as s:
    with pf.Family('f1'):
        pf.Variable('SLEEP', 20)
        t1 = pf.Task('t1')
        t2 = pf.Task('t2')
        
    # Dependency shortcut: t2 is triggered once t1 is complete
    # (same effect as t2.triggers = t1.complete).
    t1 >> t2

# Last expression of the cell, so the suite is displayed as the cell output.
s

### Embedded Triggers

Trigger expressions can be embedded within the scripts using the `--wait` child command. While the expression is *not* true, the job will hold.

Where possible you should give preference to triggers on the definitions, since these are checked on creation, whereas embedded triggers are checked at run time.

In [3]:
# Same layout as before, but no trigger is set on the definition: t2's own
# script waits on the expression "t1 == complete" via `ecflow_client --wait`.
with pf.Suite('test', host=pf.LocalHost(), files='/test') as s:
    with pf.Family('f1'):
        pf.Variable('SLEEP', 20)
        pf.Task('t1')
        pf.Task('t2', script='ecflow_client --wait="t1 == complete"')
    
# Display the suite as the cell output.
s

In [4]:
# Deploy the suite built in the previous cell using the pf.Notebook target
# (presumably renders the generated scripts in the notebook -- confirm in pyflow docs).
s.deploy_suite(target=pf.Notebook)

## Events

Sometimes waiting for the completion of a task is not good enough. If a task is producing several results, another task may start as soon as the first results are ready. For that, **pyflow** introduces the concept of events. An event is a message that a task will report to **ecFlow** server while it is running.

Events have names and a task can set several of them.

In [5]:
with pf.Suite('test', host=pf.LocalHost(), files='/test') as s:
    with pf.Family('f1'):
        pf.Variable('SLEEP', 20)
        t1 = pf.Task('t1')
        # t2's script sets event "a" after the first sleep and event "b"
        # after the second one, using `ecflow_client --event=<name>`.
        with pf.Task('t2', script=[
            'echo "I will now sleep for %SLEEP% seconds"',
            'sleep %SLEEP%',
            'ecflow_client --event=a    # Set the first event',
            'sleep %SLEEP%              # Sleep a bit more',
            'ecflow_client --event=b    # Set the second event',
            'sleep %SLEEP%              # A last nap...',
        ]) as t2:
            # Events created inside the task's context; they are referenced
            # below as t2.a and t2.b.
            pf.Event('a')
            pf.Event('b')
        t3 = pf.Task('t3')
        t4 = pf.Task('t4')

    # t2 depends on t1; t3 and t4 depend on t2's events rather than on t2 itself.
    t2.triggers = t1
    t3.triggers = t2.a
    t4.triggers = t2.b

# Display the suite as the cell output.
s

In [6]:
# Deploy the events suite from the previous cell using the pf.Notebook target
# (presumably renders the generated scripts in the notebook -- confirm in pyflow docs).
s.deploy_suite(target=pf.Notebook)

## Complete

Sometimes a task should not be run when a certain condition is met. The condition can be signalled by an event. For example, event `t2.b` might indicate that task `t2` did not manage to produce the expected result, so we do not need to run task `t4`.

In this case, you can use the complete attribute. This has a similar usage to the trigger attribute but sets a task complete rather than running it.

When **ecFlow** server tries to start a task, it evaluates the trigger and complete expressions. If the complete expression condition is true, the task will set itself complete.

Completes can be between tasks, between families, or both. They can be used in conjunction with a trigger.

In [7]:
with pf.Suite('test', host=pf.LocalHost(), files='/test') as s:
    with pf.Family('f1'):
        pf.Variable('SLEEP', 20)
        t1 = pf.Task('t1')
        # t2's script sets event "a", then event "b", sleeping in between.
        with pf.Task('t2', script=[
            'echo "I will now sleep for %SLEEP% seconds"',
            'sleep %SLEEP%',
            'ecflow_client --event=a    # Set the first event',
            'sleep %SLEEP%              # Sleep a bit more',
            'ecflow_client --event=b    # Set the second event',
            'sleep %SLEEP%              # A last nap...',
        ]) as t2:
            pf.Event('a')
            pf.Event('b')
        t3 = pf.Task('t3')
        t4 = pf.Task('t4')

    t2.triggers = t1
    t3.triggers = t2.a
    # t4 combines a complete expression with a trigger: it is set complete
    # (not run) when event t2.b is set, and is otherwise triggered by t2.
    t4.completes = t2.b
    t4.triggers = t2

# Display the suite as the cell output.
s

## Expressions in Triggers and Completes

**ecFlow** has a rich language (and associated behaviour) for expressions that trigger dependencies and conditional behaviour in suites. These expressions are ultimately strings that are parsed by the **ecFlow** server and evaluated to control the suite.

Within **pyflow**, all of the components that make up **ecFlow** expressions are already present as objects in the script. This means we can generate type-safe, validated expressions by using the existing objects directly. These can then be assigned to the `triggers` or `completes` attributes of any appropriate node.

Trigger expressions should follow the natural arithmetic expressing the problem.

In [8]:
with pf.Suite('s') as s:
    
    # Two families, each with its own YMD date repeat spanning 2019.
    with pf.Family('repeat1') as repeat1:
        pf.RepeatDate('YMD', datetime.date(2019, 1, 1), datetime.date(2019, 12, 31))
        
    with pf.Family('repeat2') as repeat2:
        pf.RepeatDate('YMD', datetime.date(2019, 1, 1), datetime.date(2019, 12, 31))
        
    # repeat2 is triggered when repeat1 is complete OR repeat1's YMD is
    # greater than repeat2's YMD; the expression is built from the node objects.
    repeat2.triggers = (repeat1 == pf.state.complete) | (repeat1.YMD > repeat2.YMD)
        
    # t3 is set complete once repeat2's YMD is greater than 20190616.
    pf.Task('t3').completes = (repeat2.YMD > '20190616')
    
# Display the suite as the cell output.
s

### Shortcut properties

A number of shortcut properties exist to construct standard expression components. The following sets of examples are equivalent.

In [9]:
# Long form: compare the task against each pf.state value explicitly.
# (exprn is overwritten each time; only the construction is being shown.)
t = MyTask('a_task')
exprn = (t == pf.state.aborted)
exprn = (t == pf.state.complete)
exprn = (t == pf.state.unknown)
exprn = (t == pf.state.queued)
exprn = (t == pf.state.submitted)
exprn = (t == pf.state.active)

# Short form: the shortcut property of the same name on the task.
t = MyTask('a_task')
exprn = t.aborted
exprn = t.complete
exprn = t.unknown
exprn = t.queued
exprn = t.submitted
exprn = t.active

### Combined Expressions

Expressions can be combined with logical operators, both unary and binary.

In [10]:
with pf.Suite('s') as s:
    t1 = MyTask('t1')
    t2 = MyTask('t2')
    t3 = MyTask('t3')
    
    # Binary AND: t1 is triggered when t2 is complete and t3 is aborted.
    t1.triggers = t2.complete & t3.aborted
    
# Display the suite as the cell output.
s

In [11]:
with pf.Suite('s') as s:
    t1 = MyTask('t1')
    t2 = MyTask('t2')
    t3 = MyTask('t3')
    
    # Start with one condition, then OR a second one onto the existing
    # trigger with the in-place operator.
    t1.triggers = t2.complete
    t1.triggers |= t3.aborted
    
# Display the suite as the cell output.
s

### Shortcut Dependencies

The most common kind of trigger expression is a simple dependency: Task A runs only after Task B has completed. We provide a special operator to simplify this approach.

The following are equivalent approaches.

In [12]:
# Four ways of building the same dependency (t2 runs after t1); each
# `with` block rebinds s, so only the last suite is displayed.

# 1. Shortcut property on the upstream task.
with pf.Suite('s') as s:
    t1 = MyTask('t1')
    t2 = MyTask('t2')
    t2.triggers = t1.complete
    
# 2. Explicit comparison against pf.state.complete.
with pf.Suite('s') as s:
    t1 = MyTask('t1')
    t2 = MyTask('t2')
    t2.triggers = (t1 == pf.state.complete)
    
# 3. The >> dependency operator on named tasks.
with pf.Suite('s') as s:
    t1 = MyTask('t1')
    t2 = MyTask('t2')
    t1 >> t2
    
# 4. The >> operator applied directly to freshly created tasks.
with pf.Suite("s") as s:
    (
        MyTask('t1')
        >>
        MyTask('t2')
    )
    
# Display the last suite as the cell output.
s

## Looping Constructs

**pyflow** supports **ecFlow** looping constructs, and ensures that they are initialised in a type-safe manner. The values of these looping constructs can be accessed from scripts in the same manner as normal **ecFlow** variables.

In [13]:
class LabelSetter(pf.Task):

    def __init__(self, *args, **kwargs):
        """
        Create a task that updates labels through ecflow_client.

        Each positional argument is a (label, value) pair; one script line
        is generated per pair. The task name comes from the optional 'name'
        keyword and falls back to 'set_labels'.
        """
        # One templated command is rendered per (label, value) pair.
        command_template = 'ecflow_client --alter=change label {{ LABEL.name }} "{{ VALUE }}" {{ LABEL.parent.fullname }}'

        script = []
        for label, value in args:
            script.append(
                pf.TemplateScript(command_template, LABEL=label, VALUE=value)
            )

        # 'name' is removed from kwargs so it is not forwarded twice.
        task_name = kwargs.pop('name', 'set_labels')
        super().__init__(task_name, script=script, **kwargs)


class WaitSeconds(pf.Task):
    def __init__(self, seconds, **kwargs):
        # Task whose script is a single `sleep <seconds>` command. The name
        # defaults to 'wait_<seconds>' unless a 'name' keyword is supplied.
        fallback_name = 'wait_{}'.format(seconds)
        task_name = kwargs.pop('name', fallback_name)
        sleep_command = 'sleep {}'.format(seconds)
        super().__init__(task_name, script=sleep_command, **kwargs)


with CourseSuite('looping_constructs') as s:
    
    # Outer family carries a date repeat named REPEAT_DATE over 2019.
    with pf.Family('date_family'):
        pf.RepeatDate('REPEAT_DATE',
                      datetime.date(year=2019, month=1, day=1),
                      datetime.date(year=2019, month=12, day=31))
        
        # Inner family declares an (initially empty) 'date_time' label and an
        # integer repeat named REPEAT_HOUR with bounds 1 and 24.
        with pf.Family('hour_family', labels={'date_time': ''}) as f:
            pf.RepeatInteger('REPEAT_HOUR', 1, 24)
            # Set the family's label from the two repeat values (read in the
            # script as $REPEAT_DATE / $REPEAT_HOUR), then wait two seconds.
            (
                LabelSetter((f.date_time, '$REPEAT_DATE hour $REPEAT_HOUR'))
                >>
                WaitSeconds(2)
            )

# Display the suite as the cell output.
s

## External ecFlow Dependencies

**pyflow** builds its dependency trees using python objects. This means that if we wish to have connections to external suites, that are not built from the same repository, then we must build shadow objects that map to the nodes we wish to connect to.

A full range of these `Extern*` objects exist which may be used in the normal way.

In [14]:
with pf.Suite('s') as s:
    
    # Shadow objects for nodes that live in other suites, addressed by path.
    etask = pf.ExternTask('/a/b/c/d')
    efamily = pf.ExternFamily('/f/g/h/i')
    
    # Shadow objects for attributes of external nodes, written '<path>:<name>'.
    # They are only constructed here; the trigger below does not use them.
    eymd = pf.ExternYMD('/a/b/c/d:YMD')
    eevent = pf.ExternEvent('/e/f/g/h:ev')
    emeter = pf.ExternMeter('/g/h/i/j:mt')
    
    # A local task whose trigger combines the two external nodes with AND.
    t1 = pf.Task('t1')
    t1.triggers = etask & efamily
    
# Display the suite as the cell output.
s