1 change: 1 addition & 0 deletions docs/index.md
@@ -34,6 +34,7 @@ Metaflow makes it easy to build and manage real-life data science, AI, and ML projects
- [Debugging Flows](metaflow/debugging)
- [Visualizing Results](metaflow/visualizing-results/)
- [Configuring Flows](metaflow/configuring-flows/introduction)
- [Composing Flows with Custom Decorators](metaflow/composing-flows/introduction) ✨*New*✨

## II. Scaling Flows

Expand Down
214 changes: 214 additions & 0 deletions docs/metaflow/composing-flows/advanced-custom-decorators.md
@@ -0,0 +1,214 @@

# Advanced Custom Decorators

In addition to running logic before and after user code (as shown on
[the previous page](/metaflow/composing-flows/custom-decorators)), a decorator can
override the `@step` code entirely, executing alternative logic in its place.
Or, a decorator can take action if the user code fails.

## Catching failures in the user code

A decorator can catch failures in the user code by wrapping `yield` in a `try`-`except` block. The
following example shows the pattern in action, capturing any exception raised in the user code and
asking ChatGPT for advice on how to fix it. Save the module to `ai_debug.py`:

```python
import os
import inspect
import traceback

from metaflow import user_step_decorator

PROMPT = """
I have a Metaflow step that is defined as follows:

{source}

It raised the following exception:

{stack_trace}

Provide suggestions on how to fix it.
"""

@user_step_decorator
def ai_debug(step_name, flow, inputs=None, attributes=None):
source = inspect.getsource(getattr(flow, step_name))
try:
yield
    except Exception:
print("❌ Step failed:")
stack_trace = traceback.format_exc()
prompt_gpt(PROMPT.format(source=source, stack_trace=stack_trace))
raise

def prompt_gpt(prompt):
import requests
OPENAI_API_KEY = os.environ.get('OPENAI_API_KEY')
if OPENAI_API_KEY:
print("🧠 Asking AI for help..")
url = "https://api.openai.com/v1/chat/completions"
headers = {
"Authorization": f"Bearer {OPENAI_API_KEY}",
"Content-Type": "application/json"
}
data = {
"model": "gpt-4",
"messages": [{"role": "user", "content": prompt}]
}
response = requests.post(url, headers=headers, json=data)
resp = response.json()["choices"][0]["message"]["content"]
print(f"🧠💡 AI suggestion:\n\n{resp}")
else:
print("Specify OPENAI_API_KEY for debugging help")
```

You can test the decorator, for example, with this flow:

```python
import math
from metaflow import FlowSpec, step

from ai_debug import ai_debug

class FailFlow(FlowSpec):

@ai_debug
@step
def start(self):
x = 3
for i in range(5):
math.sqrt(x - i)
self.next(self.end)

@step
def end(self):
pass

if __name__ == '__main__':
FailFlow()
```

Set your OpenAI API key in the `OPENAI_API_KEY` environment variable and run the flow.
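For example, assuming the flow above is saved as `failflow.py`:

```
export OPENAI_API_KEY=<your key>
python failflow.py run
```

The results are impressive: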

```mdx-code-block
import ReactPlayer from 'react-player';
```

<ReactPlayer controls muted playsinline url='/assets/ai_debug.mp4' width='100%' height='100%'/>

## Skipping the user code

A decorator can decide to skip execution of the user code by yielding an empty dictionary, i.e.
`yield {}`. Even when the user code is skipped, a task is still started - to execute the custom
decorator - but the task finishes as soon as the decorator completes.

The following example leverages the feature to implement a `@memoize` decorator that reuses past results, skipping
redundant recomputation:

```python
import os
from metaflow import Flow, user_step_decorator, current

@user_step_decorator
def memoize(step_name, flow, inputs=None, attributes=None):
artifact = attributes['artifact']
reset = attributes.get('reset')
if reset and getattr(flow, reset, False):
print("⚙️ memoized results disabled - running the step")
yield
else:
try:
run = Flow(current.flow_name).latest_successful_run
previous_value = run[step_name].task[artifact].data
        except Exception:
print("⚙️ previous results not found - running the step")
yield
else:
print(f"✅ reusing results from a previous run {run.id}")
setattr(flow, artifact, previous_value)
yield {}
```

Note that `Flow` adheres to [Metaflow namespaces](/scaling/tagging), so `@memoize` can be used safely by many
concurrent users and production runs, without intermixing results.

The following flow utilizes `@memoize` to skip re-downloading data and recomputing taxi fares in the
`compute_fare` step:

```python
from metaflow import FlowSpec, step, Parameter, pypi

from memoize import memoize

URL = 'https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2020-01.parquet'

class ComputeTotalFare(FlowSpec):

reset = Parameter('reset', default=False, is_flag=True)
url = Parameter('url', default=URL)

@step
def start(self):
self.next(self.compute_fare)

@memoize(artifact='total_fare', reset='reset')
@pypi(packages={'duckdb': '1.3.2'})
@step
def compute_fare(self):
import duckdb
SQL = f"SELECT SUM(fare_amount) AS total_fare FROM '{self.url}'"
self.total_fare = duckdb.query(SQL).fetchone()[0]
self.next(self.end)

@step
def end(self):
print(f"Total taxi fares: ${self.total_fare}")

if __name__ == '__main__':
ComputeTotalFare()
```

You can use the `--reset` flag to force recomputation of results.
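For example, assuming the flow above is saved as `fare_flow.py` (an illustrative name), you can run it
with `--environment=pypi` so that the `@pypi` decorator takes effect:

```
python fare_flow.py --environment=pypi run --reset
```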

## Replacing the user code

A decorator may decide to execute another function instead of the step function defined in the flow - just
`yield` a callable that takes a `FlowSpec` object (`self` in steps) as an argument.

The following example implements a `@fallback` decorator that first attempts to run the user code and,
if it fails (i.e. `current.retry_count > 0`), executes a fallback function instead of re-executing the
user code.

```python
from metaflow import user_step_decorator, current

@user_step_decorator
def fallback(step_name, flow, inputs=None, attributes=None):
def _fallback_step(self):
print("🛟 step failed: executing a fallback")
var = attributes.get('indicator')
if var:
setattr(self, var, True)

if current.retry_count == 0:
yield
else:
yield _fallback_step
```

If you pass an attribute `indicator` to the decorator, it stores a corresponding artifact indicating that the
step failed. You can test the decorator with the `FailFlow` above. Note that you need to apply [the `@retry`
decorator](/scaling/failures#retrying-tasks-with-the-retry-decorator) to enable retries:

```
python failflow.py run --with retry --with fallback.fallback:indicator=failed
```
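
With `indicator` set to `failed` as in the command above, a downstream step can check the artifact to
see whether the fallback was used. A minimal sketch - `FailFlow` with `@ai_debug` omitted so the module
stands alone, and an `end` step that inspects the indicator:

```python
import math
from metaflow import FlowSpec, step

class FailFlow(FlowSpec):

    @step
    def start(self):
        x = 3
        for i in range(5):
            math.sqrt(x - i)  # raises ValueError once x - i turns negative
        self.next(self.end)

    @step
    def end(self):
        # 'failed' only exists if @fallback replaced the user code in start
        if getattr(self, 'failed', False):
            print("🛟 the start step fell back to default logic")
        else:
            print("the start step succeeded")

if __name__ == '__main__':
    FailFlow()
```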

:::info
The fallback function cannot modify the flow’s control logic - it cannot change the target of
a `self.next` call. The overall flow structure remains unchanged, even when a fallback
function is used.
:::



119 changes: 119 additions & 0 deletions docs/metaflow/composing-flows/baseflow.md
@@ -0,0 +1,119 @@

# The BaseFlow Pattern

The previous sections introduced custom decorators and mutators, which let
you compose flows from reusable components. Production-grade ML and AI projects
often consist of many such components for data access and transformation,
quality checks, model training and inference, and publishing results -
amongst other needs.

It’s beneficial to let end users focus on developing and iterating on
domain-specific logic, while minimizing visible boilerplate and project
scaffolding. This is where *the BaseFlow pattern* helps: It provides a
common foundation that bundles all necessary components, making them
readily available to the user.

## Defining a base flow

A BaseFlow is a class that inherits from `FlowSpec` and serves as a
foundation for other flows in a project. It can define shared components
such as flow mutators, `Config`s, `Parameter`s, and helper methods, but not steps
themselves. Individual flows in the project then inherit from `BaseFlow`,
automatically gaining access to the common functionality and ensuring consistency
across flows.

A common feature of the BaseFlow pattern is a shared configuration file that governs
all top-level concerns of the project. For the following example, we can define a
`project.toml`:

```toml
name = 'myproject'

# run the flow hourly
schedule = "0 * * * *"

[limits]
cpu = 2
memory = 16000
disk = 10000
```

We use the config to set up a base flow:

```python
import tomllib

from metaflow import Config, FlowSpec, project, config_expr, schedule

from flow_linter import flow_linter

def parse_limits(x):
return tomllib.loads(x)['limits']

@flow_linter
@project(name=config_expr('project.name'))
@schedule(cron=config_expr('project.schedule'))
class BaseFlow(FlowSpec):

project_config = Config('project', default='project.toml', parser=tomllib.loads)
limits = Config('limits', default='project.toml', parser=parse_limits)

def number_of_rows(self):
return len(self.table)
```

Note the following details:

- We read `project.toml` as a `Config`, so all its values are available for all derived flows.

- We ensure that all flows use `@flow_linter` which [we
defined previously](/metaflow/composing-flows/mutators#introspecting-a-flow-and-applying-configs),
and use the project config to read `limits` for it.

- We use the config to parametrize `@project` and `@schedule`.

- We define a helper method, `number_of_rows`, which [comes in
handy with `@dataset`](/metaflow/composing-flows/mutators#applying-multiple-decorators-with-a-step-mutator).

Another common pattern is to include metadata, [such as Git
information](/metaflow/configuring-flows/custom-parsers#including-default-configs-in-flows), in flows
automatically. Depending on your needs, your `BaseFlow` can grow arbitrarily feature-rich.
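For example, here is a minimal sketch of one simple approach - a hypothetical `git_info` helper on
`BaseFlow` (not the config-based mechanism described on the linked page) that derived flows can call
in a step to record Git metadata as artifacts:

```python
import subprocess

from metaflow import FlowSpec

def _git(*args):
    # Return the output of a git command, or an empty string if git is unavailable
    try:
        return subprocess.run(('git',) + args, capture_output=True,
                              text=True, check=True).stdout.strip()
    except (OSError, subprocess.CalledProcessError):
        return ''

class BaseFlow(FlowSpec):
    # ...the configs, mutators, and helpers shown above...

    def git_info(self):
        # Call in a step, e.g. self.git = self.git_info(), to store
        # the current commit and branch as artifacts
        return {
            'commit': _git('rev-parse', 'HEAD'),
            'branch': _git('rev-parse', '--abbrev-ref', 'HEAD'),
        }
```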

## Using a base flow

Here is an example flow that uses the `BaseFlow` defined above:

```python
from baseflow import BaseFlow
from metaflow import step, Config, current, resources

from dataset import dataset

class ComposedFlow(BaseFlow):

data_config = Config('dataset', default='dataset.json')

@resources(cpu=2)
@dataset(url=data_config.url)
@step
def start(self):
print(f"Project {current.project_name}")
print("Number of rows:", self.number_of_rows())
self.next(self.end)

@step
def end(self):
pass

if __name__ == '__main__':
ComposedFlow()
```

Thanks to `BaseFlow`, derived flows remain clean and minimal, despite including rich functionality under the hood, such as `@project`, `@schedule`, and `@flow_linter`. Shared helper methods also make it easy to equip all derived flows with common utilities, like `number_of_rows` in the example above.

Real-world projects often involve enough complexity and nuance that a single common foundation
can't cover every need. Instead of aiming for perfect, all-encompassing abstractions in `BaseFlow`,
it's better to allow derived flows to customize behavior as needed - such as with flow-specific
`data_config` in the example above.

