# SciPy 2023: Production-grade Machine Learning with Flyte

This workshop will focus on five facets of production-grade data science:

- ⛰️ Scalability
- ✅ Data Quality
- 🔄 Reproducibility
- 🔂 Recoverability
- 🔎 Auditability

### Learning Objectives

- Learn the basic constructs of Flyte: tasks, workflows, and launchplans
- Understand how Flyte orchestrates execution graphs, data, and compute infrastructure
- Work with the building blocks for productionizing data science workloads
- Learn how to test Flyte code, use CI/CD, and extend Flyte

## Introduction to Flyte

### Environment Setup

Follow the setup instructions in the [README](./README.md).

We'll be using some environment variables throughout this workshop, so let's
export them right now:

```bash
export FLYTECTL_CONFIG=~/.flyte/config-sandbox.yaml
export IMAGE=ghcr.io/flyteorg/flyte-conference-talks:scipy-2023-latest
```

### Flyte Basics

Tasks, Workflows, and Launch Plans: the building blocks of Flyte.

`flytekit` is the Python SDK for Flyte. It's the way data scientists, ML engineers, data engineers, and data analysts write code that will eventually run on a Flyte cluster.

Let's take a look at the [workflows/example_intro.py](./workflows/example_intro.py) script.

In it, you'll see a simple pipeline that uses the penguins dataset to train a
penguin species classifier. This script introduces three core concepts in Flyte:

- `tasks`: the basic unit of compute in Flyte.
- `workflows`: an execution graph of tasks.
- `launchplans`: a mechanism for executing and reusing workflows.
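To make the launch-plan idea concrete, here is a minimal stdlib-only sketch. It is not flytekit code: `ToyLaunchPlan` and `training_workflow` are hypothetical names, and the sketch only mimics the idea that a launch plan binds a workflow to default inputs (which callers may override) and fixed inputs (which they may not). In flytekit itself, `LaunchPlan.get_or_create` accepts `default_inputs` and `fixed_inputs` arguments for this purpose.

```python
from dataclasses import dataclass, field
from typing import Any, Callable, Dict


@dataclass
class ToyLaunchPlan:
    """Hypothetical stand-in for a launch plan (not flytekit's LaunchPlan)."""

    name: str
    workflow: Callable[..., Any]
    default_inputs: Dict[str, Any] = field(default_factory=dict)  # overridable
    fixed_inputs: Dict[str, Any] = field(default_factory=dict)    # locked

    def __call__(self, **overrides: Any) -> Any:
        locked = sorted(set(overrides) & set(self.fixed_inputs))
        if locked:
            raise ValueError(f"cannot override fixed inputs: {locked}")
        # Start from the defaults, apply caller overrides, then the fixed inputs.
        inputs = {**self.default_inputs, **overrides, **self.fixed_inputs}
        return self.workflow(**inputs)


def training_workflow(C: float, test_size: float) -> str:
    # Placeholder for a real workflow: it just reports the inputs it received.
    return f"trained with C={C}, test_size={test_size}"


nightly = ToyLaunchPlan(
    name="nightly_training",
    workflow=training_workflow,
    default_inputs={"C": 0.1},
    fixed_inputs={"test_size": 0.2},
)
print(nightly())        # trained with C=0.1, test_size=0.2
print(nightly(C=0.01))  # trained with C=0.01, test_size=0.2
```

The same workflow can back several launch plans (for example, a nightly one and an ad-hoc one) that differ only in their bound inputs or schedules.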

You can run this workflow locally with:

```bash
python workflows/example_intro.py
```

### `pyflyte run`

Run tasks and workflows locally or on a Flyte cluster.

To run a workflow locally, execute the following command in your terminal:

```bash
pyflyte run \
    workflows/example_intro.py training_workflow \
    --hyperparameters '{"C": 0.01}'
```

This is great for the local debugging experience, but what if we want to run this
workflow on an actual Flyte cluster?

`pyflyte run` also supports this use case through the `--remote` flag.

```bash
pyflyte --config ~/.flyte/config-sandbox.yaml \
    run --remote \
    --image ghcr.io/flyteorg/flyte-conference-talks:scipy-2023-latest \
    workflows/example_intro.py training_workflow \
    --hyperparameters '{"C": 0.01}'
```

Notice how we're providing three extra flags:
- `--config`: the path to the Flyte config file, which points `pyflyte run`
  to the Flyte cluster endpoint.
- `--remote`: tells `pyflyte run` that we want to run the workflow on
  a Flyte cluster.
- `--image`: the container image in which the workflow's tasks will run.

Once you execute this command, you should see a message that looks like this:

```
Go to http://localhost:30080/console/projects/flytesnacks/domains/development/executions/ff733ed1039b64067a89 to see execution in the console.
```

Here, `ff733ed1039b64067a89` is the ID of this workflow execution.

### Flyte Console

A tour of the Flyte console to view workflow progress and status.


The `flyteconsole` is the UI component of the Flyte stack. It provides a way to visualize workflows, launch them from the browser, and obtain useful metadata about Flyte entities and their corresponding executions.

<image src="https://raw.githubusercontent.com/flyteorg/static-resources/main/flytesnacks/getting_started/getting_started_console.gif" width="1000px">

Go to the link provided by the `pyflyte run` command to see the execution in the console.

### `FlyteRemote`

Programmatically run tasks and workflows.

You can also run workflows programmatically using the `FlyteRemote` class. This
is useful for:

- 📓 Running Flyte tasks/workflows within a Jupyter notebook
- 🤖 Running Flyte tasks/workflows as a microservice
- 🚢 Integrating Flyte into your CI/CD pipelines

The code below illustrates how we can import the workflow functions into a Python
runtime and execute them on a Flyte cluster.

```python
from workflows import example_intro
from workflows.utils import get_remote

remote = get_remote()
execution = remote.execute_local_workflow(
    example_intro.training_workflow,
    inputs={
        "hyperparameters": example_intro.Hyperparameters(C=0.1, max_iter=5000),
        "test_size": 0.2,
        "random_state": 11,
    }
)
remote.generate_console_url(execution)
```

```
'http://localhost:30080/console/projects/flytesnacks/domains/development/executions/f00b4149410964e44876'
```

```python
execution = remote.wait(execution)
```

```python
from sklearn.linear_model import LogisticRegression

clf = execution.outputs.get("o0", LogisticRegression)
clf
```

### Scheduling Launchplans

Run your workflows on a schedule.


Recall in the `example_intro.py` script that we defined a launch plan called
`scheduled_training_workflow`. You can activate scheduled launchplans from the CLI:

First, get the latest launchplan version:

```bash
flytectl get launchplan \
    -p flytesnacks -d development \
    scheduled_training_workflow --output yaml --latest  \
    | grep 'version:' -m 1
```

Expected output:
```
version: <version>
```

Using the `flytectl` CLI, activate the launchplan:

```bash
flytectl update launchplan \
    -p flytesnacks -d development \
    scheduled_training_workflow --version '<version>' --activate
```

Make sure it's activated:

```bash
flytectl get launchplan \
    -p flytesnacks -d development \
    scheduled_training_workflow --output yaml --latest \
    | grep ' state:'
```

Expected output:
```
state: ACTIVE
```

You can also use `FlyteRemote` to activate launchplans in Python:

```python
lp_id = remote.fetch_launch_plan(name="scheduled_training_workflow").id
remote.client.update_launch_plan(lp_id, "ACTIVE")
print("activated scheduled_training_workflow")
```

```
activated scheduled_training_workflow
```


Get the execution for the most recent scheduled run:

```python
recent_executions = [
    ex for ex in remote.recent_executions()
    if ex.spec.launch_plan.name == "scheduled_training_workflow"
]

scheduled_execution = None
if recent_executions:
    scheduled_execution = recent_executions[0]
    scheduled_execution = remote.sync(scheduled_execution)

print(scheduled_execution)
```

```
<FlyteLiteral id { project: "flytesnacks" domain: "development" name: "fcbe6f63596a67d10000" } spec { launch_plan { resource_type: LAUNCH_PLAN project: "flytesnacks" domain: "development" name: "scheduled_training_workflow" version: "M2MrgLxOIHJfFcZ-1C2AOQ==" } metadata { mode: SCHEDULED scheduled_at { seconds: 1689044400 } system_metadata { } } labels { } annotations { } auth_role { } } closure { outputs { uri: "s3://my-s3-bucket/metadata/propeller/flytesnacks-development-fcbe6f63596a67d10000/end-node/data/0/outputs.pb" } phase: SUCCEEDED started_at { seconds: 1689044400 nanos: 87098000 } duration { seconds: 77 nanos: 644313000 } created_at { seconds: 1689044400 nanos: 79244000 } updated_at { seconds: 1689044477 nanos: 731411000 } }>
```

```python
clf = scheduled_execution.outputs.get("o0", LogisticRegression)
clf
```

Deactivate the schedule with the `flytectl` CLI:

```bash
flytectl update launchplan \
    -p flytesnacks -d development \
    scheduled_training_workflow --version '<version>' --archive
```

Or with `FlyteRemote`:

```python
remote.client.update_launch_plan(lp_id, "INACTIVE")
print("deactivated scheduled_training_workflow")
```

```
deactivated scheduled_training_workflow
```


### ⏱️ 15 Minute Break

## Flyte Programming Model

<image src="static/flyte_programming_model_overview.png" width="800px">

### Tasks as Containerized Functions

A core building block for type-safety, statelessness, and reproducibility.

Let's take a closer look at Flyte tasks.

<image src="static/flyte_tasks.png" width="400px">


The `@task`-decorated function looks deceptively simple:

```python
from flytekit import task

@task
def hello():
    print("hello world")

hello()
```

```
hello world
```


If we want the task to do anything useful, we want to give it some inputs and
have it produce some outputs.

```python
@task
def square(x: float) -> float:
    return x ** 2

square(x=2.0)
```

```
4.0
```

In the above code, notice two things:

- **Task functions are strongly typed:** this not only provides type safety,
  it allows Flyte to analyze a sequence of tasks that depend on each other
  and determine their compatibility.
- **Task arguments must be passed as keyword arguments:** this is a current
  constraint of Flyte tasks that may be removed in a future release.
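Both rules can be mimicked in plain Python. The sketch below is a hypothetical `toy_task` decorator, not how flytekit implements `@task`: it rejects positional arguments and checks inputs and outputs against the function's annotations at call time. It assumes every parameter is annotated with a plain class such as `float`; Flyte's real type system goes much further, for example by checking compatibility between tasks before anything runs.

```python
import functools
import inspect
from typing import get_type_hints


def toy_task(fn):
    """Hypothetical decorator (not flytekit's @task) enforcing keyword-only
    calls and isinstance checks against the function's annotations."""
    hints = get_type_hints(fn)
    sig = inspect.signature(fn)

    @functools.wraps(fn)
    def wrapper(*args, **kwargs):
        if args:
            raise TypeError(f"{fn.__name__}() only accepts keyword arguments")
        bound = sig.bind(**kwargs)  # also rejects missing or unknown inputs
        for name, value in bound.arguments.items():
            if not isinstance(value, hints[name]):
                raise TypeError(
                    f"input {name}={value!r} is not a {hints[name].__name__}"
                )
        out = fn(**kwargs)
        if "return" in hints and not isinstance(out, hints["return"]):
            raise TypeError(f"output {out!r} is not a {hints['return'].__name__}")
        return out

    return wrapper


@toy_task
def square(x: float) -> float:
    return x ** 2


print(square(x=2.0))  # 4.0
```

Calling `square(2.0)` or `square(x="2")` raises a `TypeError` in this sketch, which is roughly the experience the exercise below asks you to explore with the real `@task`.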


**Exercise**:
1. Try calling `square` with a different data type.
2. Modify the code in the `square` function to output a different data type.

#### Why Containerized Functions?

What does type-safety have to do with containers?

Flyte treats tasks as containerized functions. This means that each task runs in
its own isolated container in the Flyte cluster. Coupled with strongly-typed
interfaces, tasks are essentially ✨microservices✨ that can be composed together
to form workflows.

This entails the following benefits:

- **Statelessness:** each task is stateless, which means that it can be run
  multiple times without side effects.
- **Reproducibility:** each task is reproducible, which means that it can be
  run multiple times with the same inputs and produce the same outputs.
- **Portability:** each task is portable, which means that it can be run on
  any Flyte cluster, or any infrastructure that can run containers.
- **Heterogeneity:** each task in a workflow can be written in any language and
  can be run with varying compute requirements.

Now let's run the `get_data` task from `example_intro.py` with `pyflyte run`:

```bash
pyflyte --config ~/.flyte/config-sandbox.yaml \
    run --remote \
    --image ghcr.io/flyteorg/flyte-conference-talks:scipy-2023-latest \
    workflows/example_intro.py get_data
```

If we go to the `flyteconsole` link provided by the `pyflyte run` command, we can
dig into the guts of a task container execution:

> - In the **Executions** tab, go to the **Logs** link, which will take you to
>   the Kubernetes dashboard logs.
> - If you go to the **Pod** metadata description, you can see the
>   **Arguments** that are provided as the entrypoint to the task container.

**Exercise**:

We can reproduce what happens in the Flyte cluster inside a local docker container:

```bash
docker run --network="host" -it ghcr.io/flyteorg/flyte-conference-talks:scipy-2023-latest /bin/bash
```

Then, inside the container session, set the following environment variables:

```bash
export FLYTE_SDK_LOGGING_LEVEL=20
export AWS_ACCESS_KEY_ID=minio
export AWS_SECRET_ACCESS_KEY=miniostorage
export FLYTE_AWS_ENDPOINT=http://localhost:30002
```

Finally, copy-paste the arguments from the **Pod** metadata description in the
code-fence below. You'll need to re-format the string so that you can invoke all
of the arguments as a single command.

```bash
pyflyte-fast-execute \
...
```

If everything worked as expected, you should see some `INFO` logs indicating
how long each step of the task execution took.

**Takeaway**: The `@task` decorator abstracts away many aspects of the underlying
container-native infrastructure that powers Flyte.

### Workflows and Promises

How Flyte workflows construct an execution graph of tasks.

Now let's take a closer look at Flyte workflows:

<image src="static/flyte_workflows.png" width="500px">

The Flyte `@workflow` is basically a domain-specific language (DSL) that builds an
execution graph that uses tasks as the building blocks for more complex pipelines.
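One way to picture what such a DSL does is the stdlib-only sketch below. `ToyGraph` and `Promise` are hypothetical classes, not flytekit's: calling `add` records a node and hands back a placeholder instead of running anything, and only `run` executes the functions, feeding each node's output into the nodes that depend on it.

```python
from dataclasses import dataclass
from typing import Any, Callable, Dict, Tuple


@dataclass(frozen=True)
class Promise:
    """Placeholder for a value that only exists once the graph runs."""
    node_id: str


class ToyGraph:
    """Hypothetical graph builder: `add` records a node, it runs nothing."""

    def __init__(self) -> None:
        self.nodes: Dict[str, Tuple[Callable[..., Any], Dict[str, Any]]] = {}

    def add(self, fn: Callable[..., Any], **kwargs: Any) -> Promise:
        node_id = f"n{len(self.nodes)}"
        self.nodes[node_id] = (fn, kwargs)
        return Promise(node_id)

    def run(self, output: Promise, **inputs: Any) -> Any:
        values = {f"input.{k}": v for k, v in inputs.items()}
        # Nodes were recorded in dependency order, so one pass resolves them all.
        for node_id, (fn, kwargs) in self.nodes.items():
            resolved = {
                k: values[v.node_id] if isinstance(v, Promise) else v
                for k, v in kwargs.items()
            }
            values[node_id] = fn(**resolved)
        return values[output.node_id]


def error(x, y):
    return [xi - yi for xi, yi in zip(x, y)]


def squares(x):
    return [xi ** 2 for xi in x]


def sum_task(x):
    return sum(x)


# "Compile" the workflow: this only wires promises together.
graph = ToyGraph()
x, y = Promise("input.x"), Promise("input.y")
errors = graph.add(error, x=x, y=y)
squared = graph.add(squares, x=errors)
print(squared)  # Promise(node_id='n1'), not a list of floats
total = graph.add(sum_task, x=squared)

# "Execute" the workflow: only now do the functions run.
print(graph.run(total, x=[1.0, 2.0, 3.0], y=[1.0, 3.0, 6.0]))  # 10.0
```

Separating "build the graph" from "run the graph" is what lets a workflow engine ship each node to a different container.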

```python
from flytekit import workflow

@task
def error(x: list[float], y: list[float]) -> list[float]:
    return [xi - yi for xi, yi in zip(x, y)]

@task
def squares(x: list[float]) -> list[float]:
    return [xi ** 2 for xi in x]

@task
def sum_task(x: list[float]) -> float:
    return sum(x)

@workflow
def sum_of_squares(x: list[float], y: list[float]) -> float:
    errors = error(x=x, y=y)
    squared = squares(x=errors)
    return sum_task(x=squared)


sum_of_squares(x=[1.0, 2.0, 3.0], y=[1.0, 3.0, 6.0])
```

```
10.0
```

**Exercise:**

- In the `sum_of_squares` workflow function, print out the `squared` variable. What
  do you expect to see? What do you actually see?
- Try invoking `sum_of_squares` with different data types.
- Modify one of the types in any of the tasks used in the `sum_of_squares` workflow.

### Type System

The Flyte type system is responsible for a lot of Flyte's production-grade
qualities:

- 👟 Run-time type-checking.
- 📦 Serialization/deserialization of IO between tasks.
- 📝 Type-checking of workflows at compile-time.

This type system is language-agnostic and is implemented in the `flyteidl`
protobuf format. This means that Flyte SDKs can be written in any language.
Currently `flytekit` is the Python implementation, but there are also
Java, Scala, and JavaScript SDKs.
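The compile-time half of this can be illustrated with a stdlib-only sketch. `check_edge`, `get_data`, `get_rows`, and `split_data` are hypothetical names; the point is that the check inspects declared annotations only and never calls the functions. It accepts exact type matches only, which is far stricter and simpler than Flyte's actual type system.

```python
from typing import get_type_hints


def get_data() -> dict:
    raise NotImplementedError("never called by the check")


def get_rows() -> list:
    raise NotImplementedError("never called by the check")


def split_data(data: list, test_size: float) -> tuple:
    raise NotImplementedError("never called by the check")


def check_edge(upstream, downstream, param: str) -> None:
    """Hypothetical compile-time check for one edge of a workflow graph:
    upstream's declared return type must match downstream's `param` type."""
    produced = get_type_hints(upstream)["return"]
    expected = get_type_hints(downstream)[param]
    if produced is not expected:  # exact match only, for illustration
        raise TypeError(
            f"{upstream.__name__} -> {downstream.__name__}({param}=...): "
            f"got {produced.__name__}, expected {expected.__name__}"
        )


check_edge(get_rows, split_data, "data")  # passes: list -> list
try:
    check_edge(get_data, split_data, "data")
except TypeError as exc:
    print(exc)  # get_data -> split_data(data=...): got dict, expected list
```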

<image src="static/flyte_type_system.png" width="400px">

Take a look at [example_04_type_system.py](./workflows/example_04_type_system.py).

Try changing the output signature of `get_data` from `pd.DataFrame` to `dict`
and try to run the `get_splits` workflow that uses it:

```bash
pyflyte run \
    workflows/example_04_type_system.py \
    get_splits --test_size 0.2 --random_state 42
```

What error do you see?

#### Data Quality: DataFrame Types

Flyte can natively handle `pandas.DataFrame` objects (and a few others, like
`polars.DataFrame`s), and it also has a generic built-in type for dataframe-like
objects called `StructuredDataset`.

This type lets you attach column names, column types, and additional metadata
that Flyte can use for run-time and compile-time type checks.

```python
from typing import Annotated

import pandas as pd
from palmerpenguins import load_penguins

from flytekit import kwtypes
from flytekit.types.structured import StructuredDataset

from workflows.example_intro import TARGET, FEATURES


# The column names and types that the dataset must carry
PenguinsDataset = Annotated[
    StructuredDataset,
    kwtypes(
        species=str,
        bill_length_mm=float,
        bill_depth_mm=float,
        flipper_length_mm=float,
        body_mass_g=float,
    ),
]

@task
def get_data() -> PenguinsDataset:
    return StructuredDataset(load_penguins()[[TARGET] + FEATURES].dropna())

structured_data = get_data()
structured_data.open(pd.DataFrame).all().head()
```

|   | species | bill_length_mm | bill_depth_mm | flipper_length_mm | body_mass_g |
|---|---------|----------------|---------------|-------------------|-------------|
| 0 | Adelie  | 39.1           | 18.7          | 181.0             | 3750.0      |
| 1 | Adelie  | 39.5           | 17.4          | 186.0             | 3800.0      |
| 2 | Adelie  | 40.3           | 18.0          | 195.0             | 3250.0      |
| 4 | Adelie  | 36.7           | 19.3          | 193.0             | 3450.0      |
| 5 | Adelie  | 39.3           | 20.6          | 190.0             | 3650.0      |


> ℹ️ **Note**: If we were to remove some of the columns in the dataframe, Flyte
> would throw an error because the type signature of the output would not match.

#### Statistical Data Validation with Pandera

Pandera is a data validation tool for dataframe-like objects. In
[example_05_pandera_types.py](./workflows/example_05_pandera_types.py), we define
a pandera schema that validates the output of `get_data` as well as the DataFrame
input of `split_data` at runtime.
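To show what "statistical" validation adds on top of column types, here is a stdlib-only sketch. It is not pandera's API: `Column`, `PENGUIN_SCHEMA`, and `validate` are hypothetical, and the allowed species and value ranges are assumptions chosen for illustration. Each column declares a type plus an optional per-value check, and validation collects every failure before raising.

```python
from dataclasses import dataclass
from typing import Any, Callable, Dict, List, Optional


@dataclass
class Column:
    dtype: type
    check: Optional[Callable[[Any], bool]] = None  # per-value range/set check


# Hypothetical schema; the species set and value ranges are assumptions.
PENGUIN_SCHEMA: Dict[str, Column] = {
    "species": Column(str, lambda v: v in {"Adelie", "Chinstrap", "Gentoo"}),
    "bill_length_mm": Column(float, lambda v: v > 0),
    "body_mass_g": Column(float, lambda v: 0 < v < 10_000),
}


def validate(rows: List[Dict[str, Any]], schema: Dict[str, Column]) -> List[Dict[str, Any]]:
    """Return rows unchanged if valid, otherwise raise listing every failure."""
    errors = []
    for i, row in enumerate(rows):
        missing = sorted(schema.keys() - row.keys())
        if missing:
            errors.append(f"row {i}: missing columns {missing}")
            continue
        for name, col in schema.items():
            value = row[name]
            if not isinstance(value, col.dtype):
                errors.append(f"row {i}: {name}={value!r} is not {col.dtype.__name__}")
            elif col.check is not None and not col.check(value):
                errors.append(f"row {i}: {name}={value!r} failed its check")
    if errors:
        raise ValueError("; ".join(errors))
    return rows


good = [{"species": "Adelie", "bill_length_mm": 39.1, "body_mass_g": 3750.0}]
validate(good, PENGUIN_SCHEMA)  # passes

bad = [{"species": "Adelie", "bill_length_mm": 39.1, "body_mass_g": -1.0}]
try:
    validate(bad, PENGUIN_SCHEMA)
except ValueError as exc:
    print(exc)  # row 0: body_mass_g=-1.0 failed its check
```

A type-only check would accept `-1.0` as a perfectly good `float`; the value-level check is what catches it.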

**Exercise:**

Uncomment line 49 in `example_05_pandera_types.py` and run the workflow with
`pyflyte run`:

```bash
pyflyte --config ~/.flyte/config-sandbox.yaml \
    run --remote \
    --image ghcr.io/flyteorg/flyte-conference-talks:scipy-2023-latest \
    workflows/example_05_pandera_types.py \
    get_splits --test_size 0.2 --random_state 42
```

What do you see?

### How Data Flows in Flyte

If tasks run in their own containers inside the Flyte cluster, how is data passed between them?

`flytekit` needs to convert all the Python types into something that the `flyteidl`
protobuf type system can understand. This is done by the `flytekit` type engine.
We'll learn more about how this works later, but at a high-level, it looks like
this:

<image src="static/flyte_data_flow.png" width="500px">

To see this diagram in action, let's kick off the `get_splits` workflow from
`example_05_pandera_types.py`:

```python
from workflows import example_pandera_types
from workflows.utils import get_remote

remote = get_remote()
execution = remote.execute_local_workflow(
    example_pandera_types.get_splits,
    inputs={"test_size": 0.2}
)
print(remote.generate_console_url(execution))
execution = remote.wait(execution)
```

```
http://localhost:30080/console/projects/flytesnacks/domains/development/executions/f56416b1295ef4bf1844
```


Once the execution completes, we can see the inputs and outputs of each task
in the workflow execution.

Get the node executions:

```python
execution.node_executions.keys()
```

```
dict_keys(['end-node', 'n0', 'n1', 'start-node'])
```

Get the output of the `get_data` node:

```python
execution.node_executions["n0"].outputs["o0"].remote_path
```

```
's3://my-s3-bucket/data/ja/f56416b1295ef4bf1844-n0-0/ab2d2aa7e056df20cb57af9e68c3a422'
```

Get the input of the `split_data` node:

```python
execution.node_executions["n1"].inputs["data"].remote_path
```

```
's3://my-s3-bucket/data/ja/f56416b1295ef4bf1844-n0-0/ab2d2aa7e056df20cb57af9e68c3a422'
```
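The output path of `n0` and the input path of `n1` are identical because tasks never hand Python objects to each other directly: one writes its output to storage and the next reads it back from the same location. A minimal stdlib sketch of that pattern, assuming a local temporary directory in place of MinIO and JSON in place of Flyte's own serialization (`ToyBlobStore`, `run_node`, `get_data`, and `count_rows` are hypothetical names):

```python
import json
import tempfile
import uuid
from pathlib import Path
from typing import Any, Callable, Dict


class ToyBlobStore:
    """Hypothetical stand-in for the sandbox's object store (MinIO)."""

    def __init__(self, root: Path) -> None:
        self.root = root

    def put(self, value: Any) -> str:
        path = self.root / uuid.uuid4().hex
        path.write_text(json.dumps(value))  # serialize the output
        return str(path)

    def get(self, path: str) -> Any:
        return json.loads(Path(path).read_text())  # deserialize the input


def run_node(store: ToyBlobStore, fn: Callable[..., Any], input_paths: Dict[str, str]) -> str:
    """Load inputs from storage, run the task, and write its output back."""
    inputs = {name: store.get(p) for name, p in input_paths.items()}
    return store.put(fn(**inputs))


def get_data():
    return [{"species": "Adelie"}, {"species": "Gentoo"}, {"species": "Adelie"}]


def count_rows(data):
    return len(data)


with tempfile.TemporaryDirectory() as tmp:
    store = ToyBlobStore(Path(tmp))
    n0_output = run_node(store, get_data, {})
    n1_inputs = {"data": n0_output}  # n1's input is n0's output *path*
    n1_output = run_node(store, count_rows, n1_inputs)
    same_path = n1_inputs["data"] == n0_output
    result = store.get(n1_output)

print(same_path, result)  # True 3
```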

The Flyte sandbox cluster runs a MinIO object store that holds all of the
inputs and outputs. You can browse them on the MinIO dashboard at
http://localhost:30080/minio, or list them from the terminal:

```bash
export AWS_ACCESS_KEY_ID=minio
export AWS_SECRET_ACCESS_KEY=miniostorage
aws --endpoint-url http://localhost:30002 s3 ls <s3_path>
```

### Lifecycle of a Workflow

When you run a workflow locally, flytekit just runs the tasks in a Python runtime. However, when you run the workflow on a Flyte cluster, a lot of things are happening under the hood.

`flytepropeller` is the core engine in the Flyte stack that orchestrates:
- the execution of tasks in a particular sequence.
- the management of data dependencies between tasks.
- the compute infrastructure needed to run a task.
- ... and much more.

This is a very deep topic that we don't have time to cover in this workshop, but
here's a high-level overview of what happens:

<image src="static/flyte_workflow_lifecycle.png" width="500px">

### Development Lifecycle Overview

Typically, working with Flyte as a data scientist looks like this:

- 💻 Create and test tasks/workflows locally with `pyflyte run`
- 📦 Build a container for your tasks/workflows
- 🔁 Iterate on a Flyte cluster with `pyflyte run --remote`
- 🚀 Deploy to production on a Flyte cluster using `pyflyte register --non-fast`

We haven't gone through the build step, but it would look something like this:

```bash
docker build . -t ghcr.io/cosmicbboy/flyte-conference-talks:scipy-2023-v0
docker push ghcr.io/cosmicbboy/flyte-conference-talks:scipy-2023-v0
```

If we go to the [`Dockerfile`](./Dockerfile), we can see that this packages up
all of the Flyte workflow code and the third-party dependencies into the image.

Now let's understand what `pyflyte register` is.

#### `pyflyte register`

By default, `pyflyte register` uses "fast registration": it zips up all of the
source code of your Flyte application and uploads it alongside the workflow
definitions. This supports rapid iteration during development, because you
don't need to re-build a Docker image every time you update your code.

```bash
pyflyte register \
    --image ghcr.io/flyteorg/flyte-conference-talks:scipy-2023-latest \
    --project flytesnacks \
    --domain development \
    workflows
```

> ℹ️ **Note**: You can provide an explicit `--version` flag, but by default `flytekit` will create a version string for you.
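We don't reproduce flytekit's exact versioning scheme here. As a hypothetical sketch of the general idea, the snippet below archives a source directory into a `tar.gz` and derives a short version string from a SHA-256 digest of the file names and contents, so unchanged code yields the same version and any edit yields a new one. `package_and_version` is an illustrative name, not a flytekit function.

```python
import hashlib
import io
import tarfile
import tempfile
from pathlib import Path
from typing import Tuple


def package_and_version(src_dir: Path) -> Tuple[bytes, str]:
    """Tar up the .py files under src_dir and derive a short content digest."""
    digest = hashlib.sha256()
    buffer = io.BytesIO()
    with tarfile.open(fileobj=buffer, mode="w:gz") as tar:
        for path in sorted(src_dir.rglob("*.py")):
            rel = path.relative_to(src_dir).as_posix()
            # Hash names and contents rather than the archive bytes, because
            # tar/gzip headers embed timestamps that change between builds.
            digest.update(rel.encode() + b"\0" + path.read_bytes() + b"\0")
            tar.add(path, arcname=rel)
    return buffer.getvalue(), digest.hexdigest()[:16]


with tempfile.TemporaryDirectory() as tmp:
    src = Path(tmp) / "workflows"
    src.mkdir()
    (src / "example.py").write_text("x = 1\n")
    _, v1 = package_and_version(src)
    _, v2 = package_and_version(src)        # unchanged code
    (src / "example.py").write_text("x = 2\n")
    archive, v3 = package_and_version(src)  # edited code

print(v1 == v2, v1 == v3)  # True False
```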

Now let's run the newly registered `example_intro.py` workflow:

```bash
pyflyte --config ~/.flyte/config-sandbox.yaml \
    run --remote \
    --image ghcr.io/flyteorg/flyte-conference-talks:scipy-2023-latest \
    workflows/example_intro.py training_workflow \
    --hyperparameters '{"C": 0.01}'
```

If we go to the Flyte console link provided by `pyflyte run`, we can see
what happened during the execution by going to the **Task** tab and looking at
the `--additional-distribution` flag in the container arguments. The `tar.gz` file
it points to is the packaged-up Flyte code from our repository, which `flytekit`
unpacks and runs as the actual task code.

#### `pyflyte register --non-fast`

Unlike the default fast registration, `pyflyte register --non-fast` *will not*
package up the user code; it simply uses the source code that was baked into the
image when it was built.

This is the **recommended** way to deploy to production because it ensures that
the source code that is running in production is the same as the source code in
the image, since the `tar.gz` *could in theory* be swapped out by a malicious
actor.

Since we're deploying to production, let's specify the `--domain production` flag.

```bash
pyflyte register \
    --image ghcr.io/flyteorg/flyte-conference-talks:scipy-2023-latest \
    --project flytesnacks \
    --domain production \
    --non-fast \
    --version v0 \
    workflows
```

### ⏱️ 15 Minute Break

## Productionizing Data Science Workloads

In this next section, we're primarily going to use `FlyteRemote` to kick-off
workflows, read and understand the underlying `flytekit` code, and analyze
what exactly happened in `flyteconsole`.

### Parallelism

#### Example 1: Dynamic Workflows

Dynamic workflows allow you to create execution graphs on the fly, at run time.
This lets you, for example, write a for loop over a list of inputs to implement a
grid search model tuning workflow.
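The control flow of such a grid search is sketched below in plain Python. `Hyperparameters` here is a local stand-in (not the class from `example_intro.py`), and `train_and_score` is a hypothetical task with a fake score. The point is that the number of training calls depends on the length of the grid, which is only known when the workflow runs; in a `@dynamic` workflow, each of those calls would become a node in the graph.

```python
from dataclasses import dataclass
from typing import List


@dataclass(frozen=True)
class Hyperparameters:
    """Local stand-in, not the class defined in example_intro.py."""
    C: float
    max_iter: int = 5000


def train_and_score(hp: Hyperparameters) -> float:
    """Hypothetical training task; its fake score peaks at C = 0.01."""
    return -abs(hp.C - 0.01)


def tuning_workflow(hyperparam_grid: List[Hyperparameters]) -> Hyperparameters:
    # How many train_and_score calls happen depends on len(hyperparam_grid),
    # which is only known at run time.
    scores = {hp: train_and_score(hp) for hp in hyperparam_grid}
    return max(scores, key=scores.get)


grid = [Hyperparameters(C=c) for c in (0.1, 0.01, 0.001)]
print(tuning_workflow(grid))  # Hyperparameters(C=0.01, max_iter=5000)
```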

```python
from workflows import example_dynamic, example_intro
from workflows.utils import get_remote

remote = get_remote()
execution = remote.execute_local_workflow(
    example_dynamic.tuning_workflow,
    inputs={
        "hyperparam_grid": [
            example_intro.Hyperparameters(C=0.1, max_iter=5000),
            example_intro.Hyperparameters(C=0.01, max_iter=5000),
            example_intro.Hyperparameters(C=0.001, max_iter=5000),
        ],
    }
)
remote.generate_console_url(execution)
```

```
'http://localhost:30080/console/projects/flytesnacks/domains/development/executions/f7483f3766a8044cc86f'
```

#### Example 2: Map Tasks

Map tasks enable larger fan-outs of embarrassingly parallel computations compared
to dynamic workflows.
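The map-task pattern (one function applied to many inputs, in parallel) can be approximated locally with `concurrent.futures`. The sketch below is a hypothetical `toy_map_task`, not flytekit's `map_task`; its `concurrency` and `min_success_ratio` parameters loosely mirror knobs of the same names in flytekit, and in this sketch a failed element comes back as `None` instead of failing the whole map as long as enough elements succeed.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, List, Optional, Sequence


def toy_map_task(
    fn: Callable[[Any], Any],
    inputs: Sequence[Any],
    concurrency: int = 2,
    min_success_ratio: float = 1.0,
) -> List[Optional[Any]]:
    """Hypothetical analog of a map task: apply fn to every input with bounded
    parallelism, tolerating failures down to min_success_ratio."""

    def attempt(x: Any):
        try:
            return True, fn(x)
        except Exception:
            return False, None  # a failed element yields None

    with ThreadPoolExecutor(max_workers=concurrency) as pool:
        outcomes = list(pool.map(attempt, inputs))  # preserves input order

    n_ok = sum(ok for ok, _ in outcomes)
    if inputs and n_ok / len(inputs) < min_success_ratio:
        raise RuntimeError(f"only {n_ok}/{len(inputs)} mapped calls succeeded")
    return [value for _, value in outcomes]


def inverse(c: float) -> float:
    return 1.0 / c  # raises ZeroDivisionError for c == 0


print(toy_map_task(inverse, [1.0, 2.0, 4.0, 0.0], min_success_ratio=0.75))
# [1.0, 0.5, 0.25, None]
```

Unlike the dynamic-workflow sketch, every element here runs the *same* function, which is what makes very large fan-outs cheap to describe.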

```python
from workflows import example_intro, example_map_task
from workflows.utils import get_remote

remote = get_remote()
execution = remote.execute_local_workflow(
    example_map_task.tuning_workflow,
    inputs={
        "hyperparam_grid": [
            example_intro.Hyperparameters(C=0.1, max_iter=5000),
            example_intro.Hyperparameters(C=0.01, max_iter=5000),
            example_intro.Hyperparameters(C=0.001, max_iter=5000),
        ],
    }
)
print(remote.generate_console_url(execution))
execution = remote.wait(execution)
```

```
http://localhost:30080/console/projects/flytesnacks/domains/development/executions/fca97404075754c2fa4c
```


### Horizontal Scaling

#### Example 3: Plugins

Flyte has a plugin system that lets you integrate with a wide variety of
data and machine learning tools that help you to scale, like BigQuery,
PySpark, and Ray.

```python
from workflows import example_plugins
from workflows.utils import get_remote

remote = get_remote()
execution = remote.execute_local_workflow(
    example_plugins.training_workflow,
    inputs={
        "n_epochs": 50,
        "hyperparameters": example_plugins.Hyperparameters(
            in_dim=4, hidden_dim=100, out_dim=3, learning_rate=0.03
        ),
    }
)
print(remote.generate_console_url(execution))
execution = remote.wait(execution)
```

```
http://localhost:30080/console/projects/flytesnacks/domains/development/executions/fa4de104a7afd4d1d857
```


### Production Notebooks

```python
from workflows import example_notebook_tasks
from workflows.utils import get_remote

remote = get_remote()
execution = remote.execute_local_workflow(
    example_notebook_tasks.data_analysis_wf,
    inputs={},
)
print(remote.generate_console_url(execution))
execution = remote.wait(execution)
```

### Container Tasks

```python
from workflows import example_container_tasks
from workflows.utils import get_remote

remote = get_remote()
execution = remote.execute_local_workflow(
    example_container_tasks.get_data_wf,
    inputs={},
)
print(remote.generate_console_url(execution))
execution = remote.wait(execution)
```

### Recovering from Failure

#### Caching

In [example_07_caching.py](./workflows/example_07_caching.py), we revisit the model-tuning use case using `@dynamic` workflows,
showing how caching can help reduce wasted compute.
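In flytekit, caching is switched on per task with `@task(cache=True, cache_version="...")`. The stdlib sketch below only illustrates the underlying idea and is not how Flyte stores its cache: `toy_cache` is a hypothetical memoizer whose key combines the task name, the cache version, and a hash of the inputs (assumed to be JSON-serializable). Re-running a grid with repeated inputs skips the work, and bumping `cache_version` would produce fresh keys.

```python
import functools
import hashlib
import json
from typing import Any, Callable, Dict, Tuple


def toy_cache(cache_version: str) -> Callable:
    """Hypothetical memoizer keyed on (task name, cache_version, input hash).
    Assumes inputs are JSON-serializable."""
    store: Dict[Tuple[str, str, str], Any] = {}

    def decorator(fn: Callable) -> Callable:
        @functools.wraps(fn)
        def wrapper(**kwargs: Any) -> Any:
            payload = json.dumps(kwargs, sort_keys=True).encode()
            key = (fn.__name__, cache_version, hashlib.sha256(payload).hexdigest())
            if key not in store:  # cache miss: actually run the task
                store[key] = fn(**kwargs)
            return store[key]

        wrapper.cache = store
        return wrapper

    return decorator


calls = []


@toy_cache(cache_version="1.0")
def train(alpha: float) -> float:
    calls.append(alpha)  # records real executions only
    return alpha * 2


for alpha in [10.0, 1.0, 10.0, 1.0]:
    train(alpha=alpha)
print(calls)  # [10.0, 1.0]
```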

```python
from workflows import example_caching
from workflows.example_reproducibility import Hyperparameters
from workflows.utils import get_remote

remote = get_remote()
execution = remote.execute_local_workflow(
    example_caching.tuning_workflow,
    inputs={
        "hyperparam_grid": [
            Hyperparameters(alpha=alpha)
            for alpha in [10.0, 1.0, 0.1, 0.01, 0.001, 0.0001]
        ],
    }
)
print(remote.generate_console_url(execution))
execution = remote.wait(execution)
```

```
http://localhost:30080/console/projects/flytesnacks/domains/development/executions/fca9432093c79475888c
```


#### Recovering Failed Executions

In [example_08_recover_executions.py](./workflows/example_08_recover_executions.py), we see how Flyte
provides a mechanism by which you can automatically recover from unexpected failures.
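The core idea behind recovering an execution is to reuse the outputs of the nodes that already succeeded and re-run only the ones that did not. The stdlib sketch below illustrates that idea; `run_workflow` and `flaky` are hypothetical names, the nodes are independent zero-argument callables for simplicity, and it makes no claim about how Flyte persists node outputs.

```python
from typing import Any, Callable, Dict, List, Optional, Tuple


def run_workflow(
    nodes: Dict[str, Callable[[], Any]],
    previous: Optional[Dict[str, Any]] = None,
) -> Tuple[Dict[str, Any], List[str], Optional[str]]:
    """Run nodes in order, reusing outputs recorded by a previous execution.
    Returns (outputs, names of nodes actually run, name of failed node)."""
    previous = previous or {}
    outputs: Dict[str, Any] = {}
    ran: List[str] = []
    for name, fn in nodes.items():
        if name in previous:
            outputs[name] = previous[name]  # recovered, not re-run
            continue
        try:
            outputs[name] = fn()
        except Exception:
            return outputs, ran, name  # stop at the first failure
        ran.append(name)
    return outputs, ran, None


attempts = {"n2": 0}


def flaky() -> str:
    """Fails on its first call only, like a transient infrastructure error."""
    attempts["n2"] += 1
    if attempts["n2"] == 1:
        raise ConnectionError("transient failure")
    return "model-2"


nodes = {
    "n0": lambda: "model-0",
    "n1": lambda: "model-1",
    "n2": flaky,
    "n3": lambda: "model-3",
}
first, ran_first, failed = run_workflow(nodes)
print(ran_first, failed)  # ['n0', 'n1'] n2
recovered, ran_second, _ = run_workflow(nodes, previous=first)
print(ran_second)  # ['n2', 'n3']
```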

```python
from workflows import example_recover_executions
from workflows.utils import get_remote

remote = get_remote()
execution = remote.execute_local_workflow(
    example_recover_executions.tuning_workflow,
    inputs={"alpha_grid": [100.0, 10.0, 1.0, 0.1, 0.01, 0.001, 0.0001, 0.00001]}
)
print(remote.generate_console_url(execution))
execution = remote.wait(execution)
```

```
http://localhost:30080/console/projects/flytesnacks/domains/development/executions/f30fc95e6cfa84af19bb
```


#### Checkpointing

In [example_09_checkpointing.py](./workflows/example_09_checkpointing.py), we
learn how to create intra-task checkpoints natively in Flyte, so that you can
pick up where you left off in, e.g., a model training task.
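The resume-from-checkpoint pattern itself is independent of Flyte. In flytekit the checkpoint object is typically reached through `flytekit.current_context().checkpoint`; in the hypothetical sketch below, a local JSON file plays that role instead. The toy training loop writes its state after every epoch, a simulated crash interrupts it, and the second run resumes from the last saved epoch instead of starting over.

```python
import json
import tempfile
from pathlib import Path
from typing import List, Optional


def train(
    n_epochs: int,
    ckpt: Path,
    crash_at: Optional[int] = None,
    epochs_run: Optional[List[int]] = None,
) -> dict:
    """Toy training loop that resumes from `ckpt` if it exists."""
    state = {"epoch": 0, "weight": 0.0}
    if ckpt.exists():
        state = json.loads(ckpt.read_text())  # resume from the last checkpoint
    for epoch in range(state["epoch"], n_epochs):
        if epoch == crash_at:
            raise RuntimeError(f"simulated crash at epoch {epoch}")
        state["weight"] += 0.5  # stand-in for one epoch of real training
        state["epoch"] = epoch + 1
        ckpt.write_text(json.dumps(state))  # checkpoint after every epoch
        if epochs_run is not None:
            epochs_run.append(epoch)
    return state


with tempfile.TemporaryDirectory() as tmp:
    ckpt = Path(tmp) / "checkpoint.json"
    try:
        train(n_epochs=10, ckpt=ckpt, crash_at=5)
    except RuntimeError as exc:
        print(exc)  # simulated crash at epoch 5
    resumed: List[int] = []
    final = train(n_epochs=10, ckpt=ckpt, epochs_run=resumed)

print(resumed)  # [5, 6, 7, 8, 9]
print(final)    # {'epoch': 10, 'weight': 5.0}
```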

```python
from workflows import example_checkpointing
from workflows.example_reproducibility import Hyperparameters
from workflows.utils import get_remote

remote = get_remote()
execution = remote.execute_local_workflow(
    example_checkpointing.training_workflow,
    inputs={
        "n_epochs": 30,
        "hyperparameters": Hyperparameters(penalty="l1", random_state=42),
    }
)
print(remote.generate_console_url(execution))
execution = remote.wait(execution)
```

```
http://localhost:30080/console/projects/flytesnacks/domains/development/executions/f0f29542fe51740e9af4
```


### Auditing Workflows

#### Visualization with Flyte Decks

In [example_10_flyte_decks.py](./workflows/example_10_flyte_decks.py) we
create tasks that produce static html reports that help you understand the
inputs/outputs of your tasks.

```python
from workflows import example_flyte_decks
from workflows.utils import download_deck, get_remote

remote = get_remote()
execution = remote.execute_local_workflow(
    example_flyte_decks.penguins_data_workflow,
    inputs={},
)
print(remote.generate_console_url(execution))
execution = remote.wait(execution)
```

```
http://localhost:30080/console/projects/flytesnacks/domains/development/executions/f03a65384d3814414836
```

```python
download_deck(remote, execution, "n0", "decks/example_decks.html")
```

```
Flyte decks for execution f9b47ecb975ad4829a95 downloaded to decks/example_10_decks.html
```


## Testing, CI/CD, Extending Flyte

In this final section of the workshop, we tie everything together by seeing how
to test a `flytekit` project, use CI/CD on a `flytekit` codebase, and extend
it for your own use cases.

### Writing Unit Tests

Since the vast majority of the Flyte tasks you'll write and run are just Python
code, they're easy to test.

Let's take a look at the unit tests in [`tests/unit/test_workflows.py`](tests/unit/test_workflows.py). You'll notice a few things:

- We're using `pytest` as our test runner, because most tasks and workflows can
  be executed locally and "just work".
- The unit tests verify that the outputs of each workflow are of the expected
  types.
- We can use `flytekit.testing.task_mock` to mock out task types where local
  execution is currently not supported, e.g. `ContainerTask`s.
- We're using a `clear_cache` fixture to make sure that the caches of workflows
  that rely on caching are cleared (so that the functions actually run).
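The shape of such tests can be sketched without flytekit. In the hypothetical example below, `fetch_raw_data` stands in for a task that can't run locally, and instead of `flytekit.testing.task_mock` the sketch swaps in dependency injection plus the standard library's `unittest.mock.Mock`, so it runs anywhere. `pytest` collects the `test_*` functions and needs nothing beyond plain `assert` statements.

```python
from collections import Counter
from typing import Callable, Dict, List
from unittest import mock


def fetch_raw_data() -> List[dict]:
    """Stand-in for a task that can't run locally, e.g. a container task."""
    raise RuntimeError("only runs on a Flyte cluster")


def count_species(rows: List[dict]) -> Dict[str, int]:
    return dict(Counter(row["species"] for row in rows))


def pipeline(fetch: Callable[[], List[dict]] = fetch_raw_data) -> Dict[str, int]:
    return count_species(fetch())


# pytest collects these test_* functions; plain `assert` is all they need.
def test_count_species():
    rows = [{"species": "Adelie"}, {"species": "Gentoo"}, {"species": "Adelie"}]
    assert count_species(rows) == {"Adelie": 2, "Gentoo": 1}


def test_pipeline_with_mocked_fetch():
    fake_fetch = mock.Mock(return_value=[{"species": "Adelie"}])
    assert pipeline(fetch=fake_fetch) == {"Adelie": 1}
    fake_fetch.assert_called_once_with()
```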

### Writing Integration Tests


Writing integration tests for `flytekit` is a little more involved. Let's take
a look at [`tests/integration/test_workflows.py`](tests/integration/test_workflows.py).
You'll notice a few things:

- We assume that there's a running local Flyte sandbox.
- We assume that the workflows are already registered.
- We can use a `FlyteRemote` object to execute workflows on the Flyte cluster.

### Using Github Actions

One of the core pieces of production-grade data science orchestration is the
process of CI/CD that allows you to automate your testing and deployment process.

Let's take a look at the [`.github/workflows/build.yaml`](../.github/workflows/build.yaml)
file in this repository, which contains the GitHub Actions workflows. As you can see, this build process does the following:

- Installs the Python dependencies needed to run the tests.
- Uses the [`flytectl-setup-action`](https://github.com/unionai-oss/flytectl-setup-action) GitHub action to get access to the `flytectl` CLI tool.
- Runs the unit tests.
- Starts a Flyte sandbox cluster.
- Registers all the examples on the sandbox.

> ℹ️ **Note**: The workflows in this workshop are fairly heavy, so we can't
> actually run the integration tests against the sandbox in CI. In a real-world
> setting, however, you would connect to a remote Flyte cluster that is
> correctly configured to run the workflows.

#### GitHub Actions for Deployment

We can also use this system to progress workflows from `development > staging > production`.

Since Flyte has the core construct of a `domain`, we can use GitHub Actions
to automate this process. For example:

- 🚧 When a pull request (PR) is opened, register the workflows to `development`
- 🔎 When a PR is merged to `staging`, register the workflows to the `staging` domain
- 🚀 When a PR is merged to `main`, register the workflows to `production`
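The promotion policy above boils down to a small mapping from CI events to domains. The sketch below is hypothetical: `domain_for` and `register_command` are illustrative helpers, the event names follow GitHub's `pull_request` and `push` events (merging a PR triggers a `push` on the target branch), and the command only reuses `pyflyte register` flags shown earlier in this workshop. A CI job could pass the resulting list to `subprocess.run(cmd, check=True)`.

```python
from typing import List, Optional


def domain_for(event: str, branch: str) -> Optional[str]:
    """Hypothetical promotion policy matching the list above."""
    if event == "pull_request":
        return "development"
    if event == "push" and branch == "staging":
        return "staging"
    if event == "push" and branch == "main":
        return "production"
    return None  # nothing to register


def register_command(domain: str, image: str, version: str) -> List[str]:
    """Build the `pyflyte register` call used earlier in this workshop."""
    cmd = [
        "pyflyte", "register",
        "--image", image,
        "--project", "flytesnacks",
        "--domain", domain,
        "--version", version,
    ]
    if domain == "production":
        cmd.append("--non-fast")  # use the code baked into the image
    cmd.append("workflows")
    return cmd


image = "ghcr.io/flyteorg/flyte-conference-talks:scipy-2023-latest"
domain = domain_for("push", "main")
print(" ".join(register_command(domain, image, version="abc123")))
```

Here `abc123` is a placeholder; in practice you might use the commit SHA as the version.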

### Extending Flyte

#### Decorators

The simplest way to extend `flytekit` tasks is with decorators. This is a
Python-native way of modifying the behavior of a function by wrapping it in
another function.

For example, say we want to do something before and/or after the task executes:

```python
from functools import wraps

def decorator(fn):

    @wraps(fn)
    def wrapper(*args, **kwargs):
        print("do something before")
        out = fn(*args, **kwargs)
        print("do something after")
        return out

    return wrapper

@task
@decorator
def add_one(x: int) -> int:
    return x + 1

add_one(x=10)
```

```
do something before
do something after
11
```

Note that, on the Flyte backend, this will run in the same container as the
main task function that's doing all of the heavy lifting.

This pattern is useful, e.g., when you need to use some kind of external library
to set up metrics logging. In fact, the `flytekitplugins.mlflow` plugin uses
this pattern.

#### Extending Flyte Decks

Flyte decks can be easily extended to support any arbitrary visualization, as
we can see in [example_11_extend_flyte_decks.py](./workflows/example_11_extend_flyte_decks.py).
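flytekit's deck renderers are objects with a `to_html` method that returns an HTML string. The sketch below follows that shape without importing flytekit: `SpeciesBarRenderer` is a hypothetical renderer that turns a list of species labels into a simple HTML bar chart, escaping labels so arbitrary strings can't inject markup.

```python
import html
from collections import Counter
from typing import List


class SpeciesBarRenderer:
    """Hypothetical renderer: a list of labels -> an HTML bar chart table."""

    def to_html(self, species: List[str]) -> str:
        counts = Counter(species)
        total = sum(counts.values()) or 1  # avoid dividing by zero
        rows = []
        for name, n in counts.most_common():
            width = round(100 * n / total)  # bar width as a percentage
            rows.append(
                f"<tr><td>{html.escape(name)}</td>"
                f'<td><div style="background:#4c78a8;width:{width}%">&nbsp;</div></td>'
                f"<td>{n}</td></tr>"
            )
        return "<table>" + "".join(rows) + "</table>"


report = SpeciesBarRenderer().to_html(["Adelie", "Adelie", "Gentoo", "Chinstrap"])
print(report[:60])
```

The returned string is the kind of content you would hand to a Flyte deck inside a task.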

**Exercise**

Come up with a visualization for one of the inputs or outputs of any of the tasks
in `example_11_extend_flyte_decks.py`, and create a custom Flyte deck for it.

```python
from workflows import example_flyte_decks_extend
from workflows.example_reproducibility import Hyperparameters
from workflows.utils import download_deck, get_remote
from IPython.display import HTML, display

remote = get_remote()
execution = remote.execute_local_workflow(
    example_flyte_decks_extend.training_workflow,
    inputs={
        "hyperparameters": Hyperparameters(
            penalty="l1", alpha=0.03, random_state=12345
        )
    },
)
print(remote.generate_console_url(execution))
execution = remote.wait(execution)
```

```
http://localhost:30080/console/projects/flytesnacks/domains/development/executions/f9b47ecb975ad4829a95
```


```python
download_deck(remote, execution, "n2", "decks/example_decks_n2.html")
```

```
Flyte decks for execution f9b47ecb975ad4829a95 downloaded to decks/example_decks_n2.html
```

```python
download_deck(remote, execution, "n3", "decks/example_decks_n3.html")
```

```
Flyte decks for execution f9b47ecb975ad4829a95 downloaded to decks/example_decks_n3.html
```


#### Type Plugins

If you have a custom class that you want to use as a task input or output,
`flytekit` will fall back on the `flytekit.types.pickle.FlytePickle` type when
serializing/deserializing an instance of that class.

If you want to customize how your class is serialized/deserialized, you'll need
to write a `TypeTransformer` for it.

For example, say you have a custom `MyDataset` class:

```python
class MyDataset(object):
    def __init__(self, data: list[dict]):
        self.data = data
```

To serialize/deserialize this object as a JSON file, you can write a transformer like this:

```python
import json
from typing import Type

from flytekit import Blob, BlobMetadata, BlobType, FlyteContext, Literal, LiteralType, Scalar, task, workflow
from flytekit.extend import TypeTransformer


class MyDatasetTransformer(TypeTransformer[MyDataset]):

    # MyDataset is stored as a single file (one blob, not a directory)
    _TYPE_INFO = BlobType(
        format="binary",
        dimensionality=BlobType.BlobDimensionality.SINGLE,
    )

    def __init__(self):
        super().__init__(name="mydataset-transformer", t=MyDataset)

    def get_literal_type(self, t: Type[MyDataset]) -> LiteralType:
        """Tells the flytekit type system that ``MyDataset`` should be a file type."""
        return LiteralType(blob=self._TYPE_INFO)

    def to_literal(
        self,
        ctx: FlyteContext,
        python_val: MyDataset,
        python_type: Type[MyDataset],
        expected: LiteralType,
    ) -> Literal:
        """Writes ``python_val`` to JSON, uploads it, and wraps the URI in a Flyte literal."""
        local_path = ctx.file_access.get_random_local_path()
        with open(local_path, mode="w") as f:
            json.dump(python_val.data, f)

        remote_path = ctx.file_access.get_random_remote_path()
        ctx.file_access.upload(local_path, remote_path)
        return Literal(
            scalar=Scalar(
                blob=Blob(
                    uri=remote_path,
                    metadata=BlobMetadata(type=self._TYPE_INFO),
                )
            )
        )

    def to_python_value(
        self,
        ctx: FlyteContext,
        lv: Literal,
        expected_python_type: Type[MyDataset],
    ) -> MyDataset:
        """Re-hydrates a ``MyDataset`` by downloading and parsing the JSON file in the literal."""
        local_path = ctx.file_access.get_random_local_path()
        ctx.file_access.download(lv.scalar.blob.uri, local_path)
        with open(local_path) as f:
            data = json.load(f)
        return MyDataset(data)
```
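The transformer only works if `to_python_value` inverts `to_literal`. The standard-library sketch below mirrors those two methods with hypothetical `dump_dataset` and `load_dataset` helpers that write to and read from a local temporary file instead of Flyte's blob store, so the round trip can be checked without a cluster. It also shows a consequence of choosing JSON as the format: values must be JSON-serializable, and tuples come back as lists.

```python
import json
import os
import tempfile


class MyDataset(object):
    def __init__(self, data: list[dict]):
        self.data = data


def dump_dataset(ds: MyDataset, directory: str) -> str:
    """Hypothetical stand-in for to_literal: write JSON, return the file path."""
    fd, path = tempfile.mkstemp(suffix=".json", dir=directory)
    with os.fdopen(fd, "w") as f:
        json.dump(ds.data, f)
    return path


def load_dataset(path: str) -> MyDataset:
    """Hypothetical stand-in for to_python_value: read the JSON back into MyDataset."""
    with open(path) as f:
        return MyDataset(json.load(f))


with tempfile.TemporaryDirectory() as tmp:
    original = MyDataset([{"a": 1}, {"a": 2, "pair": (3, 4)}])
    restored = load_dataset(dump_dataset(original, tmp))
    print(restored.data)  # [{'a': 1}, {'a': 2, 'pair': [3, 4]}]
```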

Next, we register the transformer with Flyte's type engine:

```python
from flytekit.extend import TypeEngine

TypeEngine.register(MyDatasetTransformer())
```
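Conceptually, registering a transformer adds an entry to a lookup from Python type to transformer, and types without a registered transformer fall back to pickling. The toy registry below is a hypothetical, much-simplified model of that dispatch and is not flytekit's implementation; `ToyTypeEngine`, `JsonTransformer`, and `PickleFallback` are illustrative names.

```python
import json
import pickle
from typing import Any, Dict, Type


class PickleFallback:
    """Stand-in for the pickle fallback used for unregistered types."""

    name = "pickle"

    def dump(self, value: Any) -> bytes:
        return pickle.dumps(value)

    def load(self, raw: bytes) -> Any:
        return pickle.loads(raw)


class JsonTransformer:
    """Hypothetical transformer for classes exposing a JSON-serializable ``data`` attribute."""

    name = "json"

    def __init__(self, cls: type):
        self.cls = cls

    def dump(self, value: Any) -> bytes:
        return json.dumps(value.data).encode()

    def load(self, raw: bytes) -> Any:
        return self.cls(json.loads(raw))


class ToyTypeEngine:
    _registry: Dict[type, Any] = {}

    @classmethod
    def register(cls, t: type, transformer: Any) -> None:
        cls._registry[t] = transformer

    @classmethod
    def get_transformer(cls, t: Type) -> Any:
        # exact-type lookup; anything unregistered falls back to pickle
        return cls._registry.get(t, PickleFallback())


class MyDataset:
    def __init__(self, data: list):
        self.data = data


class Unregistered:
    pass


print(ToyTypeEngine.get_transformer(MyDataset).name)     # pickle
ToyTypeEngine.register(MyDataset, JsonTransformer(MyDataset))
print(ToyTypeEngine.get_transformer(MyDataset).name)     # json
print(ToyTypeEngine.get_transformer(Unregistered).name)  # pickle
```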

Now we can use `MyDataset` in our Flyte tasks!

```python
@task
def generate() -> MyDataset:
    return MyDataset([{"a": 1}, {"a": 2}])


@task
def consume(d: MyDataset) -> int:
    return sum(x["a"] for x in d.data)


@workflow
def wf() -> int:
    return consume(d=generate())


wf()
```

```
3
```

#### Task Plugins

The last thing we'll cover in this workshop is extending Flyte tasks. `flytekit`
provides a few Flyte-native methods for extending tasks, and here we'll cover
the concept of task plugins.

Similar to the decorator pattern, task plugins allow you to customize the
behavior of the task function, but they give you more Flyte-specific context
and metadata.

For example, suppose we want to create a sensor that checks for the presence
of a file in the blob store:

```python
import typing
from datetime import timedelta
from time import sleep

from flytekit.extend import Interface, PythonTask, context_manager


class Sensor(PythonTask):
    """
    Object store file sensor: polls until the file at ``path`` exists, then
    returns the path.
    """

    VAR_NAME: str = "path"

    def __init__(
        self,
        name: str,
        poll_interval: timedelta = timedelta(seconds=10),
        **kwargs,
    ):
        super(Sensor, self).__init__(
            task_type="object-store-sensor",
            name=name,
            task_config=None,
            # one string input and one string output, both named "path"
            interface=Interface(
                inputs={self.VAR_NAME: str},
                outputs={self.VAR_NAME: str},
            ),
            **kwargs,
        )
        self._poll_interval = poll_interval

    def execute(self, **kwargs) -> typing.Any:
        path = kwargs[self.VAR_NAME]
        ctx = context_manager.FlyteContext.current_context()
        user_context = ctx.user_space_params
        while True:
            user_context.logging.info(f"Sensing file in path {path}...")
            if ctx.file_access.exists(path):
                user_context.logging.info(f"File in path {path} exists!")
                return path
            user_context.logging.warning(f"File in path {path} does not exist!")
            # total_seconds() keeps sub-second intervals; .seconds would truncate them to 0
            sleep(self._poll_interval.total_seconds())
```
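The `execute` loop above polls until the file appears; on a Flyte cluster, the `timeout` passed through `TaskMetadata` is what bounds it. When prototyping the same logic outside Flyte, a sketch like the hypothetical `wait_for_file` below can carry its own deadline. It assumes a local path and uses `os.path.exists` in place of `ctx.file_access.exists`, and it uses `timedelta.total_seconds()` because `.seconds` drops fractional parts (`timedelta(milliseconds=500).seconds` is `0`).

```python
import os
import tempfile
import threading
import time
from datetime import timedelta


def wait_for_file(
    path: str,
    poll_interval: timedelta = timedelta(seconds=1),
    timeout: timedelta = timedelta(minutes=20),
) -> str:
    """Hypothetical helper: poll until ``path`` exists or the deadline passes."""
    deadline = time.monotonic() + timeout.total_seconds()
    while True:
        if os.path.exists(path):
            return path
        if time.monotonic() >= deadline:
            raise TimeoutError(f"{path} did not appear within {timeout}")
        time.sleep(poll_interval.total_seconds())


with tempfile.TemporaryDirectory() as tmp:
    target = os.path.join(tmp, "test.txt")

    # create the file from another thread after a short delay, like `touch` in a terminal
    threading.Timer(0.2, lambda: open(target, "w").close()).start()
    found = wait_for_file(
        target, poll_interval=timedelta(milliseconds=50), timeout=timedelta(seconds=5)
    )
    print(found == target)  # True

    timed_out = False
    try:
        wait_for_file(
            os.path.join(tmp, "missing.txt"),
            poll_interval=timedelta(milliseconds=50),
            timeout=timedelta(milliseconds=200),
        )
    except TimeoutError:
        timed_out = True
    print(timed_out)  # True
```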

Now we can use the `Sensor` in our workflows:

```python
from flytekit import TaskMetadata, task, workflow

sensor = Sensor(
    name="objectstore-sensor",
    metadata=TaskMetadata(retries=10, timeout=timedelta(minutes=20)),
    poll_interval=timedelta(seconds=1),
)


@task
def print_file(path: str) -> str:
    print(path)
    return path


@workflow
def my_workflow(path: str) -> str:
    return print_file(path=sensor(path=path))
```

**Exercise**

Run the cell below to start the workflow defined above. While it's polling, run
`touch /tmp/test.txt` in a separate terminal and check that the workflow
completes.

```python
my_workflow(path="/tmp/test.txt")
```

## Conclusion

🎉 Congrats! 🎉

You've made it through to the end of this workshop.

To recap, we've:

- Learned the basic constructs of Flyte: tasks, workflows, and launch plans
- Understood how Flyte orchestrates execution graphs, data, and compute infrastructure
- Worked with the building blocks for productionizing data science workloads
- Learned how to test Flyte code, use CI/CD, and extend Flyte

### Resources

- [Flyte Docs](https://docs.flyte.org)
- [Flyte Slack](https://slack.flyte.org)
- [Flyte GitHub](https://github.com/flyteorg/flyte)