# Prefect Integration

`rubicon` offers an integration with [Prefect](https://www.prefect.io/), an open source workflow management engine used heavily within the Python ecosystem. In Prefect, a unit of work is called a **task**, and a collection of **tasks** makes up a **flow**, which represents your workflow pipeline. Integrating `rubicon` into your workflows lets you persist your experimentation.

To get started, install the `prefect` extra:

```bash
pip install rubicon-ml[prefect]
```

We'll [run a Prefect server locally](https://docs.prefect.io/core/getting_started/installation.html#running-the-local-server-and-ui) for this example. If you already have Prefect and Docker installed, you can start a Prefect server and agent with these commands:

```bash
prefect server start
prefect agent start
```

For more context, check out the [workflow README on GitHub](https://github.com/capitalone/rubicon/tree/main/rubicon/workflow).

---

**Setting up a Simple Flow**

Now we can get started! Creating Prefect **tasks** is easy enough on its own, but we've added some simple ones to the `rubicon` library.

In [None]:
from rubicon.workflow.prefect import (
    get_or_create_project_task,
    create_experiment_task,
    log_artifact_task,
    log_dataframe_task,
    log_feature_task,
    log_metric_task,
    log_parameter_task,
)

We'll need a workflow to integrate `rubicon` logging into, so let's put together a simple one. To mimic a realistic example, we'll create tasks for loading data, splitting said data, extracting features, and training a model.

In [None]:
from prefect import task

@task
def load_data():
    from sklearn.datasets import load_wine
    
    return load_wine()

In [None]:
@task
def split_data(dataset):
    from sklearn.model_selection import train_test_split
    from datetime import datetime
    
    X_train, X_test, y_train, y_test = train_test_split(
        dataset.data, dataset.target, test_size=0.50, random_state=int(datetime.utcnow().timestamp())
    )

    return X_train, X_test, y_train, y_test

In [None]:
@task
def get_feature_names(dataset):
    return dataset.feature_names

In [None]:
@task
def fit_pred_model(train_test_split_result, n_components, n_neighbors, is_standardized):
    from sklearn import metrics
    from sklearn.decomposition import PCA
    from sklearn.neighbors import KNeighborsClassifier
    from sklearn.pipeline import make_pipeline
    from sklearn.preprocessing import StandardScaler

    X_train, X_test, y_train, y_test = train_test_split_result

    if is_standardized:
        classifier = make_pipeline(
            StandardScaler(),
            PCA(n_components=n_components),
            KNeighborsClassifier(n_neighbors=n_neighbors)
        )
    else:
        classifier = make_pipeline(
            PCA(n_components=n_components),
            KNeighborsClassifier(n_neighbors=n_neighbors)
        )
        
    classifier.fit(X_train, y_train)
    pred_test = classifier.predict(X_test)
    accuracy = metrics.accuracy_score(y_test, pred_test)
    
    return accuracy

Without `rubicon`, here's what this simple **flow** would look like:

In [None]:
from prefect import Flow

n_components = 2
n_neighbors = 5
is_standardized = True

with Flow("Wine Classification") as flow:
    wine_dataset = load_data()
    
    feature_names = get_feature_names(wine_dataset)
    train_test_split = split_data(wine_dataset)
    
    accuracy = fit_pred_model(train_test_split, n_components, n_neighbors, is_standardized)

---

**Running a Flow and Viewing Results**

Now we'll register our **flow** with the local server. The URL printed by the call to `flow.register` opens the local Prefect UI, where we can run and monitor our **flows**.

In [None]:
flow_id = flow.register()

We could also put together a function to run our **flows** and wait for them to finish. That would look something like this:

In [None]:
import time
from prefect import Client

prefect_client = Client()

def run_flow(client, flow_id):
    flow_run_id = client.create_flow_run(flow_id=flow_id)
    
    is_finished = False
    while not is_finished:
        state = client.get_flow_run_info(flow_run_id).state
        print(f"{state.message.strip('.')}. Waiting...")
        
        time.sleep(3)
        
        is_finished = state.is_finished()

    assert state.is_successful()
    print(f"Flow run {flow_run_id} was successful!")
    
    return flow_run_id
    
flow_run_id = run_flow(prefect_client, flow_id)
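
The `run_flow` loop above polls until the flow run finishes, with no upper bound on how long it waits. Here is a minimal sketch of the same polling pattern with a timeout. The `wait_until` helper and its parameters are hypothetical and not part of Prefect or `rubicon`, and the stand-in condition below only simulates a run that finishes on the third poll.

```python
import time


def wait_until(is_done, poll_seconds=3.0, timeout_seconds=600.0, sleep=time.sleep):
    """Poll `is_done()` until it returns True or `timeout_seconds` elapses.

    Returns True if the condition was met and False on timeout.
    `wait_until` is a hypothetical helper, not a Prefect function.
    """
    deadline = time.monotonic() + timeout_seconds
    while True:
        if is_done():
            return True
        if time.monotonic() >= deadline:
            return False
        sleep(poll_seconds)


# Stand-in for a flow run that reports "finished" on the third poll.
polls = iter([False, False, True])
finished = wait_until(lambda: next(polls), poll_seconds=0.01, timeout_seconds=1.0)

# A condition that never becomes true gives up after the timeout.
timed_out = not wait_until(lambda: False, poll_seconds=0.01, timeout_seconds=0.05)

print(finished, timed_out)
```

With the client from above, the condition could be `lambda: prefect_client.get_flow_run_info(flow_run_id).state.is_finished()`.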

We assigned a few variables in our **flow**, most notably `accuracy`, the result of `fit_pred_model`. This accuracy metric is how we'll determine whether or not the model we trained is a success. However, retrieving state variables from **flows** is a bit cumbersome.

In [None]:
info = prefect_client.get_flow_run_info(flow_run_id)

slugs = [t.task_slug for t in info.task_runs]
index = slugs.index(accuracy.slug)

result = info.task_runs[index].state._result.read(info.task_runs[index].state._result.location)
print(f"accuracy: {result.value}")

What's going on here isn't exactly intuitive, and all of that extracted only one piece of data from one **task**.

---

**This is Where Rubicon Comes In!**

We can leverage the Prefect tasks within the `rubicon` library to log all the info we want about our model run. Then, we can use the standard `rubicon` logging library to retrieve and inspect our metrics and other logged data. This is much simpler than digging through the state of each executed **task** and extracting its results.

Here's the same flow from above, this time with `rubicon` **tasks** integrated.

In [None]:
from prefect import unmapped

n_components = 2
n_neighbors = 5
is_standardized = True

with Flow("Wine Classification with Rubicon") as flow:
    root_dir = "./rubicon-root"
    project = get_or_create_project_task(
        "filesystem", root_dir, "Wine Classification with Prefect"
    )
    experiment = create_experiment_task(project, name="example with Prefect")
    
    wine_dataset = load_data()
    
    feature_names = get_feature_names(wine_dataset)
    train_test_split = split_data(wine_dataset)
    
    log_feature_task.map(unmapped(experiment), feature_names)
    log_parameter_task(experiment, "n_components", n_components)
    log_parameter_task(experiment, "n_neighbors", n_neighbors)
    log_parameter_task(experiment, "is_standardized", is_standardized)
    
    accuracy = fit_pred_model(train_test_split, n_components, n_neighbors, is_standardized)
    
    log_metric_task(experiment, "accuracy", accuracy)

Again, we'll register and run the **flow**.

In [None]:
flow_with_rubicon_id = flow.register()
flow_run_with_rubicon_id = run_flow(prefect_client, flow_with_rubicon_id)

This time we can use `rubicon` to inspect our accuracy, among other things!

In [None]:
from rubicon import Rubicon

rubicon = Rubicon(persistence="filesystem", root_dir=root_dir)
project = rubicon.get_project("Wine Classification with Prefect")

experiment = project.experiments()[0]

print(f"features - {[f.name for f in experiment.features()]}\n")

for parameter in experiment.parameters():
    print(f"parameter - {parameter.name}: {parameter.value}")
    
for metric in experiment.metrics():
    print(f"metric - {metric.name}: {metric.value}")

Extracting data from `rubicon` is much simpler than pulling it from the various **tasks** as they succeed, especially when we're running thousands of **tasks** in a **flow**.
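
When there are many experiments to inspect, it can help to gather each one's logged data into plain dictionaries. The sketch below relies only on the accessors used in the cell above (`features()`, `parameters()`, `metrics()`, and their `name` and `value` attributes). The `summarize_experiment` helper is hypothetical, and the stand-in experiment uses placeholder values so the example runs without a `rubicon` project on disk.

```python
from types import SimpleNamespace


def summarize_experiment(experiment):
    """Collect an experiment's logged data into plain dictionaries.

    Hypothetical helper; it only calls accessors shown earlier in this example.
    """
    return {
        "features": [feature.name for feature in experiment.features()],
        "parameters": {p.name: p.value for p in experiment.parameters()},
        "metrics": {m.name: m.value for m in experiment.metrics()},
    }


# Stand-in experiment with placeholder values (not real results).
fake_experiment = SimpleNamespace(
    features=lambda: [SimpleNamespace(name="alcohol"), SimpleNamespace(name="hue")],
    parameters=lambda: [SimpleNamespace(name="n_neighbors", value=5)],
    metrics=lambda: [SimpleNamespace(name="accuracy", value=0.5)],
)

summary = summarize_experiment(fake_experiment)
print(summary)
```

With a real project, the same helper could be applied to each item in `project.experiments()`.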

---

**Concurrent Logging with Prefect**

Prefect plays nicely with Dask in order to provide parallel computing when possible. The Prefect scheduler is smart enough to know which **tasks** do and do not depend on each other, so it can intelligently decide when to run independent **tasks** at the same time.
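
To make the idea of dependency-aware scheduling concrete, here is a small sketch using the standard library's `graphlib` (Python 3.9+). It is not how Prefect or Dask schedule work internally. It only shows how a dependency graph determines which **tasks** are free to run together. The graph mirrors the first flow in this example.

```python
from graphlib import TopologicalSorter

# Each task maps to the set of tasks it depends on, mirroring the first flow.
dependencies = {
    "load_data": set(),
    "get_feature_names": {"load_data"},
    "split_data": {"load_data"},
    "fit_pred_model": {"split_data"},
}

sorter = TopologicalSorter(dependencies)
sorter.prepare()

batches = []
while sorter.is_active():
    # Tasks whose dependencies have all completed; these could run in parallel.
    ready = sorted(sorter.get_ready())
    batches.append(ready)
    # Mark the whole batch as finished so dependents become ready.
    sorter.done(*ready)

for batch in batches:
    print(batch)
```

Here `get_feature_names` and `split_data` land in the same batch because both depend only on `load_data`.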

Let's run the same **flow** as above, except this time we'll log eight different experiments with eight different parameter sets and accuracy results.

First, we'll need to use Dask to start a local cluster.

In [None]:
import dask.distributed
from prefect.engine.executors import DaskExecutor
from prefect.environments.execution import LocalEnvironment

dask_client = dask.distributed.Client()

dask_executor = DaskExecutor(address=dask_client.cluster.scheduler.address)
dask_environment = LocalEnvironment(executor=dask_executor)

dask_client

Before we look at the new **flow**, let's see how easy it is to make our own `rubicon` Prefect **tasks**. Currently, the `log_feature_task` logs one feature to one experiment. Let's say that in this example we want to log our entire feature set in one **task**. That's slightly different from what `log_feature_task` does, so let's see how we can make a new **task** using the `rubicon` logging library.

In [None]:
@task
def log_feature_set(experiment, feature_names):
    """log a set of features to a rubicon experiment
    
    Parameters
    ----------
    experiment : rubicon.Experiment
        the experiment to log the feature set to
    feature_names : list of str
        the names of the features to log to `experiment`
    """
    features = []
    
    for name in feature_names:
        features.append(experiment.log_feature(name=name))
        
    return features

Easy! Even though it's so simple, this particular **task** is actually more in-depth than the ones you'll find in the library. The Prefect **tasks** in the library are simply wrappers around existing logging library functions, like `experiment.log_feature`.
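
Because the body of `log_feature_set` only calls `experiment.log_feature`, its logic can be checked without Prefect or a `rubicon` backend. The sketch below assumes a hypothetical `RecordingExperiment` stand-in and repeats the task body as a plain function. The real `log_feature` returns a `rubicon` feature object, whereas the stand-in simply returns the name.

```python
class RecordingExperiment:
    """Hypothetical stand-in that records calls to `log_feature`."""

    def __init__(self):
        self.logged = []

    def log_feature(self, name):
        # The real method returns a feature object; the name is enough here.
        self.logged.append(name)
        return name


def log_feature_set_logic(experiment, feature_names):
    # Same logic as the `log_feature_set` task, without the `@task` decorator.
    return [experiment.log_feature(name=name) for name in feature_names]


recorder = RecordingExperiment()
result = log_feature_set_logic(recorder, ["alcohol", "malic_acid"])

print(recorder.logged)
```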

Now we can make our new **flow** to log multiple experiments in parallel.

In [None]:
n_components =    [2,    2,     2,    2,     4,    4,     4,    4    ]
n_neighbors =     [5,    5,     10,   10,    5,    5,     10,   10   ]
is_standardized = [True, False, True, False, True, False, True, False]

experiment_names = [f"mapped run {i}" for i in range(len(n_components))]

with Flow("Wine Classification with Rubicon - Mapped", environment=dask_environment) as mapped_flow:
    project = get_or_create_project_task(
        "filesystem", root_dir, "Wine Classification with Prefect - Mapped"
    )
    experiments = create_experiment_task.map(
        unmapped(project), name=experiment_names, description=unmapped("concurrent example with Prefect")
    )
    
    wine_dataset = load_data()
    
    feature_names = get_feature_names(wine_dataset)
    train_test_split = split_data(wine_dataset)
    
    log_feature_set.map(experiments, unmapped(feature_names))
    log_parameter_task.map(experiments, unmapped("n_components"), n_components)
    log_parameter_task.map(experiments, unmapped("n_neighbors"), n_neighbors)
    log_parameter_task.map(experiments, unmapped("is_standardized"), is_standardized)
    
    accuracies = fit_pred_model.map(
        unmapped(train_test_split), n_components, n_neighbors, is_standardized
    )
 
    log_metric_task.map(experiments, unmapped("accuracy"), accuracies)
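
The three hand-written parameter lists in the flow above cover every combination of two values for each setting. As a sketch, the same lists can be generated with `itertools.product`, which may be less error-prone if the grid grows. The variable `grid` is introduced here for illustration.

```python
from itertools import product

# Every combination of the three settings, in the same order as the flow's lists.
grid = list(product([2, 4], [5, 10], [True, False]))

# Transpose the rows into one list per parameter for use with `.map`.
n_components, n_neighbors, is_standardized = (list(column) for column in zip(*grid))

experiment_names = [f"mapped run {i}" for i in range(len(grid))]

print(n_components)
print(n_neighbors)
print(is_standardized)
```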

Let's register and run one last **flow**. If you check out the Gantt chart for your completed **flows** on the UI linked by `mapped_flow.register`, you'll notice **tasks** executing at the same time now. Specifically, you'll see a maximum of four **tasks** running at once, since our local Dask cluster has four workers. The default worker count depends on your machine, so your maximum may differ.

In [None]:
flow_with_concurrent_rubicon_id = mapped_flow.register()
flow_run_with_concurrent_rubicon_id = run_flow(prefect_client, flow_with_concurrent_rubicon_id)

Retrieving all the results of a mapped **task** is even more cumbersome than the example of retrieving the accuracy above. We'll use the `rubicon` Dashboard to check out all the data we just logged.

In [None]:
from rubicon.ui import Dashboard

Dashboard(persistence="filesystem", root_dir=root_dir).run_server()