In [1]:
import asyncio

# orchestration engine

Let's consider a simple coroutine for the sake of illustration:

In [2]:
import time

async def mycoro(timeout):
    print("-> {} mycoro({})".format(time.strftime("%H:%M:%S"), timeout))
    await asyncio.sleep(timeout)
    print("<- {} mycoro({})".format(time.strftime("%H:%M:%S"), timeout))
    # return something easy to recognize
    return 1000 * timeout

# example A

Running a series of coroutines in parallel, à la `gather`, can be done like this:

In [3]:
from job import Job
from engine import Engine

In [4]:
a1, a2, a3 = Job(mycoro(1)), Job(mycoro(2)), Job(mycoro(1.5))

What we're saying here is that we have three jobs with no relationships between them.

So when we run them, all 3 coroutines start at once, and `orchestrate` returns once they are all done:

In [5]:
ea = Engine(a1, a2, a3)
ea.orchestrate()

-> 16:50:40 mycoro(1.5)
-> 16:50:40 mycoro(1)
-> 16:50:40 mycoro(2)
<- 16:50:41 mycoro(1)
<- 16:50:42 mycoro(1.5)
<- 16:50:42 mycoro(2)


True
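For comparison, here is a minimal sketch of the same scenario in plain `asyncio`, using `asyncio.gather` directly. It does not use `Job` or `Engine`; the shortened delays and the `run_all` helper are assumptions made for illustration, and it is written as a standalone script (Python 3.7+ for `asyncio.run`).

```python
import asyncio
import time

async def mycoro(timeout):
    await asyncio.sleep(timeout)
    # return something easy to recognize
    return 1000 * timeout

async def run_all():
    # start the three coroutines together and wait until all are done
    return await asyncio.gather(mycoro(0.1), mycoro(0.2), mycoro(0.15))

start = time.monotonic()
results = asyncio.run(run_all())
elapsed = time.monotonic() - start

# results come back in the order the coroutines were passed to gather;
# the total duration is close to the longest delay, not the sum
print(results, "in about {:.2f}s".format(elapsed))
```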

# example B : add requirements (dependencies)

Now we can add *requirements* (dependencies) between jobs, as in the following example. We take this chance to show that jobs can be tagged with a label, which can turn out to be convenient sometimes.

In [6]:
b1, b2, b3 = (Job(mycoro(1), label="b1"),
              Job(mycoro(2)), 
              Job(mycoro(1.5), label="b3"))

b3.requires(b1)

Now `b3` needs `b1` to be finished before it can start. So only the first 2 coroutines get started at the beginning, and `b3` starts only once `b1` has finished.

In [7]:
eb = Engine(b1, b2, b3)

In [8]:
eb.orchestrate()

-> 16:50:42 mycoro(2)
-> 16:50:42 mycoro(1)
<- 16:50:43 mycoro(1)
-> 16:50:43 mycoro(1.5)
<- 16:50:44 mycoro(2)
<- 16:50:45 mycoro(1.5)


True
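To make the effect of `requires` concrete, here is a rough sketch of the same ordering written with plain `asyncio` tasks. This is not how `Engine` is implemented; the `after` helper, the `events` list and the shortened delays are all hypothetical and only serve the illustration.

```python
import asyncio

events = []  # records the order in which coroutines start and end

async def mycoro(name, timeout):
    events.append("start " + name)
    await asyncio.sleep(timeout)
    events.append("end " + name)
    return 1000 * timeout

async def after(required_task, coro):
    # hypothetical helper: wait for the required task,
    # and only then run the dependent coroutine
    await required_task
    return await coro

async def scenario():
    b1 = asyncio.create_task(mycoro("b1", 0.1))
    b2 = asyncio.create_task(mycoro("b2", 0.2))
    # b3 is scheduled right away but does not start before b1 is done
    b3 = asyncio.create_task(after(b1, mycoro("b3", 0.15)))
    return await asyncio.gather(b1, b2, b3)

results = asyncio.run(scenario())
print(events)
```

The point of the engine is to avoid writing this kind of wrapper by hand for every dependency.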

# inspect results

Before we see more examples, let's see how details for each `Job` can be retrieved once `orchestrate` finishes:

In [9]:
# a shorter equivalent form would be
# eb.list()
 
for job in eb.jobs:
    print(job)

<Job `NOLABEL' finished -> 2000>
<Job `b3' finished -> 1500.0 - [requires [b1]]>
<Job `b1' finished -> 1000 - [allows [b3]]>


In [10]:
print(b1.is_done())

True


In [11]:
print(b3.result())

1500.0


# example C : infinite loops, or coroutines that don't return

Sometimes it is useful to deal with an endless loop; for example, if we want to completely separate actions from printing, we can use an `asyncio.Queue` to implement a simple message bus as follows:

In [12]:
message_bus = asyncio.Queue()

async def monitor_loop(bus):
    while True:
        message = await bus.get()
        print("{} BUS: {}".format(time.strftime("%H:%M:%S"), message))

Now we need a modified version of the previous coroutine that interacts with this message bus instead of printing anything itself:

In [13]:
async def mycoro_bus(timeout, bus):
    await bus.put("-> mycoro({})".format(timeout))
    await asyncio.sleep(timeout)
    await bus.put("<- mycoro({})".format(timeout))
    # return something easy to recognize
    return 10 * timeout

We can replay the previous scenario, adding the monitoring loop as a separate job. However, we need to declare this job with `forever=True`, so that the engine knows when the bulk of the scenario is completed, since the monitoring loop will never return.

In [14]:
c1, c2, c3, c4 = (Job(mycoro_bus(0.4, message_bus), label="c1"),
                  Job(mycoro_bus(0.8, message_bus), label="c2"), 
                  Job(mycoro_bus(0.6, message_bus), label="c3"),
                  Job(monitor_loop(message_bus), forever=True, label="monitor"))

c3.requires(c1)

ec = Engine(c1, c2, c3, c4)
ec.orchestrate()

16:50:45 BUS -> -> mycoro(0.8)
16:50:45 BUS -> -> mycoro(0.4)
16:50:45 BUS -> <- mycoro(0.4)
16:50:45 BUS -> -> mycoro(0.6)
16:50:46 BUS -> <- mycoro(0.8)
16:50:46 BUS -> <- mycoro(0.6)


True

Note that `orchestrate` always terminates as soon as all the non-`forever` jobs are complete. The `forever` jobs, on the other hand, get cancelled, so of course no return value is available for them at the end of the scenario:

In [15]:
ec.list()

<Job `monitor'[∞] cancelled>
<Job `c2' finished -> 8.0>
<Job `c3' finished -> 6.0 - [requires [c1]]>
<Job `c1' finished -> 4.0 - [allows [c3]]>
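The "run the bulk, then cancel the endless jobs" behaviour can be sketched in plain `asyncio` as below. Again this is only an illustration under assumptions, not the engine's actual code: the `seen` list replaces printing, the delays are shortened, and the `scenario` function is hypothetical.

```python
import asyncio

async def monitor_loop(bus, seen):
    # endless loop: never returns on its own
    while True:
        message = await bus.get()
        seen.append(message)

async def mycoro_bus(timeout, bus):
    await bus.put("-> mycoro({})".format(timeout))
    await asyncio.sleep(timeout)
    await bus.put("<- mycoro({})".format(timeout))
    return 10 * timeout

async def scenario():
    bus = asyncio.Queue()
    seen = []
    monitor = asyncio.create_task(monitor_loop(bus, seen))
    # wait for the regular (non-forever) coroutines only
    results = await asyncio.gather(mycoro_bus(0.04, bus),
                                   mycoro_bus(0.08, bus))
    # give the monitor one more turn to pick up pending messages
    await asyncio.sleep(0)
    # then cancel the endless loop; it has no return value
    monitor.cancel()
    try:
        await monitor
    except asyncio.CancelledError:
        pass
    return results, seen, monitor.cancelled()

results, seen, monitor_cancelled = asyncio.run(scenario())
print(results, len(seen), monitor_cancelled)
```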


# customizing the `Job` class

`Job` is actually a specialization of `AbstractJob`. The specification is that the `corun()` method should itself be a coroutine, and that is what `Engine` triggers to run the job.

You can define your own `Job` class by specializing `job.AbstractJob`. More on this later: we plan to provide some predefined jobs, in particular for interacting through ssh, and possibly many others.
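As a rough idea of what such a specialization might look like, here is a sketch that relies only on the `corun()` contract stated above. The `AbstractJob` class below is a stand-in defined locally so that the snippet runs on its own; it is **not** the real `job.AbstractJob`, whose constructor and other methods are not shown here. `SleepJob` is a hypothetical name.

```python
import asyncio

class AbstractJob:
    """Local stand-in for job.AbstractJob (assumption, not the real class)."""
    async def corun(self):
        raise NotImplementedError("subclasses must define corun()")

class SleepJob(AbstractJob):
    """Hypothetical job that sleeps, then returns a recognizable value."""
    def __init__(self, timeout):
        self.timeout = timeout

    async def corun(self):
        # corun() is itself a coroutine; this is what an engine would await
        await asyncio.sleep(self.timeout)
        return 1000 * self.timeout

# awaiting corun() directly, without any engine
result = asyncio.run(SleepJob(0.05).corun())
print(result)
```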

# TODO

This is only a seed at this point, roughly day d+1.

## deal with exceptions
* for now we more or less assume that `corun()` does not raise an exception. This needs to be made more robust.
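One possible direction, sketched here in plain `asyncio` and not yet part of the engine, is to collect exceptions alongside regular results with `asyncio.gather(..., return_exceptions=True)`. The `ok` and `boom` coroutines are hypothetical examples.

```python
import asyncio

async def ok(timeout):
    await asyncio.sleep(timeout)
    return 1000 * timeout

async def boom(timeout):
    await asyncio.sleep(timeout)
    raise ValueError("boom after {}".format(timeout))

async def run_all():
    # with return_exceptions=True, an exception raised by one coroutine
    # is returned in its slot instead of aborting the whole gather
    return await asyncio.gather(ok(0.02), boom(0.01),
                                return_exceptions=True)

results = asyncio.run(run_all())
for result in results:
    if isinstance(result, Exception):
        print("failed:", repr(result))
    else:
        print("finished ->", result)
```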

## termination

1. `orchestrate` should accept a timeout argument
1. provide a means to shut down jobs once the engine has finished. Typically we would have several jobs using the same ssh connection, and such connections need to be closed at some point. Something like `Engine.shutdown` sending `Job.coshutdown()`, or similar

  Would it work to just send e.g. `coshutdown()` to all jobs? This needs a little more thinking though; it could mean trying to shut down an ssh connection from several places at the same time ...
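For the timeout item above, a minimal sketch based on `asyncio.wait_for` could look like this. The function name `orchestrate_with_timeout` and the choice of returning `False` on timeout are assumptions; nothing here is implemented in `Engine` yet.

```python
import asyncio

async def orchestrate_with_timeout(coros, timeout):
    # hypothetical helper: run all coroutines, give up after `timeout` seconds
    try:
        await asyncio.wait_for(asyncio.gather(*coros), timeout)
        return True
    except asyncio.TimeoutError:
        # wait_for has cancelled the pending coroutines at this point
        return False

# completes well within the allowed time
fast_ok = asyncio.run(orchestrate_with_timeout([asyncio.sleep(0.01)], 1.0))
# takes too long, so the scenario is cut short
slow_ok = asyncio.run(orchestrate_with_timeout([asyncio.sleep(1)], 0.05))
print(fast_ok, slow_ok)
```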


## monitoring 
* come up with some basic (curses?) monitor to show what's going on; what I have in mind is something like `rhubarbe load`, where all jobs would be displayed, one line each, with their status, so that one can get a sense of what is going on
* one way to look at this is to have the main `Engine` class call a `tick()` method on itself, and then specialize `Engine` as `EngineCurses`, which would actually do things on such events.
* ***or*** this gets delegated to a `message_queue` object. **Review the rhubarbe code on this aspect**.
