# Demo workflow

Let's look at how Covalent can be used when you already have a set of tasks and want to combine them into one workflow.

In [1]:
import covalent as ct

@ct.electron
def add_large_num(num_1, num_2):
    # Add two large numbers
    
    print(f"Adding {num_1} and {num_2}")
    
    return num_1 + num_2


@ct.electron
def multiply_large_num(num_1, num_2):
    # Multiply two large numbers

    print(f"Multiplying {num_1} and {num_2}")

    return num_1 * num_2

@ct.electron
def square_large_num(num):
    # Square a large number

    print(f"Squaring {num}")

    return num ** 2


@ct.lattice
def sample_workflow(large_num_1, large_num_2, large_num_3):
    # Sample workflow to do 3 independent jobs and 1 dependent job

    added_num = add_large_num(large_num_1, large_num_2)
    multiplied_num = multiply_large_num(large_num_1, large_num_2)

    square_large_num(large_num_3)

    return add_large_num(added_num, multiplied_num)
    

## Execution without Covalent

Even after you've decorated these functions, they can still be executed as normal Python functions.

In [2]:
l1 = 1234567
l2 = 7654321
l3 = 12321

sample_workflow(l1, l2, l3)

Adding 1234567 and 7654321
Multiplying 1234567 and 7654321
Squaring 12321
Adding 8888888 and 9449772114007


9449781002895
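
As a quick sanity check on the output above, the same arithmetic can be reproduced in plain Python with no decorators involved. This is only a sketch for cross-checking the printed values; the intermediate variable names (`added`, `multiplied`, `squared`, `final`) are made up for illustration.

```python
# Plain-Python cross-check of the values printed above (no Covalent involved).
l1, l2, l3 = 1234567, 7654321, 12321

added = l1 + l2              # first add_large_num call
multiplied = l1 * l2         # multiply_large_num call
squared = l3 ** 2            # square_large_num call (its result is not used downstream)
final = added + multiplied   # second add_large_num call, the workflow's return value

print(added, multiplied, squared, final)
```

Note that the squared value is computed but never feeds into the returned result, which is why it only shows up in the printed log.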

## Drawing the graph

Suppose you want to see what the workflow graph looks like without executing the tasks:

In [3]:
sample_workflow.draw(l1, l2, l3)

You can go to the [Covalent UI](http://localhost:48008) and click the preview tab on the left to inspect the workflow graph.
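
If the UI is not at hand, the shape of the graph can also be reasoned about by hand. The sketch below writes down the task dependencies of `sample_workflow` as a plain dictionary and asks the standard library for a valid execution order. The node labels are illustrative and this is not how Covalent builds or stores its graph.

```python
from graphlib import TopologicalSorter  # standard library, Python 3.9+

# Each key is a task; its value is the set of tasks it depends on.
# The labels are illustrative and are not taken from Covalent's internals.
dependencies = {
    "add_large_num(l1, l2)": set(),
    "multiply_large_num(l1, l2)": set(),
    "square_large_num(l3)": set(),
    "add_large_num(added, multiplied)": {
        "add_large_num(l1, l2)",
        "multiply_large_num(l1, l2)",
    },
}

# A valid execution order: every task appears after the tasks it depends on.
order = list(TopologicalSorter(dependencies).static_order())
print(order)
```

The three tasks with no dependencies may appear in any relative order, while the final addition always comes after both the first addition and the multiplication.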

## Dispatching the workflow

Let's dispatch the workflow to the Covalent server and check its real-time status in the UI.

In [4]:
dispatch_id = ct.dispatch(sample_workflow)(l1, l2, l3)

## Getting the result back

Using the `dispatch_id` you received, you can get the result of any execution and see information such as its status, each task's output, its start and end times, and the error, if any occurred.

In [5]:
sample_workflow_result = ct.get_result(dispatch_id)
print(sample_workflow_result)


Lattice Result
status: COMPLETED
result: 9449781002895
inputs: {'args': [1234567, 7654321, 12321], 'kwargs': {}}
error: None

start_time: 2022-06-10 13:47:40.173737+00:00
end_time: 2022-06-10 13:47:40.451817+00:00

results_dir: /Users/sankalpsanand/dev/tutorials_covalent_mlops_2022/results
dispatch_id: e387be5b-d731-447c-893e-f520c5dc71dc

Node Outputs
------------
add_large_num(0): 8888888
:parameter:1234567(1): 1234567
:parameter:7654321(2): 7654321
multiply_large_num(3): 9449772114007
:parameter:1234567(4): 1234567
:parameter:7654321(5): 7654321
square_large_num(6): 151807041
:parameter:12321(7): 12321
add_large_num(8): 9449781002895
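
The timestamps in the printed result can be used to estimate how long the workflow took. A minimal sketch, assuming the two timestamps are copied as strings from the output above; it uses only the standard library and does not query Covalent.

```python
from datetime import datetime

# Timestamps copied from the printed result above.
start_time = datetime.fromisoformat("2022-06-10 13:47:40.173737+00:00")
end_time = datetime.fromisoformat("2022-06-10 13:47:40.451817+00:00")

# Subtracting two timezone-aware datetimes gives a timedelta.
elapsed = (end_time - start_time).total_seconds()
print(f"Workflow took {elapsed:.3f} s")
```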



## Executors

Executors are responsible for running your tasks on a backend of your choice. Let's start with a simple `DaskExecutor`, which runs your tasks on a Dask cluster. You can also write a custom executor by overriding just one function; see [How to create your own executor](https://covalent.readthedocs.io/en/latest/how_to/execution/creating_custom_executors.html).
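
To make the idea of overriding one function concrete, here is a toy sketch of the executor pattern. `ToyExecutor`, `LocalThreadExecutor`, `toy_multiply`, and the `run` signature are all hypothetical and are not Covalent's classes or API; refer to the linked guide for the real base class and the method to override.

```python
from concurrent.futures import ThreadPoolExecutor


class ToyExecutor:
    """Hypothetical base class: subclasses override `run` to choose a backend."""

    def run(self, function, args, kwargs):
        raise NotImplementedError


class LocalThreadExecutor(ToyExecutor):
    """Runs the task in a worker thread and waits for its result."""

    def run(self, function, args, kwargs):
        with ThreadPoolExecutor(max_workers=1) as pool:
            return pool.submit(function, *args, **kwargs).result()


def toy_multiply(num_1, num_2):
    return num_1 * num_2


# The caller only sees `run`; where the function executes is the executor's concern.
toy_result = LocalThreadExecutor().run(toy_multiply, (1234567, 7654321), {})
print(toy_result)
```

The point of the pattern is that the task function stays unchanged while the backend is swapped by choosing a different executor object.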

### Workflow level assignment
In this case we'll assign the executor to the whole workflow (`lattice`), so each electron inside it will be submitted to the running Dask cluster.

In [6]:
dask_executor = ct.executor.DaskExecutor(scheduler_address='tcp://127.0.0.1:55201')

@ct.lattice(executor=dask_executor)
def sample_workflow(large_num_1, large_num_2, large_num_3):
    # Sample workflow to do 3 independent jobs and 1 dependent job

    added_num = add_large_num(large_num_1, large_num_2)
    multiplied_num = multiply_large_num(large_num_1, large_num_2)

    square_large_num(large_num_3)

    return add_large_num(added_num, multiplied_num)

In [7]:
dispatch_id = ct.dispatch(sample_workflow)(l1, l2, l3)
print(dispatch_id)

48463d1f-f18f-4fa2-8289-6d2292ab0ee8


### Task level assignment

Now suppose we want to run these tasks on different hardware backends. We can do that by creating the respective `Executor` objects. To start with, we currently provide `SlurmExecutor`, which runs a task as a SLURM job, and `SSHExecutor`, which runs a task on a remote machine connected through SSH.

In [8]:
slurm_executor = ct.executor.SlurmExecutor(
    username="sankalp",
    address="beehive.agnostiq.ai",
    remote_workdir="/federation/sankalp/workdir",
    ssh_key_file="/Users/sankalpsanand/.ssh/beehive",
    poll_freq=10,
    options={
        "partition": "debug", "ntasks": 1, "cpus-per-task": 1,
        "chdir": "/federation/sankalp/workdir", "nodelist": "beehive-debug-st-t2medium-1"},
)

ssh_executor = ct.executor.SSHExecutor(
    username="sankalp",
    hostname="beehive.agnostiq.ai",
    remote_dir="/federation/sankalp/workdir",
    ssh_key_file="/Users/sankalpsanand/.ssh/beehive",
)

Let's assign these executors to the different electrons as desired.

In [9]:
@ct.electron(executor=ssh_executor)
def add_large_num(num_1, num_2):
    # Add two large numbers
    
    print(f"Adding {num_1} and {num_2}")
    
    return num_1 + num_2


@ct.electron(executor=dask_executor)
def multiply_large_num(num_1, num_2):
    # Multiply two large numbers

    print(f"Multiplying {num_1} and {num_2}")

    return num_1 * num_2

@ct.electron(executor=slurm_executor)
def square_large_num(num):
    # Square a large number

    print(f"Squaring {num}")

    return num ** 2


@ct.lattice
def sample_workflow(large_num_1, large_num_2, large_num_3):
    # Sample workflow to do 3 independent jobs and 1 dependent job

    added_num = add_large_num(large_num_1, large_num_2)
    multiplied_num = multiply_large_num(large_num_1, large_num_2)

    square_large_num(large_num_3)

    return add_large_num(added_num, multiplied_num)

l1 = 1234567
l2 = 7654321
l3 = 12321

dispatch_id = ct.dispatch(sample_workflow)(l1, l2, l3)
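
One way to picture task-level assignment is a decorator that records a backend label on each function while leaving the function callable as usual. The sketch below is a toy under that assumption: `toy_electron`, `toy_add`, `toy_square`, and the string labels are hypothetical and do not reflect how Covalent stores or resolves executors.

```python
import functools


def toy_electron(executor="local"):
    """Hypothetical decorator: records a backend label on the function."""

    def decorate(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            # Calling the decorated function still runs it directly.
            return func(*args, **kwargs)

        wrapper.executor = executor  # metadata a dispatcher could read later
        return wrapper

    return decorate


@toy_electron(executor="ssh")
def toy_add(num_1, num_2):
    return num_1 + num_2


@toy_electron(executor="slurm")
def toy_square(num):
    return num ** 2


# A dispatcher could route each task by inspecting the recorded label.
routing = {f.__name__: f.executor for f in (toy_add, toy_square)}
print(routing)
```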
    