# PyData Global 2022: Production-grade Machine Learning with Flyte

In this tutorial, you'll learn about some of the key challenges in building and deploying reliable machine learning systems. At a high level, these challenges are:

- Scalability
- Data Quality
- Reproducibility
- Recoverability
- Auditability

In [1]:
from flytekit.remote import FlyteRemote
from flytekit.configuration import Config


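# Set LOCAL = True to load connection settings from ./config.yaml; otherwise
# Config.auto falls back to environment variables and the default Flyte
# config file location.
LOCAL = False


remote = FlyteRemote(
    config=Config.auto(config_file="./config.yaml" if LOCAL else None),
    # Project and domain used by default when fetching and launching entities.
    default_project="flytesnacks",
    default_domain="development",
)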

## Introduction

### Environment Setup

Follow the [setup instructions](./README.md#setup) in the README.

### Example 0: Flyte Basics

Let's take a look at the [first example](./workflows/example_00_intro.py).

In it, you'll see a simple pipeline that uses the penguins dataset to train a
penguin species classifier. You can run this workflow locally with:

```
python workflows/example_00_intro.py
```

#### Exercise: Understanding Workflows

Flyte workflows are essentially a domain-specific language (DSL) embedded in
Python. The body of a workflow function builds an execution graph, with tasks
as the building blocks of more complex pipelines.

Insert a debugging breakpoint `import pdb; pdb.set_trace()` on line 80 of the
`example_00_intro.py` script and rerun it. Take a look at all the variables
in the `training_workflow` like `data` and `model`. What data type are they?
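To build some intuition before you run the exercise, here is a toy model of a workflow DSL in plain Python. It is a hypothetical sketch and is unrelated to flytekit's actual implementation. When a "task" is called inside a "workflow", it returns a placeholder node and records it in a graph instead of computing a value. The graph is executed later. The names `Node`, `Graph`, `toy_task`, and `toy_workflow` are all made up for illustration.

```python
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List


@dataclass
class Node:
    """Placeholder for a value that only exists once the graph is executed."""
    name: str
    fn: Callable[..., Any]
    inputs: Dict[str, Any]


@dataclass
class Graph:
    nodes: List[Node] = field(default_factory=list)

    def run(self) -> Dict[str, Any]:
        # Nodes are recorded in call order, which is a valid execution order here.
        results: Dict[str, Any] = {}
        for node in self.nodes:
            kwargs = {
                k: results[v.name] if isinstance(v, Node) else v
                for k, v in node.inputs.items()
            }
            results[node.name] = node.fn(**kwargs)
        return results


GRAPH = Graph()


def toy_task(fn: Callable[..., Any]) -> Callable[..., Node]:
    """Hypothetical decorator: calling the task records a node instead of running it."""
    def wrapper(**kwargs: Any) -> Node:  # keyword-only calls, as in Flyte workflows
        node = Node(name=f"n{len(GRAPH.nodes)}", fn=fn, inputs=kwargs)
        GRAPH.nodes.append(node)
        return node
    return wrapper


@toy_task
def get_data(n: int) -> list:
    return list(range(n))


@toy_task
def train(data: list) -> float:
    return sum(data) / len(data)


def toy_workflow(n: int):
    data = get_data(n=n)      # a Node, not a list
    model = train(data=data)  # a Node, not a float
    return data, model


data, model = toy_workflow(n=5)
print(type(data).__name__, type(model).__name__)  # Node Node
print(GRAPH.run()[model.name])  # 2.0
```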

#### Registering Your Workflow

Once you're happy with the state of your tasks and workflows, you can register
them by first packaging them up into a portable Flyte archive:

```
export IMAGE='ghcr.io/flyteorg/flyte-conference-talks:pydata-global-2022-latest'
pyflyte --pkgs workflows package --image $IMAGE -f
```

This will create a `flyte-package.tgz` archive file that contains the serialized
tasks and workflows in this project. Then, you can register it with:

```
flytectl register files --project flytesnacks --domain development --archive flyte-package.tgz --version v0
```

Now we can go over to https://sandbox.union.ai/console
(or http://localhost:30080/console if you're using a local Flyte cluster) to
check out the tasks and workflows we just registered.

In [16]:
from workflows import example_00_intro

execution = remote.execute_local_workflow(
    example_00_intro.training_workflow,
    inputs={
        "hyperparameters": {"C": 0.1, "max_iter": 5000},
        "test_size": 0.2,
        "random_state": 11,
    }
)
remote.generate_console_url(execution)

'http://localhost:30080/console/projects/flytesnacks/domains/development/executions/f3214fb2682cf4def94f'

In [17]:
execution = remote.wait(execution)

In [None]:
clf = execution.outputs["o0"]
clf

#### Scheduling Launch Plans

Activate the schedule of the `scheduled_training_workflow` launch plan so that Flyte starts launching executions on it:

In [None]:
lp_id = remote.fetch_launch_plan(name="scheduled_training_workflow").id
remote.client.update_launch_plan(lp_id, "ACTIVE")
print("activated scheduled_training_workflow")

activated scheduled_training_workflow


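Fetch the most recent execution launched by the schedule, wait for it to finish, and print the trained model (or `None` if the schedule hasn't fired yet):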

In [None]:
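# Keep only executions launched by the scheduled launch plan; recent_executions
# lists the newest executions first, so index 0 is the latest scheduled run.
recent_executions = [
    execution
    for execution in remote.recent_executions()
    if execution.spec.launch_plan.name == "scheduled_training_workflow"
]

scheduled_execution = None
model = None
if recent_executions:
    # Block until the latest scheduled run finishes, then fetch its trained model.
    scheduled_execution = remote.wait(recent_executions[0])
    model = scheduled_execution.outputs["o0"]

print(model)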

Now deactivate the schedule so it stops launching new executions:

In [None]:
remote.client.update_launch_plan(lp_id, "INACTIVE")
print("deactivated scheduled_training_workflow")

deactivated scheduled_training_workflow


#### Fast Registration

Flyte supports rapid iteration during development through fast registration,
which you enable with the `--fast` flag of the `pyflyte package` command. It
zips up the source code of your Flyte application so that it runs on top of the
existing container image. This spares you from rebuilding the Docker image
every time you change your code.

```
pyflyte --pkgs workflows package --image $IMAGE --fast -f
flytectl register files --project flytesnacks --domain development --archive flyte-package.tgz --version fast-v0
```
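Each `flytectl register files` call needs a `--version`. When you fast-register repeatedly while iterating, reusing a version for changed code is typically rejected, so you need a fresh version every time.

One option is to derive the version from a hash of the archive. `archive_version` below is a hypothetical helper and is not part of Flyte or this repo. Any change to the archive produces a new version. Rebuilding the archive can also change the hash because of embedded timestamps, which is harmless here.

```python
import hashlib
from pathlib import Path


def archive_version(archive: str, prefix: str = "fast", length: int = 10) -> str:
    """Hypothetical helper: build a registration version from the archive's SHA-256."""
    digest = hashlib.sha256()
    with Path(archive).open("rb") as f:
        # Read in 1 MiB chunks so large archives don't need to fit in memory.
        for chunk in iter(lambda: f.read(1 << 20), b""):
            digest.update(chunk)
    return f"{prefix}-{digest.hexdigest()[:length]}"
```

You could then pass the value of `archive_version("flyte-package.tgz")` as the `--version` argument.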

## Scalability

### Example 1: Dynamic Workflows

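Dynamic workflows let you build execution graphs at runtime rather than at
compile time, so the shape of the graph can depend on input values. Here, that
lets us write an ordinary `for` loop over a list of hyperparameter sets to
implement a grid-search model-tuning workflow.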

In [None]:
from workflows import example_01_dynamic

execution = remote.execute_local_workflow(
    example_01_dynamic.tuning_workflow,
    inputs={
        "hyperparam_grid": [
            {"C": 0.1, "max_iter": 5000},
            {"C": 0.01, "max_iter": 5000},
            {"C": 0.001, "max_iter": 5000},
        ],
    }
)
remote.generate_console_url(execution)
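The `hyperparam_grid` above is written out by hand. For larger searches, you could generate the same list-of-dicts structure from per-parameter value lists with `itertools.product`. `make_grid` is a hypothetical helper and is not part of the example code. Because the dynamic workflow creates one training node per entry, the number of nodes equals the product of the list lengths.

```python
from itertools import product
from typing import Any, Dict, List, Sequence


def make_grid(space: Dict[str, Sequence[Any]]) -> List[Dict[str, Any]]:
    """Expand {"param": [values, ...]} into one dict per combination (a full grid)."""
    keys = list(space)
    return [dict(zip(keys, values)) for values in product(*(space[k] for k in keys))]


grid = make_grid({"C": [0.1, 0.01, 0.001], "max_iter": [5000]})
print(grid)
# [{'C': 0.1, 'max_iter': 5000}, {'C': 0.01, 'max_iter': 5000}, {'C': 0.001, 'max_iter': 5000}]
```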

### Example 2: Map Tasks

In [None]:
from workflows import example_02_map_task

execution = remote.execute_local_workflow(
    example_02_map_task.tuning_workflow,
    inputs={
        "hyperparam_grid": [
            {"C": 0.1},
            {"C": 0.01},
            {"C": 0.001},
        ],
    }
)
print(remote.generate_console_url(execution))
execution = remote.wait(execution)

http://localhost:30080/console/projects/flytesnacks/domains/development/executions/fe287057a00db4cacbdb
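Map tasks take a different route to the same fan-out. Instead of building one node per grid entry inside a dynamic workflow, a single task is applied to every element of a list input, and the copies run in parallel.

The sketch below is a purely local analogy using `concurrent.futures`, not Flyte. `train_one` is a hypothetical stand-in for the training task, and the scores it returns are fake.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Dict, List


def train_one(hyperparameters: Dict[str, float]) -> float:
    """Hypothetical stand-in for a training task: returns a fake score for one grid point."""
    return 1.0 / (1.0 + hyperparameters["C"])


def map_over(grid: List[Dict[str, float]], max_workers: int = 4) -> List[float]:
    # Like a map task: one function applied independently to each element,
    # with outputs returned in the same order as the inputs.
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return list(pool.map(train_one, grid))


grid = [{"C": 0.1}, {"C": 0.01}, {"C": 0.001}]
scores = map_over(grid)
print(scores)
```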


### Example 3: Plugins

In [19]:
from workflows import example_03_plugins

execution = remote.execute_local_workflow(
    example_03_plugins.training_workflow,
    inputs={
        "n_epochs": 50,
        "hyperparameters": example_03_plugins.Hyperparameters(
            in_dim=4, hidden_dim=100, out_dim=3, learning_rate=0.03
        ),
    }
)
print(remote.generate_console_url(execution))
execution = remote.wait(execution)

http://localhost:30080/console/projects/flytesnacks/domains/development/executions/fa292bf270d0f4701a6d
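This workflow takes a structured `Hyperparameters` object as input rather than a plain dict. Flytekit can serialize dataclasses passed between tasks; in 2022-era flytekit, this typically meant decorating them with `@dataclass_json`.

The class below is a hypothetical stand-in with the same fields as the call above, not the definition in `example_03_plugins.py`. It only illustrates the JSON round trip that such a type has to survive.

```python
import json
from dataclasses import asdict, dataclass


@dataclass
class Hyperparameters:
    """Hypothetical stand-in mirroring the fields passed to example_03_plugins.Hyperparameters."""
    in_dim: int
    hidden_dim: int
    out_dim: int
    learning_rate: float


hp = Hyperparameters(in_dim=4, hidden_dim=100, out_dim=3, learning_rate=0.03)

# Serialize to JSON and back, as a value passed between tasks must be.
payload = json.dumps(asdict(hp))
restored = Hyperparameters(**json.loads(payload))
print(payload)  # {"in_dim": 4, "hidden_dim": 100, "out_dim": 3, "learning_rate": 0.03}
```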


## Data Quality

### Example 4: Type System

Much of what Flyte does automatically comes from its type system. Flyte reads
the standard Python type hints on your tasks to serialize task outputs into, and
deserialize task inputs from, Flyte's native serialization format. This includes
offloading tabular data such as `pandas.DataFrame` objects to blob storage.

A useful side effect is that Flyte knows the type of every input and output in
the execution graph. When the workflow is compiled, it can therefore raise an
error if a task's output is wired into an input of an incompatible type, before
anything runs.
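As a rough illustration of this kind of check, here is a toy version in plain Python. It compares a producer's return annotation with the annotation of the consumer parameter it feeds.

This is a hypothetical sketch, not Flyte's compiler, which works on its own type representations. `check_edge`, the consumer `split_data`, and the stand-in `DataFrame` class are all assumptions made for illustration.

```python
from typing import get_type_hints


class DataFrame:
    """Stand-in for pandas.DataFrame so the sketch needs no third-party imports."""


def get_data() -> dict:  # the changed signature from the exercise
    return {}


def split_data(data: DataFrame, test_size: float) -> tuple:  # hypothetical consumer
    return (data, data)


def check_edge(producer, consumer, param: str) -> None:
    """Raise TypeError if the producer's output type doesn't match the consumer's input type."""
    produced = get_type_hints(producer)["return"]
    expected = get_type_hints(consumer)[param]
    if produced is not expected:
        raise TypeError(
            f"{producer.__name__} returns {produced.__name__}, "
            f"but {consumer.__name__}({param}=...) expects {expected.__name__}"
        )


try:
    check_edge(get_data, split_data, "data")
except TypeError as err:
    print(err)  # get_data returns dict, but split_data(data=...) expects DataFrame
```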

Take a look at [example_04_type_system.py](./workflows/example_04_type_system.py).
Try changing the output signature of `get_data` from `pd.DataFrame` to `dict`
and then fast-register it:

```
pyflyte --pkgs workflows package --image $IMAGE --fast -f
flytectl register files --project flytesnacks --domain development --archive flyte-package.tgz --version fast-v0
```

What error do you see?

### Example 5: DataFrame Types

Pandera is a data validation tool for dataframe-like objects. In
[example_05_pandera_types.py](./workflows/example_05_pandera_types.py), we define
a pandera schema that validates the output of `get_data` as well as the DataFrame
input of `split_data` at runtime.
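A schema declares a data type and optional value checks for each column, and validation fails loudly when the data doesn't conform. The sketch below mimics that idea on plain lists of row dicts so that it runs without pandas.

It is not pandera's API; with pandera, you would declare a `pandera.DataFrameSchema` or a schema model class instead. The column names and checks are hypothetical, loosely modeled on the penguins data.

```python
from typing import Any, Callable, Dict, List, Tuple

# column name -> (expected Python type, value check)
Schema = Dict[str, Tuple[type, Callable[[Any], bool]]]

PENGUIN_SCHEMA: Schema = {  # hypothetical columns and checks
    "species": (str, lambda v: v in {"Adelie", "Chinstrap", "Gentoo"}),
    "bill_length_mm": (float, lambda v: v > 0),
}


def validate(rows: List[Dict[str, Any]], schema: Schema) -> List[Dict[str, Any]]:
    """Return rows unchanged if every row satisfies the schema, else raise ValueError."""
    for i, row in enumerate(rows):
        for column, (dtype, check) in schema.items():
            if column not in row:
                raise ValueError(f"row {i}: missing column {column!r}")
            value = row[column]
            if not isinstance(value, dtype) or not check(value):
                raise ValueError(f"row {i}: invalid value {value!r} in column {column!r}")
    return rows


validate([{"species": "Gentoo", "bill_length_mm": 47.5}], PENGUIN_SCHEMA)  # passes
```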

#### Exercise

- Uncomment line 49 in `example_05_pandera_types.py`.
- Fast-register the workflows with the version `fast-pandera-error-v0`, then
  run the cell below. What error do you see?
- Bonus: comment out the offending line, fast-register the workflows again with
  the version `fast-pandera-error-v1`, and re-run the cell. What do you see?

In [25]:
from workflows import example_05_pandera_types

execution = remote.execute_local_workflow(
    example_05_pandera_types.get_splits,
    inputs={"test_size": 0.2}
)
print(remote.generate_console_url(execution))
execution = remote.wait(execution)

http://localhost:30080/console/projects/flytesnacks/domains/development/executions/fea0325d8490c4b41892


## Reproducibility

### Example 6: Reproducibility

Next, we'll learn about multiple levels of reproducibility:

- **Environment-level reproducibility**: As you can see in the
  [Dockerfile](./Dockerfile), we're containerizing our Flyte application to
  capture a snapshot of all the dependencies that your tasks and workflows rely on.
- **Code-level reproducibility**: In [example_06_reproducibility.py](./workflows/example_06_reproducibility.py)
  we take care of setting a random seed for our model. This is a common practice 
  but an important one to remember!
- **Resource-level reproducibility**: Finally, as you've seen previously we can
  declare the compute and memory requirements of our pipeline at the task-level.

Combined with built-in versioning for all tasks, workflows, launch plans, and
executions, Flyte lets you roll back to a previous version of any of these
entities (or forward again). In effect, a versioned Flyte task or workflow
behaves like a hermetically sealed container: given the same inputs, it should
produce the same outputs (or the same error), as long as the code itself is
deterministic. This is why setting random seeds matters.

## Recoverability

### Example 7: Caching

In this example, we revisit the model-tuning use case using `@dynamic` workflows.

In [26]:
from workflows import example_07_caching
from workflows.example_06_reproducibility import Hyperparameters

execution = remote.execute_local_workflow(
    example_07_caching.tuning_workflow,
    inputs={
        "hyperparam_grid": [
            Hyperparameters(alpha=alpha)
            for alpha in [10.0, 1.0, 0.1, 0.01, 0.001, 0.0001]
        ],
    }
)
print(remote.generate_console_url(execution))
execution = remote.wait(execution)

http://localhost:30080/console/projects/flytesnacks/domains/development/executions/f21484f4a4a394b0b9a2
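Conceptually, Flyte's task caching is memoization. A task can be declared with `cache=True` and a `cache_version`, as in `@task(cache=True, cache_version="1")`. Its outputs are then stored under a key derived roughly from the task, its cache version and interface, and its input values. Re-running a tuning workflow with an overlapping grid then only trains the new points.

The toy sketch below shows that behavior in plain Python. `cached_task` is hypothetical, and it keys only on the function name, a version string, and the inputs.

```python
import json
from typing import Any, Callable, Dict, Tuple

CACHE: Dict[Tuple[str, str, str], Any] = {}
CALLS = {"count": 0}  # counts real executions, to make cache hits visible


def cached_task(version: str):
    """Hypothetical decorator: memoize outputs keyed by (function name, version, inputs)."""
    def decorator(fn: Callable[..., Any]) -> Callable[..., Any]:
        def wrapper(**kwargs: Any) -> Any:
            # sort_keys makes the key independent of keyword-argument order
            key = (fn.__name__, version, json.dumps(kwargs, sort_keys=True))
            if key not in CACHE:
                CACHE[key] = fn(**kwargs)
            return CACHE[key]
        return wrapper
    return decorator


@cached_task(version="1")
def train(alpha: float) -> float:
    CALLS["count"] += 1
    return 1.0 / (1.0 + alpha)  # stand-in for an expensive training run


for alpha in [10.0, 1.0, 0.1]:
    train(alpha=alpha)
for alpha in [1.0, 0.1, 0.01]:  # overlaps with the first grid
    train(alpha=alpha)
print(CALLS["count"])  # 4
```

Bumping `version` changes every key. This mirrors how bumping `cache_version` invalidates previously cached results.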

### Example 8: Recovering Failed Executions

In [None]:
from workflows import example_08_recover_executions

execution = remote.execute_local_workflow(
    example_08_recover_executions.tuning_workflow,
    inputs={"alpha_grid": [100.0, 10.0, 1.0, 0.1, 0.01, 0.001, 0.0001, 0.00001]}
)
print(remote.generate_console_url(execution))
execution = remote.wait(execution)
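If an execution fails partway through, Flyte can recover it, for example through the Recover action on the execution's page in the Flyte console. The recovered execution reuses the outputs of nodes that already succeeded and re-runs only the nodes that didn't, instead of starting from scratch.

The toy sketch below models that bookkeeping in plain Python. `run_nodes`, `train`, and the simulated transient failure are all hypothetical.

```python
from typing import Callable, Dict, List, Set


def run_nodes(
    alphas: List[float],
    train: Callable[[float], float],
    completed: Dict[float, float],
) -> Set[float]:
    """Run every node not already in `completed`; return the alphas that failed.

    Successful outputs are stored in `completed`, so a later call acts as a recovery run.
    """
    failed: Set[float] = set()
    for alpha in alphas:
        if alpha in completed:
            continue  # reuse the earlier output instead of re-running
        try:
            completed[alpha] = train(alpha)
        except RuntimeError:
            failed.add(alpha)
    return failed


attempts: List[float] = []
flaky = {0.1}  # simulate a transient failure on the first attempt for alpha=0.1


def train(alpha: float) -> float:
    attempts.append(alpha)
    if alpha in flaky:
        flaky.discard(alpha)
        raise RuntimeError("transient failure")
    return 1.0 / (1.0 + alpha)


outputs: Dict[float, float] = {}
print(run_nodes([10.0, 1.0, 0.1], train, outputs))  # {0.1}
print(run_nodes([10.0, 1.0, 0.1], train, outputs))  # set()  (only 0.1 re-runs)
print(attempts)  # [10.0, 1.0, 0.1, 0.1]
```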

### Example 9: Checkpointing

In [29]:
from workflows import example_09_checkpointing
from workflows.example_06_reproducibility import Hyperparameters

execution = remote.execute_local_workflow(
    example_09_checkpointing.training_workflow,
    inputs={
        "n_epochs": 30,
        "hyperparameters": Hyperparameters(penalty="l1", random_state=42),
    }
)
print(remote.generate_console_url(execution))
execution = remote.wait(execution)

http://localhost:30080/console/projects/flytesnacks/domains/development/executions/f783519030ee34c38ae9
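Intra-task checkpointing lets a long-running task save its progress periodically. If the task is interrupted and retried, it resumes from the last checkpoint instead of starting again at epoch 0.

In flytekit, this is exposed through the task's checkpoint object (`flytekit.current_context().checkpoint`, with `read()` and `write()`), typically combined with `retries` on the task. The sketch below shows only the resume logic. It uses a local JSON file as a hypothetical stand-in for the checkpoint store, and `crash_at` simulates a preemption.

```python
import json
import tempfile
from pathlib import Path
from typing import Optional


def train(n_epochs: int, ckpt: Path, crash_at: Optional[int] = None) -> int:
    """Train for n_epochs, resuming from `ckpt` if it exists; return epochs run in this attempt."""
    start = json.loads(ckpt.read_text())["epoch"] if ckpt.exists() else 0
    ran = 0
    for epoch in range(start, n_epochs):
        if epoch == crash_at:
            raise RuntimeError(f"interrupted at epoch {epoch}")  # simulated preemption
        # ... one epoch of real training would go here ...
        ran += 1
        ckpt.write_text(json.dumps({"epoch": epoch + 1}))  # next epoch to run
    return ran


ckpt = Path(tempfile.mkdtemp()) / "checkpoint.json"
try:
    train(30, ckpt, crash_at=12)
except RuntimeError as err:
    print(err)  # interrupted at epoch 12
print(train(30, ckpt))  # 18: the retry resumes at epoch 12
```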


## Auditability

### Example 10: Visualization with Flyte Decks

In [2]:
from workflows import example_10_flyte_decks
from workflows.utils import download_deck


execution = remote.execute_local_workflow(
    example_10_flyte_decks.penguins_data_workflow,
    inputs={},
)
print(remote.generate_console_url(execution))
execution = remote.wait(execution)

http://localhost:30080/console/projects/flytesnacks/domains/development/executions/f2f00b5df5120474380e


In [None]:
download_deck(remote, execution, "n0", "decks/example_10_decks.html")

### Example 11: Extending Flyte Decks

In [6]:
from workflows import example_11_extend_flyte_decks
from workflows.example_06_reproducibility import Hyperparameters
from workflows.utils import download_deck


execution = remote.execute_local_workflow(
    example_11_extend_flyte_decks.training_workflow,
    inputs={
        "hyperparameters": Hyperparameters(
            penalty="l1", alpha=0.03, random_state=12345
        )
    },
)
print(remote.generate_console_url(execution))
execution = remote.wait(execution)

http://localhost:30080/console/projects/flytesnacks/domains/development/executions/f00fc4b0eb2b544f1ab6


In [20]:
download_deck(remote, execution, "n2", "decks/example_11_decks_n2.html")
download_deck(remote, execution, "n3", "decks/example_11_decks_n3.html")

Flyte decks for execution f00fc4b0eb2b544f1ab6 downloaded to decks/example_11_decks.html
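Flyte Decks can be extended with custom renderers. In flytekit, a renderer is essentially any object with a `to_html` method that turns a Python value into an HTML string. A task can attach the result to its deck with `flytekit.Deck(name, html)`.

The hypothetical `RowsTableRenderer` below uses only the standard library and renders a list of row dicts as an HTML table. The class name and input format are assumptions, not the renderer used in `example_11_extend_flyte_decks.py`.

```python
from html import escape
from typing import Any, Dict, List


class RowsTableRenderer:
    """Hypothetical renderer: list of row dicts -> HTML table, following the to_html convention."""

    def to_html(self, rows: List[Dict[str, Any]]) -> str:
        if not rows:
            return "<p>(no rows)</p>"
        columns = list(rows[0])
        header = "".join(f"<th>{escape(str(c))}</th>" for c in columns)
        # Escape every cell so values containing <, >, or & can't break the markup.
        body = "".join(
            "<tr>" + "".join(f"<td>{escape(str(row.get(c, '')))}</td>" for c in columns) + "</tr>"
            for row in rows
        )
        return f"<table><thead><tr>{header}</tr></thead><tbody>{body}</tbody></table>"


html = RowsTableRenderer().to_html([{"species": "Adelie", "accuracy": 0.97}])
print(html)
```

Inside a task, you could then attach the table with something like `Deck("metrics", RowsTableRenderer().to_html(rows))`, where the deck name and `rows` are placeholders.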
