# An Intro to Workflow Orchestration with Prefect
* Workflow orchestration is a set of tools that schedule and monitor work:
    * E.g. a machine learning pipeline that we want to run every week: we put it on a schedule, and if it fails we want to be able to see the issues that occurred. There are many places where a machine learning pipeline can fail.
    * Workflow orchestration helps us deal with these failures.
    ![prefect](prefect_01.png)
    

## Negative Engineering
90% of engineering time is spent on:
* Retries when APIs go down
* Malformed Data
* Notifications
* Observability into failures
* Conditional failure logic
* Timeouts

Prefect aims to reduce this time!

Workflow orchestration is a set of features that help reduce the time spent on the points above.
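To make the first item on the list concrete, here is a minimal sketch of the retry logic we would otherwise write by hand. It uses only the standard library and is not Prefect's API; the `retry` decorator and the flaky `fetch_data` function are hypothetical names made up for this illustration.

```python
import functools
import time


def retry(max_attempts=3, delay_seconds=0.0):
    """Re-run the wrapped function until it succeeds or the attempts run out."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(1, max_attempts + 1):
                try:
                    return func(*args, **kwargs)
                except Exception as exc:
                    print(f"attempt {attempt}/{max_attempts} failed: {exc}")
                    if attempt == max_attempts:
                        raise  # give up and surface the last error
                    time.sleep(delay_seconds)
        return wrapper
    return decorator


calls = {"count": 0}


@retry(max_attempts=3)
def fetch_data():
    """Hypothetical flaky API call: fails twice, then succeeds."""
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("API is down")
    return {"rows": 100}


result = fetch_data()
```

Every pipeline step would need this kind of wrapper, plus logging and notifications, which is the boilerplate an orchestrator is meant to take over.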

## Introduction to Prefect
Eliminating negative engineering

* open source workflow orchestration
* Python-based
* Modern data stack
* Native dask integration
* Very active community
* Prefect Cloud / Prefect Server
* Prefect Orion (Prefect 2.0), currently available as a beta version

* Basic example of the usage of Prefect:
![prefect](prefect_02.png)

**Next step: use the notebook from the previous week and convert it to a script, so that we can deploy it.**
* We will use Prefect 2.0 for this tutorial! It needs to be installed explicitly: ```prefect==2.0b5```
* Pin this version in ```requirements.txt```

# First Prefect Flow and Basic Concepts
* It is quite normal for model accuracy to drop over time; this is called *model drift*
* To counter this, we can retrain our model regularly

![model_drift](model_drift.png)

* We already have a script that trains a model
* Now we would like to put this on a schedule
* Before putting the new model into production, we would like to compare it with the current one. We can use MLflow for that
* Install Prefect: ```pip install prefect==2.0b5```
* We now bring our training script to Prefect.
    * For that, do ```from prefect import flow``` and add the decorator ```@flow``` to the ```main``` function
    * Next: ```from prefect import flow, task``` and add the decorator ```@task``` to the ```add_features``` function
    * Calling ```add_features``` now returns a future, so we need to append ```.result()``` to the call to get the actual value
* Using ```prefect orion start``` gives us the URL to a dashboard:
```
INFO:     Started server process [161608]
INFO:     Waiting for application startup.
INFO:     Application startup complete.
INFO:     Uvicorn running on http://127.0.0.1:4200 (Press CTRL+C to quit)
```
![prefect start](prefect_start.png)

* Each time we start a flow run, Prefect reports its state to the API
    * All the logs from the runs are visible through the API
* "Tasks are the unit of monitoring": each task is observed by Prefect
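The future-and-`.result()` pattern mentioned above can be illustrated with the standard library. This is only an analogy using `concurrent.futures`, not Prefect's implementation; the body of `add_features` and the sample rows are hypothetical stand-ins for the real training script.

```python
from concurrent.futures import ThreadPoolExecutor


def add_features(rows):
    """Hypothetical stand-in for the feature step of the training script."""
    return [{**row, "duration_min": row["duration_s"] / 60} for row in rows]


def main():
    rows = [{"duration_s": 120}, {"duration_s": 300}]
    with ThreadPoolExecutor(max_workers=1) as executor:
        # Submitting the work returns a future immediately, not the data.
        future = executor.submit(add_features, rows)
        # .result() blocks until the work is done and returns the value.
        features = future.result()
    return features


features = main()
```

The point is the same as in the notes: once a function is wrapped so that it returns a future, the caller has to ask for the result explicitly before using it.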

# Remote Prefect Orion Deployment

* Instructions: Hosting an Orion instance on a cloud VM: https://discourse.prefect.io/t/hosting-an-orion-instance-on-a-cloud-vm/967
    * Start an instance/VM (as shown in week 01)
    * Add necessary security
        * Edit Inbound Rules: 
            * add HTTP from everywhere (source: anywhere)
            * add custom TCP, port range 4200 from everywhere (source: anywhere)
            * add custom UDP, port range 4200 from everywhere (source: anywhere)
            * add HTTPS from everywhere (source: anywhere)
    * Go back to the instance and refresh
    * ssh to this instance via the terminal: ```ssh -i <key> <name>@<ip>```
    * Install necessary packages: ```pip install prefect==2.0b5```
    * Set the UI_API_URL with: ```prefect config set PREFECT_ORION_UI_API_URL="http://<external-ip>:4200/api"```
    * Start Orion with: ```prefect orion start --host 0.0.0.0```
    * From local machine, configure to hit the API with: ```prefect config set PREFECT_API_URL="http://<external-ip>:4200/api"```
    * The remote UI will be visible at ```http://<external-ip>:4200/```
* Use ```prefect config view``` to check the configuration
* Use ```prefect config unset``` to unset a setting, e.g. if we made a mistake
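Both `prefect config set` commands above point at the same API URL, one on the VM and one on the local machine. As a small sketch, the helper below builds both command strings from one IP so they cannot drift apart. The function name `orion_config_commands` is hypothetical, and `203.0.113.10` is a placeholder address, not a real server.

```python
def orion_config_commands(external_ip, port=4200):
    """Build the two config commands from the notes for a given VM address."""
    api_url = f"http://{external_ip}:{port}/api"
    return {
        # Run on the remote VM before starting Orion.
        "remote": f'prefect config set PREFECT_ORION_UI_API_URL="{api_url}"',
        # Run on the local machine so flows report to the remote API.
        "local": f'prefect config set PREFECT_API_URL="{api_url}"',
    }


commands = orion_config_commands("203.0.113.10")  # placeholder IP
for where, command in commands.items():
    print(f"{where}: {command}")
```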

# Deploy a Prefect Flow
* Storage:
    * Our flows are stored somewhere
    * We have to define a storage where our flows are saved
    * Check the storage with ```prefect storage ls``` 
    ![prefect storage](prefect_storage.png)
    * Create a new storage: ```prefect storage create```
    ![prefect choose storage](prefect_storage_1.png)
    * Choose the storage. In this example choose ```3``` for local storage
    * Filepath: ```<HOME>/.prefect```
    * Name: local
* Add a deployment to our training script and save it as ```prefect_deploy.py```:
```
from prefect.deployments import DeploymentSpec
from prefect.orion.schemas.schedules import IntervalSchedule
from prefect.flow_runner import SubprocessFlowRunner
from datetime import timedelta

# Define the deployment spec
DeploymentSpec(
    flow=main,
    name="model_training",
    schedule=IntervalSchedule(interval=timedelta(minutes=5)),  # in practice this would be 1 day, 1 week, ...
    flow_runner=SubprocessFlowRunner(),
    tags=["ml"]
)
```
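To see what an interval of five minutes means for the schedule, the sketch below lists the next few run times with plain `datetime` arithmetic. It is only an illustration of the idea: `upcoming_runs` is a hypothetical helper, the start time is made up, and how Prefect anchors its own schedule may differ.

```python
from datetime import datetime, timedelta


def upcoming_runs(anchor, interval, count):
    """Return the next `count` run times, spaced `interval` apart after `anchor`."""
    return [anchor + interval * i for i in range(1, count + 1)]


anchor = datetime(2022, 6, 1, 12, 0)  # assumed start time for the example
runs = upcoming_runs(anchor, timedelta(minutes=5), 3)
for run_time in runs:
    print(run_time.isoformat())
```

Swapping `timedelta(minutes=5)` for `timedelta(weeks=1)` gives the weekly retraining cadence described earlier.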
* In terminal: ```prefect deployment create prefect_deploy.py```
* In terminal: ```prefect orion start```
![deployment](deployment.png)
* We can see our deployment, but the Prefect server does not do any of the compute itself; we need to specify where the training should run. The mechanisms for this are agents and work queues.
![work queues](workqueues.png)
* Create a work queue
![create work queue](create_workqueue.png)
* We get a work queue ID, which we can use, e.g.
    * ```prefect work-queue preview cb95d191-d549-461d-87b0-9e1d3e298126``` to see all the scheduled runs:
     ![workqueue_preview](workqueue_preview.png)
* Now we spin up an agent. An agent looks for work that is due in the work queue and runs it: ```prefect agent start cb95d191-d549-461d-87b0-9e1d3e298126```
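
The division of labour between a work queue and an agent can be sketched with the standard library. This is a toy model, not how Prefect implements agents: `run_agent`, `train_model`, and the run names are hypothetical, and a real agent polls the Orion API rather than an in-memory queue.

```python
import queue


def run_agent(work_queue, max_polls=10):
    """Poll the queue and execute every scheduled run that is found."""
    completed = []
    for _ in range(max_polls):
        try:
            name, func = work_queue.get_nowait()
        except queue.Empty:
            break  # nothing left to do
        completed.append((name, func()))
    return completed


def train_model():
    """Hypothetical stand-in for the training flow."""
    return "trained"


# The server only records what should run; it does not execute anything.
work_queue = queue.Queue()
work_queue.put(("model_training-run-1", train_model))
work_queue.put(("model_training-run-2", train_model))

# The agent picks the runs up and does the actual compute.
completed = run_agent(work_queue)
print(completed)
```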
