<img src="img/prefect-logo.png"/>

# Getting Started with Prefect
Data workflow orchestrators coordinate, monitor, and observe data movement. Features that make data pipelines fault-tolerant include:
- scheduling and triggering jobs
- adding retries
- dependencies between jobs, including state dependencies ('if the previous job failed')
- caching expensive tasks
- deploying flows to different environments
- visibility into execution state of all jobs in your workflows

Stop safeguarding against failure! Instead, enjoy a single pane of glass for monitoring your code. 

Let's gain visibility into our Python scripts with Prefect. All we have to do for observability, scheduling and more is to add a single `@flow` decorator (and optionally `@task` decorators). After that, we'll also create a deployment, view it in the UI, add a schedule, and watch our code run in the Prefect UI. 

## Prerequisites
[Use the README to clone the repo and install Jupyter notebook](./README.md)

## Demo agenda
1. We will install dependencies.
2. We will take a look at our first flow (Python code).
3. We will run that flow locally.
4. We will then view our flow run in the UI. 
5. We will deploy our code so that we don't need to run it locally anymore.
6. We will add a schedule to our code. 
7. We will start a local agent that can execute our code on our schedule.

## 1. First, we'll install Prefect and the other dependencies our script needs

In [1]:
%%capture 
!pip install prefect==2.3.1 prefect-dask scikit-learn pandas
# The magic capture cmd simply suppresses the install output, which is lengthy.

## 2. Our first flow
Let's write a flow that trains several models on the Titanic dataset and compares how accurately each one predicts which passengers survived. Read through the code in the cell below, noting that almost the only difference from an ordinary Python script is the addition of `@task` and `@flow` decorators.

In [1]:
from datetime import timedelta
from typing import Any, Dict, List

import pandas as pd
from pandas import DataFrame, Series
from prefect import flow, tags, task
from prefect.tasks import task_input_hash
from prefect_dask.task_runners import DaskTaskRunner
from sklearn.ensemble import RandomForestClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score
from sklearn.model_selection import train_test_split
from sklearn.neighbors import KNeighborsClassifier
from sklearn.svm import SVC
from sklearn.tree import DecisionTreeClassifier


@task(name="create-data", description="This task reads in and wrangles Titanic data")
def create_data():
    """
    Task that reads in data from the CSV, then cleans it. Easily extensible to other
    methods of data extraction, from an S3 bucket, API, etc.
    """
    df = pd.read_csv("titanic.csv")
    df = df.drop(["Name"], axis=1)
    df["Sex"] = pd.factorize(df["Sex"])[0]
    y = df["Survived"]
    X = df.drop("Survived", axis=1)
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, random_state=42
    )
    # Fill missing ages with the training-set mean to avoid leaking test data.
    fill_age = X_train["Age"].mean()
    X_train["Age"] = X_train["Age"].fillna(fill_age)
    X_test["Age"] = X_test["Age"].fillna(fill_age)
    return X_train, X_test, y_train, y_test


@task(name="get-models", description="Retrieve models with hyperparams", retries=2)
def get_models(n_estimators=200) -> List:
    """
    A task that retrieves the models to be used. We have hard-coded them,
    but you can specify other models!
    """
    return [
        LogisticRegression(random_state=42),
        KNeighborsClassifier(),
        DecisionTreeClassifier(),
        SVC(),
        RandomForestClassifier(n_estimators=n_estimators, max_depth=4, random_state=42),
        RandomForestClassifier(n_estimators=100, max_depth=3, random_state=42),
    ]


@task(name="train-models", description="Use models to train and predict with")
def train_model(
    model: Any, X_train: DataFrame, X_test: DataFrame, y_train: Series, y_test: Series
) -> Dict:
    """
    This task allows us to use some sklearn to easily train a variety of models
    and output an accuracy score and the params used by that model.
    """
    clf = model.fit(X_train, y_train)
    y_pred = clf.predict(X_test)
    acc = accuracy_score(y_test, y_pred)
    return {
        "model": model.__class__.__name__,
        "params": model.get_params(),
        "accuracy": acc,
    }


@task(cache_key_fn=task_input_hash, cache_expiration=timedelta(days=1))
def get_results(results: List[Dict]) -> DataFrame:
    """
    Collect the per-model results into a DataFrame. This task is cached on a
    hash of its inputs, so unchanged inputs reuse the stored result.
    The cache expires in 1 day, as we specified.
    """
    res = pd.DataFrame(results)
    return res


@flow(name="my-first-ml-flow", task_runner=DaskTaskRunner())
def my_first_ml_flow(n_estimators=200):
    """
    This flow will allow us to run all of the tasks that we defined above
    as we call them within this function.
    """
    with tags("dev"):  # optional tag specification
        # call each task
        X_train, X_test, y_train, y_test = create_data()
        models = get_models(n_estimators)
        training_runs = [
            train_model(model, X_train, X_test, y_train, y_test) for model in models
        ]
        model_results = get_results(training_runs)

        return model_results.head()
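
The `get_results` task above is cached on a hash of its inputs for one day. As a rough mental model only, input-hash caching with an expiration can be sketched in plain Python. This is not Prefect's implementation, and `input_hash`, `cached_call`, and `_cache` are hypothetical names used purely for illustration:

```python
import hashlib
import json
import time
from typing import Any, Callable, Dict, Tuple

# Hypothetical in-memory cache: key -> (expiry timestamp, value).
_cache: Dict[str, Tuple[float, Any]] = {}


def input_hash(fn_name: str, *args: Any, **kwargs: Any) -> str:
    """Build a deterministic key from the function name and its inputs."""
    payload = json.dumps([fn_name, args, kwargs], sort_keys=True, default=repr)
    return hashlib.sha256(payload.encode()).hexdigest()


def cached_call(
    fn: Callable, *args: Any, ttl_seconds: float = 86400, **kwargs: Any
) -> Any:
    """Return a cached result if the same inputs were seen within the TTL."""
    key = input_hash(fn.__name__, *args, **kwargs)
    now = time.time()
    hit = _cache.get(key)
    if hit is not None and hit[0] > now:
        return hit[1]  # inputs unchanged and entry not yet expired
    value = fn(*args, **kwargs)
    _cache[key] = (now + ttl_seconds, value)
    return value


calls = []


def expensive_square(x: int) -> int:
    calls.append(x)  # record every real execution
    return x * x


first = cached_call(expensive_square, 4)
second = cached_call(expensive_square, 4)  # same input: served from the cache
third = cached_call(expensive_square, 5)   # new input: runs again
```

Here `expensive_square` really executes only for the inputs `4` and `5`; the repeated call with `4` is answered from the cache.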

## 3. Let's run the flow to see the logs that are generated from this run

In [2]:
my_first_ml_flow()

14:05:53.607 | INFO    | prefect.engine - Created flow run 'whispering-oriole' for flow 'my-first-ml-flow'
14:05:53.608 | INFO    | prefect.task_runner.dask - Creating a new Dask cluster with `distributed.deploy.local.LocalCluster`
14:05:54.796 | INFO    | prefect.task_runner.dask - The Dask dashboard is available at http://127.0.0.1:8787/status
14:05:55.012 | INFO    | Flow run 'whispering-oriole' - Created task run 'create-data-aa7d62a6-0' for task 'create-data'
14:05:55.013 | INFO    | Flow run 'whispering-oriole' - Executing 'create-data-aa7d62a6-0' immediately...
14:05:55.061 | INFO    | Task run 'create-data-aa7d62a6-0' - Finished in state Completed()
14:05:55.075 | INFO    | Flow run 'whispering-oriole' - Created task run 'get-models-e133abf5-0' for task 'get-models'
14:05:55.076 | INFO    | Flow run 'whispering-oriole' - Executing 'get-models-e133abf5-0' immediately...
14:05:55.130 | INFO    | Task run 'get-models-e133abf5-0' - Finished in state Completed()
14:05:55.152 | INFO 

| | model | params | accuracy |
|---|---|---|---|
| 0 | LogisticRegression | {'C': 1.0, 'class_weight': None, 'dual': False... | 0.752809 |
| 1 | KNeighborsClassifier | {'algorithm': 'auto', 'leaf_size': 30, 'metric... | 0.691011 |
| 2 | DecisionTreeClassifier | {'ccp_alpha': 0.0, 'class_weight': None, 'crit... | 0.735955 |
| 3 | SVC | {'C': 1.0, 'break_ties': False, 'cache_size': ... | 0.651685 |
| 4 | RandomForestClassifier | {'bootstrap': True, 'ccp_alpha': 0.0, 'class_w... | 0.792135 |


As we can see, INFO logs were generated for this flow run. We also have a flow run name! Prefect generates a name for each flow run: 'whispering-oriole' in the output above, though yours will differ (for example, 'practical-peacock'). But now let's kick things up a notch.

## 4. View your flow run in the UI
1. Copy this command: `prefect orion start`, which will allow you to start an Orion server, enabling you to view your flow!
2. Visit the `Jupyter Home Page` tab in your browser. Click "NEW" in the right-hand corner and choose "TERMINAL". 
3. This will open a new terminal window. Paste the command.
4. Visit http://127.0.0.1:4200. In the `Flow Runs` tab, you should see your flow run name (e.g. 'practical-peacock'), which you can click to view its logs, task runs, parameters, execution state, and more.

<img src="img/viewing-flow-run.png" style="width: 900px;"/>

## 5. Create a deployment
Now that you have run a flow and viewed it in the UI, let's create a deployment. Deploying your flow means you no longer need to call the flow function or run the .py file by hand. With a single command, any script you write in Python can be executed regularly, while you are notified of failures and can observe the state of your jobs at all times.

Because deployments are created from .py files (and we're in a .ipynb), we'll need to write our flow out to a file first by executing the code below:

In [1]:
import textwrap


def write_flow_to_file(filename: str = "prefect_ml.py"):
    """
    This step is only necessary when creating a Prefect deployment in a Jupyter notebook.
    Prefect deployments are built from .py files, so we write our code out to `filename`.
    """
    # The source below is indented to sit inside this function. dedent() strips
    # the common leading whitespace so the file is valid top-level Python.
    source = textwrap.dedent('''\
    from datetime import timedelta
    from typing import Any, Dict, List

    import pandas as pd
    from pandas import DataFrame, Series
    from prefect import flow, tags, task
    from prefect.tasks import task_input_hash
    from prefect_dask.task_runners import DaskTaskRunner
    from sklearn.ensemble import RandomForestClassifier
    from sklearn.linear_model import LogisticRegression
    from sklearn.metrics import accuracy_score
    from sklearn.model_selection import train_test_split
    from sklearn.neighbors import KNeighborsClassifier
    from sklearn.svm import SVC
    from sklearn.tree import DecisionTreeClassifier


    @task(name="create-data", description="This task reads in and wrangles Titanic data")
    def create_data():
        """
        Task that reads in data from the CSV, then cleans it. Easily extensible to other
        methods of data extraction, from an S3 bucket, API, etc.
        """
        df = pd.read_csv("titanic.csv")
        df = df.drop(["Name"], axis=1)
        df["Sex"] = pd.factorize(df["Sex"])[0]
        y = df["Survived"]
        X = df.drop("Survived", axis=1)
        X_train, X_test, y_train, y_test = train_test_split(
            X, y, test_size=0.2, random_state=42
        )
        fill_age = X_train["Age"].mean()
        X_train["Age"] = X_train["Age"].fillna(fill_age)
        X_test["Age"] = X_test["Age"].fillna(fill_age)
        return X_train, X_test, y_train, y_test


    @task(name="get-models", description="Retrieve models with hyperparams", retries=2)
    def get_models(n_estimators=200) -> List:
        """
        A task that retrieves the models to be used. We have hard-coded them,
        but you can specify other models!
        """
        return [
            LogisticRegression(random_state=42),
            KNeighborsClassifier(),
            DecisionTreeClassifier(),
            SVC(),
            RandomForestClassifier(n_estimators=n_estimators, max_depth=4, random_state=42),
            RandomForestClassifier(n_estimators=100, max_depth=3, random_state=42),
        ]


    @task(name="train-models", description="Use models to train and predict with")
    def train_model(
        model: Any, X_train: DataFrame, X_test: DataFrame, y_train: Series, y_test: Series
    ) -> Dict:
        """
        This task allows us to use some sklearn to easily train a variety of models
        and output an accuracy score and the params used by that model.
        """
        clf = model.fit(X_train, y_train)
        y_pred = clf.predict(X_test)
        acc = accuracy_score(y_test, y_pred)
        return {
            "model": model.__class__.__name__,
            "params": model.get_params(),
            "accuracy": acc,
        }


    @task(cache_key_fn=task_input_hash, cache_expiration=timedelta(days=1))
    def get_results(results: List[Dict]) -> DataFrame:
        """
        Collect the per-model results into a DataFrame. This task is cached on a
        hash of its inputs, so unchanged inputs reuse the stored result.
        The cache expires in 1 day, as we specified.
        """
        res = pd.DataFrame(results)
        return res


    @flow(name="my-first-ml-flow", task_runner=DaskTaskRunner())
    def my_first_ml_flow(n_estimators=200):
        """
        This flow will allow us to run all of the tasks that we defined above
        as we call them within this function.
        """
        with tags("dev"):  # optional tag specification
            # call each task
            X_train, X_test, y_train, y_test = create_data()
            models = get_models(n_estimators)
            training_runs = [
                train_model(model, X_train, X_test, y_train, y_test) for model in models
            ]
            model_results = get_results(training_runs)

            print(model_results.head())


    if __name__ == "__main__":
        my_first_ml_flow()
    ''')
    with open(filename, "w") as f:
        f.write(source)
    print(f"Prefect flow written out to {filename}.")


write_flow_to_file("prefect_ml.py")

Prefect flow written out to prefect_ml.py.
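
Because the deployment is built from the generated file, it can be worth confirming that the file parses as Python before building. Below is a minimal sketch using only the standard library; `is_valid_python` is a hypothetical helper for this illustration, not part of Prefect:

```python
import ast
import textwrap


def is_valid_python(source: str) -> bool:
    """Return True if the source parses as a Python module."""
    try:
        ast.parse(source)
    except SyntaxError:  # IndentationError is a subclass of SyntaxError
        return False
    return True


# Source embedded inside an indented block keeps its leading whitespace...
indented = """
    def hello():
        return "hi"
"""

# ...and is not valid at module level until the common indent is removed.
ok_before = is_valid_python(indented)
ok_after = is_valid_python(textwrap.dedent(indented))
```

For the file written above, the same check would be `is_valid_python(open("prefect_ml.py").read())`.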


## Now we're ready to build and deploy in one command!
Run the following cell, which will build and apply a deployment:

In [4]:
%%bash
prefect deployment build prefect_ml.py:my_first_ml_flow --name "ML Flow Deployment" --apply --skip-upload

Found flow 'my-first-ml-flow'
Deployment YAML created at '/Users/bean/Documents/prefect-in-jupyter-notebook/my_first_ml_flow-deployment.yaml'.
Deployment 'my-first-ml-flow/ML Flow Deployment' successfully created with id '3490b13d-8a13-4d85-b490-b7de3bcdf649'.

To execute flow runs from this deployment, start an agent that pulls work from 
the the 'default' work queue:
$ prefect agent start -q 'default'


We don't need to worry about starting an agent quite yet. For now, let's head on over to the UI to view our newly created deployment!

## 6. Add a schedule to your deployment
[In the UI](http://127.0.0.1:4200), take a look at the `Deployments` tab. Click on your new deployment (called `ML Flow Deployment`). Add a schedule by clicking "Add" and choosing your desired frequency.

<img src="img/adding-a-schedule.png" style="width: 900px;"/>

Now when you look at the UI, in the `Flow Runs` tab you'll see many scheduled runs of your flow!
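
To build intuition for where those scheduled runs come from, here is a conceptual sketch of how an interval schedule maps to upcoming run times. It is plain Python and does not reflect how Prefect's scheduler is implemented; `upcoming_runs` and its parameters are hypothetical names:

```python
from datetime import datetime, timedelta
from typing import List


def upcoming_runs(
    anchor: datetime, interval: timedelta, now: datetime, count: int = 3
) -> List[datetime]:
    """List the next `count` run times on an interval grid starting at `anchor`."""
    if now <= anchor:
        next_run = anchor
    else:
        elapsed = now - anchor
        steps = -(-elapsed // interval)  # ceiling division on timedeltas
        next_run = anchor + steps * interval
    return [next_run + i * interval for i in range(count)]


# An hourly schedule anchored at midnight, inspected at 02:30.
anchor = datetime(2022, 9, 1, 0, 0)
runs = upcoming_runs(anchor, timedelta(hours=1), now=datetime(2022, 9, 1, 2, 30))
```

In this example the next three slots fall at 03:00, 04:00, and 05:00, which is the kind of list of future runs the `Flow Runs` tab shows once a schedule is attached.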

## 7. Execute flow runs from this deployment 
Starting a local agent will enable those scheduled flow runs to kick off. To start an agent, you simply need to:
1. Copy this command: `prefect agent start -q default`.
2. Visit the `Jupyter Home Page` tab in your browser. Click "NEW" in the right-hand corner and choose "TERMINAL". 
3. This will open a new terminal window. Paste the command.
4. With the agent running, the next scheduled flow run will move from a `Scheduled` state to a `Completed` state once it executes.

## Have some fun
Experiment on your own in the UI by adding a description to your deployment, parameters, and much more!

While this demo uses a static dataset (titanic.csv), it's easy to see how the `create_data()` task could be altered to draw from any number of sources, for instance by using the `requests` library to request data from an API.
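
As a hedged sketch of that idea, the snippet below fetches CSV text over HTTP and parses it into rows. The URL is a placeholder, and `fetch_text` and `load_rows` are hypothetical helpers rather than part of this demo's code. The fetcher is passed in as an argument so the parsing can be tried offline:

```python
import csv
import io
from typing import Callable, Dict, List

# Placeholder endpoint (an assumption); replace with a real CSV URL.
DATA_URL = "https://example.com/titanic.csv"


def fetch_text(url: str) -> str:
    """Download a text payload with requests (imported lazily)."""
    import requests  # third-party; only needed for real network calls

    response = requests.get(url, timeout=10)
    response.raise_for_status()
    return response.text


def load_rows(
    url: str = DATA_URL, fetch: Callable[[str], str] = fetch_text
) -> List[Dict[str, str]]:
    """Fetch CSV text and parse it into a list of row dictionaries."""
    return list(csv.DictReader(io.StringIO(fetch(url))))


# Offline usage with a stand-in fetcher, so no network access is required.
sample = "Survived,Sex,Age\n1,female,29\n0,male,40\n"
rows = load_rows(fetch=lambda _url: sample)
```

Inside `create_data()`, the downloaded text could be handed to pandas with `pd.read_csv(io.StringIO(text))` in place of reading the local file.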

## What's next? 
Not interested in starting up the Orion server every time you want to view your flow runs? Check out [Prefect Cloud](https://docs.prefect.io/ui/cloud/), where you can create plenty of projects in your first workspace for free!

## Viewing or removing metadata
You can view metadata about your flows and deployments by opening the Orion database, a SQLite file at `~/.prefect/orion.db`. On macOS, `open ~/.prefect/orion.db` will open it in a SQLite browser if one is installed.

Alternatively, you can `rm -rf ~/.prefect/orion.db` if you would like to remove all of the flows, flow runs, and deployments that you created. If you only want to remove one flow, flow run, or deployment, we recommend doing so through the UI.
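
If no SQLite browser is available, Python's built-in `sqlite3` module can inspect the same file. The sketch below lists the tables in a SQLite database. `list_tables` is a hypothetical helper, and the demo runs against a throwaway database with a made-up table rather than assuming anything about Orion's schema:

```python
import os
import sqlite3
import tempfile
from typing import List


def list_tables(db_path: str) -> List[str]:
    """Return the table names in a SQLite database file."""
    conn = sqlite3.connect(db_path)
    try:
        rows = conn.execute(
            "SELECT name FROM sqlite_master WHERE type = 'table' ORDER BY name"
        ).fetchall()
    finally:
        conn.close()
    return [name for (name,) in rows]


# Demo against a temporary database so nothing real is touched.
with tempfile.TemporaryDirectory() as tmp:
    demo_path = os.path.join(tmp, "demo.db")
    conn = sqlite3.connect(demo_path)
    conn.execute("CREATE TABLE demo_table (id TEXT)")  # made-up table
    conn.commit()
    conn.close()
    demo_tables = list_tables(demo_path)
```

To look at your own metadata, call `list_tables(os.path.expanduser("~/.prefect/orion.db"))`. Note that `sqlite3.connect` creates an empty file if the path does not exist, so check the path first.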

## Happy engineering!
We hope you were able to learn a little more about how Prefect works. We've barely scratched the surface of what's possible with Prefect. For instance, you can also store repeated configuration with blocks and integrate with other tools easily. Please see our docs to learn more about what Prefect can do for your workflows:
https://docs.prefect.io/

Some things we didn't touch on in this tutorial:
- Other build commands that can be used during deployment: https://docs.prefect.io/concepts/deployments/#deployment-build-options
- Flow Storage: https://docs.prefect.io/concepts/storage/
- We can set log levels to 'debug', 'error', 'info', and more: https://docs.prefect.io/concepts/logs/
- Specifying infrastructure allows you to deploy your agents and control where your flows are run: https://docs.prefect.io/concepts/infrastructure/

<img src="img/marvin.png" style="width: 300px;"/>

Prefect's mascot, Marvin, is always cooking up new recipes on how to use Prefect. Stop on by [prefect-recipes](https://github.com/PrefectHQ/prefect-recipes) to see more!