diff --git a/docs/index.md b/docs/index.md
index b5735f2b..1a642622 100644
--- a/docs/index.md
+++ b/docs/index.md
@@ -34,6 +34,7 @@ Metaflow makes it easy to build and manage real-life data science, AI, and ML pr
 - [Debugging Flows](metaflow/debugging)
 - [Visualizing Results](metaflow/visualizing-results/)
 - [Configuring Flows](metaflow/configuring-flows/introduction)
+- [Composing Flows with Custom Decorators](metaflow/composing-flows/introduction) ✨*New*✨
 
 ## II. Scaling Flows
diff --git a/docs/metaflow/composing-flows/advanced-custom-decorators.md b/docs/metaflow/composing-flows/advanced-custom-decorators.md
new file mode 100644
index 00000000..98a54d08
--- /dev/null
+++ b/docs/metaflow/composing-flows/advanced-custom-decorators.md
@@ -0,0 +1,214 @@
+
+# Advanced Custom Decorators
+
+In addition to running logic before and after user code (as shown on
+[the previous page](/metaflow/composing-flows/custom-decorators)), a decorator can
+override the `@step` code entirely, executing alternative logic in its place.
+A decorator can also take action if the user code fails.
+
+## Catching failures in the user code
+
+A decorator can catch failures in the user code by wrapping `yield` in a `try`-`except` block. The
+following example shows the pattern in action, capturing any exceptions raised by the user code and
+asking ChatGPT for advice on how to fix them. Save the module to `ai_debug.py`:
+
+```python
+import os
+import inspect
+import traceback
+
+from metaflow import user_step_decorator
+
+PROMPT = """
+I have a Metaflow step that is defined as follows:
+
+{source}
+
+It raised the following exception:
+
+{stack_trace}
+
+Provide suggestions on how to fix it.
+"""
+
+@user_step_decorator
+def ai_debug(step_name, flow, inputs=None, attributes=None):
+    source = inspect.getsource(getattr(flow, step_name))
+    try:
+        yield
+    except:
+        print("❌ Step failed:")
+        stack_trace = traceback.format_exc()
+        prompt_gpt(PROMPT.format(source=source, stack_trace=stack_trace))
+        raise
+
+def prompt_gpt(prompt):
+    import requests
+    OPENAI_API_KEY = os.environ.get('OPENAI_API_KEY')
+    if OPENAI_API_KEY:
+        print("🧠 Asking AI for help..")
+        url = "https://api.openai.com/v1/chat/completions"
+        headers = {
+            "Authorization": f"Bearer {OPENAI_API_KEY}",
+            "Content-Type": "application/json"
+        }
+        data = {
+            "model": "gpt-4",
+            "messages": [{"role": "user", "content": prompt}]
+        }
+        response = requests.post(url, headers=headers, json=data)
+        resp = response.json()["choices"][0]["message"]["content"]
+        print(f"🧠💡 AI suggestion:\n\n{resp}")
+    else:
+        print("Specify OPENAI_API_KEY for debugging help")
+```
+
+You can test the decorator, for example, with this flow:
+
+```python
+import math
+from metaflow import FlowSpec, step
+
+from ai_debug import ai_debug
+
+class FailFlow(FlowSpec):
+
+    @ai_debug
+    @step
+    def start(self):
+        x = 3
+        for i in range(5):
+            # fails with a ValueError once x - i goes negative
+            math.sqrt(x - i)
+        self.next(self.end)
+
+    @step
+    def end(self):
+        pass
+
+if __name__ == '__main__':
+    FailFlow()
+```
+
+Set your OpenAI API key in the `OPENAI_API_KEY` environment variable and run the flow. The results
+are impressive:
+
+```mdx-code-block
+import ReactPlayer from 'react-player';
+```
+
+<ReactPlayer controls url="/assets/ai_debug.mp4" width="100%" height="100%" />
+
+## Skipping the user code
+
+A decorator can decide to skip execution of the user code by yielding an empty dictionary, i.e. `yield {}`.
+Even when the user code is skipped, a task is still started in order to execute the custom decorator, and
+the task finishes as soon as the decorator does.
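+
+As a minimal sketch, a decorator that decides whether to run or skip the user code based on a
+hypothetical `enabled` attribute could look like this:
+
+```python
+from metaflow import user_step_decorator
+
+@user_step_decorator
+def skip_unless_enabled(step_name, flow, inputs=None, attributes=None):
+    # hypothetical usage: @skip_unless_enabled(enabled=True)
+    if attributes and attributes.get('enabled'):
+        yield      # run the user code as usual
+    else:
+        print(f"⏭️ skipping the user code of {step_name}")
+        yield {}   # start the task, but skip the user code entirely
+```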
+
+The following example leverages the feature to implement a `@memoize` decorator that reuses past results,
+skipping redundant recomputation:
+
+```python
+from metaflow import Flow, user_step_decorator, current
+
+@user_step_decorator
+def memoize(step_name, flow, inputs=None, attributes=None):
+    artifact = attributes['artifact']
+    reset = attributes.get('reset')
+    if reset and getattr(flow, reset, False):
+        print("⚙️ memoized results disabled - running the step")
+        yield
+    else:
+        try:
+            run = Flow(current.flow_name).latest_successful_run
+            previous_value = run[step_name].task[artifact].data
+        except Exception:
+            print("⚙️ previous results not found - running the step")
+            yield
+        else:
+            print(f"✅ reusing results from a previous run {run.id}")
+            setattr(flow, artifact, previous_value)
+            yield {}
+```
+
+Note that `Flow` adheres to [Metaflow namespaces](/scaling/tagging), so `@memoize` can be used safely by many
+concurrent users and production runs, without intermixing results.
+
+The following flow utilizes `@memoize` to skip downloading data and recomputing taxi fares in the
+`compute_fare` step:
+
+```python
+from metaflow import FlowSpec, step, Parameter, pypi
+
+from memoize import memoize
+
+URL = 'https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2020-01.parquet'
+
+class ComputeTotalFare(FlowSpec):
+
+    reset = Parameter('reset', default=False, is_flag=True)
+    url = Parameter('url', default=URL)
+
+    @step
+    def start(self):
+        self.next(self.compute_fare)
+
+    @memoize(artifact='total_fare', reset='reset')
+    @pypi(packages={'duckdb': '1.3.2'})
+    @step
+    def compute_fare(self):
+        import duckdb
+        SQL = f"SELECT SUM(fare_amount) AS total_fare FROM '{self.url}'"
+        self.total_fare = duckdb.query(SQL).fetchone()[0]
+        self.next(self.end)
+
+    @step
+    def end(self):
+        print(f"Total taxi fares: ${self.total_fare}")
+
+if __name__ == '__main__':
+    ComputeTotalFare()
+```
+
+You can use the `--reset` flag to force recomputation of results.
+
+## Replacing the user code
+
+A decorator may decide to execute another function instead of the step function defined in the flow - just
+`yield` a callable that takes a `FlowSpec` object (`self` in steps) as an argument.
+
+The following example implements a `@fallback` decorator that first attempts to run the user code and, if it
+fails - `current.retry_count > 0` - executes a fallback function instead of re-executing the user code:
+
+```python
+from metaflow import user_step_decorator, current
+
+@user_step_decorator
+def fallback(step_name, flow, inputs=None, attributes=None):
+    def _fallback_step(self):
+        print("🛟 step failed: executing a fallback")
+        var = attributes.get('indicator')
+        if var:
+            setattr(self, var, True)
+
+    if current.retry_count == 0:
+        yield
+    else:
+        yield _fallback_step
+```
+
+If you pass an attribute `indicator` to the decorator, it stores a corresponding artifact indicating that the
+step failed. You can test the decorator with the `FailFlow` above. Note that you need to apply [the `@retry`
+decorator](/scaling/failures#retrying-tasks-with-the-retry-decorator) to enable retries:
+
+```
+python failflow.py run --with retry --with fallback.fallback:indicator=failed
+```
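+
+Downstream steps can then inspect the indicator artifact. For example - a sketch assuming
+`indicator=failed`, as above - `FailFlow`'s `end` step could react to the fallback:
+
+```python
+    @step
+    def end(self):
+        if getattr(self, 'failed', False):
+            print("🛟 the start step fell back - results may be incomplete")
+        else:
+            print("✅ all steps ran normally")
+```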
+
+:::info
+The fallback function cannot modify the flow’s control logic - it cannot change the target of
+a `self.next` call. The overall flow structure remains unchanged, even when a fallback
+function is used.
+:::
diff --git a/docs/metaflow/composing-flows/baseflow.md b/docs/metaflow/composing-flows/baseflow.md
new file mode 100644
index 00000000..52ef15fe
--- /dev/null
+++ b/docs/metaflow/composing-flows/baseflow.md
@@ -0,0 +1,119 @@
+
+# The BaseFlow Pattern
+
+The previous sections introduced custom decorators and mutators, which let
+you compose flows from reusable components. Production-grade ML and AI projects
+often consist of many such components for data access and transformation,
+quality checks, model training and inference, and publishing results -
+amongst other needs.
+
+It’s beneficial to let end users focus on developing and iterating on
+domain-specific logic, while minimizing visible boilerplate and project
+scaffolding. This is where *the BaseFlow pattern* helps: it provides a
+common foundation that bundles all necessary components, making them
+readily available to the user.
+
+## Defining a base flow
+
+A BaseFlow is a class that inherits from `FlowSpec` and serves as a
+foundation for other flows in a project. It can define shared components
+such as flow mutators, `Config`s, `Parameter`s, and helper methods, but not steps
+themselves. Individual flows in the project then inherit from `BaseFlow`,
+automatically gaining access to the common functionality and ensuring consistency
+across flows.
+
+A typical feature of the BaseFlow pattern is a shared configuration file that governs
+all top-level concerns of the project. For the following example, we can define a
+`project.toml`:
+
+```toml
+name = 'myproject'
+
+# run the flow hourly
+schedule = "0 * * * *"
+
+[limits]
+cpu = 2
+memory = 16000
+disk = 10000
+```
+
+We use the config to set up a base flow:
+
+```python
+import tomllib
+
+from metaflow import Config, FlowSpec, project, config_expr, schedule
+
+from flow_linter import flow_linter
+
+def parse_limits(x):
+    return tomllib.loads(x)['limits']
+
+@flow_linter
+@project(name=config_expr('project.name'))
+@schedule(cron=config_expr('project.schedule'))
+class BaseFlow(FlowSpec):
+
+    project_config = Config('project', default='project.toml', parser=tomllib.loads)
+    limits = Config('limits', default='project.toml', parser=parse_limits)
+
+    def number_of_rows(self):
+        return len(self.table)
+```
+
+Note the following details:
+
+ - We read `project.toml` as a `Config`, so all its values are available to all derived flows.
+
+ - We ensure that all flows use `@flow_linter`, which [we
+   defined previously](/metaflow/composing-flows/mutators#introspecting-a-flow-and-applying-configs),
+   and use the project config to read `limits` for it.
+
+ - We use the config to parametrize `@project` and `@schedule`.
+
+ - We define a helper method, `number_of_rows`, which [comes in
+   handy with `@dataset`](/metaflow/composing-flows/mutators#applying-multiple-decorators-with-a-step-mutator).
+
+Another common pattern is to include metadata, [such as Git
+information](/metaflow/configuring-flows/custom-parsers#including-default-configs-in-flows), in flows
+automatically. Depending on your needs, your `BaseFlow` can grow arbitrarily feature-rich.
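+
+For instance - as a sketch, assuming a `git` executable is available where the flow is
+launched - you could capture the current commit in a `Config` with a custom parser. The
+`git_info` helper below is hypothetical, not part of Metaflow:
+
+```python
+import subprocess
+
+from metaflow import Config, FlowSpec
+
+def git_info(_ignored):
+    # hypothetical parser: ignores the config text and shells out to git
+    commit = subprocess.check_output(
+        ['git', 'rev-parse', 'HEAD'], text=True).strip()
+    return {'commit': commit}
+
+class BaseFlow(FlowSpec):
+
+    git = Config('git', default_value='{}', parser=git_info)
+```
+
+Since configs are resolved when a run or deployment starts, every derived flow would then
+record the commit it was launched from, accessible as `self.git.commit`.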
+
+## Using a base flow
+
+Here is an example flow that uses the `BaseFlow` defined above:
+
+```python
+from baseflow import BaseFlow
+from metaflow import step, Config, current, resources
+
+from dataset import dataset
+
+class ComposedFlow(BaseFlow):
+
+    data_config = Config('dataset', default='dataset.json')
+
+    @resources(cpu=2)
+    @dataset(url=data_config.url)
+    @step
+    def start(self):
+        print(f"Project {current.project_name}")
+        print("Number of rows:", self.number_of_rows())
+        self.next(self.end)
+
+    @step
+    def end(self):
+        pass
+
+if __name__ == '__main__':
+    ComposedFlow()
+```
+
+Thanks to `BaseFlow`, derived flows remain clean and minimal, despite including rich functionality
+under the hood, such as `@project`, `@schedule`, and `@flow_linter`. Shared helper methods also make
+it easy to equip all derived flows with common utilities, like `number_of_rows` in the example above.
+
+Real-world projects often involve enough complexity and nuance that a single common foundation
+can't cover every need. Instead of aiming for perfect, all-encompassing abstractions in `BaseFlow`,
+it's better to allow derived flows to customize behavior as needed - such as with the flow-specific
+`data_config` in the example above.
diff --git a/docs/metaflow/composing-flows/custom-decorators.md b/docs/metaflow/composing-flows/custom-decorators.md
new file mode 100644
index 00000000..d85d68af
--- /dev/null
+++ b/docs/metaflow/composing-flows/custom-decorators.md
@@ -0,0 +1,237 @@
+
+# Custom Decorators
+
+:::note
+You can find [all decorator examples in this repository](https://github.com/outerbounds/custom-decorator-examples).
+:::
+
+Custom decorators let you:
+
+- Run code **before** the user-defined step,
+
+- Run code **after** the user-defined step,
+
+- Run code **on failure** of the user-defined step,
+
+- **Replace** the user-defined step with custom logic.
+
+Notably, in any of the above cases, you can read, write, or delete **artifacts** to
+maintain state within and across steps, as well as to process inputs and outputs from your decorator.
+This page describes the basic usage of custom decorators, covering the before and after cases. The next
+page, [Advanced Custom Decorators](/metaflow/composing-flows/advanced-custom-decorators), covers the
+last two patterns.
+
+## Defining a custom decorator
+
+A custom step decorator is a Python generator function, annotated with `@user_step_decorator`:
+
+```python
+import time
+from metaflow import user_step_decorator, current
+
+@user_step_decorator
+def my_profile(step_name, flow, inputs=None, attributes=None):
+    start = time.time()
+    yield
+    duration = 1000 * (time.time() - start)
+    print(f"⏰ Task [{current.pathspec}] completed in {duration:.1f}ms")
+```
+
+The function is passed four arguments:
+
+ - `step_name` - the name of the step being decorated.
+ - `flow` - a `FlowSpec` object, corresponding to `self` in steps - used to access artifacts.
+ - `inputs` - a list of inputs if the decorated step is [a join step](/metaflow/basics#branch).
+ - `attributes` - a dictionary of keyword arguments passed to the decorator.
+
+Any code before `yield` is executed before the user-defined step code. Code after it is executed
+after the user's code has completed successfully. As an example, the `@my_profile` decorator above
+measures and prints the execution time of the user code in milliseconds. Save it to a file, `myprofile.py`.
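+
+To illustrate `attributes`, here is a sketch of a variant that accepts a keyword argument - the
+`threshold_ms` attribute is made up for this example - and only reports slow tasks:
+
+```python
+import time
+from metaflow import user_step_decorator, current
+
+@user_step_decorator
+def slow_task_alert(step_name, flow, inputs=None, attributes=None):
+    # applied as @slow_task_alert(threshold_ms=500),
+    # so attributes == {'threshold_ms': 500}
+    threshold = float((attributes or {}).get('threshold_ms', 1000))
+    start = time.time()
+    yield
+    duration = 1000 * (time.time() - start)
+    if duration > threshold:
+        print(f"🐢 Task [{current.pathspec}] took {duration:.1f}ms "
+              f"(threshold: {threshold}ms)")
+```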
+
+## Using a custom decorator
+
+You can use the decorator in any Metaflow flow simply by importing it
+```
+from myprofile import my_profile
+```
+and by annotating steps with `@my_profile`. Alternatively, you can attach the decorator automatically
+to all steps by executing a flow with
+```
+python myflow.py run --with myprofile.my_profile
+```
+You can test the above with this flow:
+```python
+import time
+from metaflow import FlowSpec, step
+
+from myprofile import my_profile
+
+class WaiterFlow(FlowSpec):
+
+    @step
+    def start(self):
+        self.waiters = list(range(5))
+        self.next(self.wait, foreach='waiters')
+
+    @my_profile
+    @step
+    def wait(self):
+        self.duration = self.input / 10
+        print(f"💤 Sleeping for {self.duration}s")
+        time.sleep(self.duration)
+        self.next(self.join)
+
+    @step
+    def join(self, inputs):
+        self.total = sum(inp.duration for inp in inputs)
+        print(f"Slept {self.total}s in total")
+        self.next(self.end)
+
+    @step
+    def end(self):
+        pass
+
+if __name__ == '__main__':
+    WaiterFlow()
+```
+Save it as `waiterflow.py` and run it as usual:
+```
+python waiterflow.py run
+```
+You can also test attaching the decorator to all steps:
+```
+python waiterflow.py run --with myprofile.my_profile
+```
+
+### Using custom decorators when executing tasks remotely
+
+Custom decorators don't require special treatment when [executing tasks in
+the cloud](/scaling/remote-tasks/introduction) or when [deploying flows to production](/production/introduction) -
+they will get [packaged automatically by Metaflow](/scaling/dependencies#unpacking-a-metaflow-project). Try it:
+```
+python waiterflow.py run --with myprofile.my_profile --with kubernetes
+```
+or equally `--with batch`. Notably, the decorators don't have to exist in the same [directory
+hierarchy as your flow code](/scaling/dependencies/project-structure), nor do you have to include them with `@pypi`.
+
+If your custom decorator is part of a Python package with multiple modules, Metaflow will automatically
+package the entire package. This allows you to implement advanced decorators as well-structured Python
+packages, which can be distributed via your internal package repository or published to PyPI. If your
+decorator requires third-party dependencies, you can include them using a bundled `@pypi` decorator, as
+shown in [this example](#).
+
+## Configuring decorators and managing state
+
+Your decorators may want to manage state across steps, or you may want to produce results that can be accessed
+after a run has completed. You can do this via artifacts.
+
+In the example below - save it to `statsprofiler.py` - we aggregate timings from all tasks in a
+`timings` artifact.
+Note that we use `hasattr` to see if the artifact exists already, and we check the presence of
+`inputs` to merge timings in a join step:
+
+```python
+import time
+from metaflow import user_step_decorator
+from collections import defaultdict
+
+@user_step_decorator
+def stats_profile(step_name, flow, inputs=None, attributes=None):
+    start = time.time()
+    yield
+    duration = int(1000 * (time.time() - start))
+
+    if not hasattr(flow, "timings"):
+        flow.timings = defaultdict(list)
+    if inputs:
+        for inp in inputs:
+            for step, timings in inp.timings.items():
+                flow.timings[step].extend(timings)
+    flow.timings[step_name].append(duration)
+    if step_name == "end" and not attributes.get("silent"):
+        print_results(flow.timings)
+
+def print_results(all_timings):
+    print("📊 Step timings")
+    print(f"{'Step':<20}{'P10 (ms)':<15}{'Median (ms)':<15}{'P90 (ms)':<15}")
+    for step, timings in all_timings.items():
+        timings.sort()
+        n = len(timings)
+        p10 = timings[int(n * 0.1)]
+        median = timings[n // 2]
+        p90 = timings[int(n * 0.9)]
+        print(f"{step:<20}{p10:<15}{median:<15}{p90:<15}")
+```
+
+You can also configure the decorator behavior through `attributes`. Here, you can set
+```python
+@stats_profile(silent=True)
+```
+to suppress the output. Or, on the command line:
+```
+python waiterflow.py run --with statsprofiler.stats_profile:silent=1
+```
+You can then use [the Client API](/metaflow/client) as usual to
+access the results in the `timings` artifact.
+
+![](/assets/statsprofiler.png)
+
+## Exposing an API to the user code
+
+A custom decorator can expose an API to user code via a temporary artifact. This allows you to maintain
+state using arbitrarily complex Python objects - such as instances of custom classes - that may not be
+easily serializable or retrievable as stable artifacts through the Client API.
+
+This example, `traceprofiler.py`, exposes [a Python context manager](https://realpython.com/python-with-statement/)
+through `self.trace`, which the user code can use to collect timings of arbitrary sections of code:
+
+```python
+import time
+from metaflow import user_step_decorator
+from collections import defaultdict
+
+@user_step_decorator
+def trace_profile(step_name, flow, inputs=None, attributes=None):
+    flow.trace = TraceCollector
+    yield
+    del flow.trace
+    flow.timings = TraceCollector.timings
+    for name, total in TraceCollector.timings.items():
+        print(f"Trace: {name} - Total: {int(total)}ms")
+
+class TraceCollector(object):
+
+    timings = defaultdict(int)
+
+    def __init__(self, name):
+        self.name = name
+
+    def __enter__(self):
+        self.start = time.time()
+
+    def __exit__(self, type, value, traceback):
+        self.timings[self.name] += 1000 * (time.time() - self.start)
+```
+
+It wouldn't make sense to store the context manager class, `TraceCollector`, as an artifact, so we delete
+it (`del flow.trace`) after the user code has finished and only store a dictionary of results as an artifact.
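+
+The general shape of the pattern is a minimal sketch like the following, with a made-up
+`Greeter` helper class standing in for your own API:
+
+```python
+from metaflow import user_step_decorator
+
+class Greeter:
+    def hello(self, name):
+        print(f"👋 Hello, {name}!")
+
+@user_step_decorator
+def expose_greeter(step_name, flow, inputs=None, attributes=None):
+    flow.greeter = Greeter()  # temporary artifact: user code sees self.greeter
+    yield
+    del flow.greeter          # deleted, so it is never persisted
+```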
+The following flow demonstrates `self.trace` in action:
+
+```python
+import time
+from metaflow import FlowSpec, step
+
+from traceprofiler import trace_profile
+
+class TracingFlow(FlowSpec):
+
+    @trace_profile
+    @step
+    def start(self):
+        for i in range(10):
+            with self.trace('database access'):
+                time.sleep(0.1)
+            with self.trace('process data'):
+                time.sleep(0.5)
+        self.next(self.end)
+
+    @step
+    def end(self):
+        pass
+
+if __name__ == '__main__':
+    TracingFlow()
+```
diff --git a/docs/metaflow/composing-flows/introduction.md b/docs/metaflow/composing-flows/introduction.md
new file mode 100644
index 00000000..4f4fb1f9
--- /dev/null
+++ b/docs/metaflow/composing-flows/introduction.md
@@ -0,0 +1,91 @@
+# Composing Flows with Custom Decorators
+
+:::info
+This is a new feature in Metaflow 2.16. Make sure you have a recent enough version of
+Metaflow to use this feature.
+:::
+
+It is common for projects to include functionality that can be reused across multiple
+steps and flows. For example, you might define shared, project-specific patterns for
+
+ - Accessing data,
+ - Running ETL,
+ - Tracking data and model lineage,
+ - Performing feature engineering and transformations,
+ - Training and evaluating a model,
+ - Accessing an external service, e.g. an LLM endpoint through a model router.
+
+You can handle cases like these by developing a shared library that encapsulates
+the logic and importing it in your steps. Metaflow will [package the
+library](/scaling/dependencies/project-structure) automatically for remote execution,
+ensuring the logic works seamlessly from local development to production deployments.
+
+This section introduces a powerful Metaflow feature: **custom decorators and mutators**.
+While importing and using a shared library in a step is straightforward, encapsulating
+common logic in a decorator offers several key advantages:
+
+- **Clean [separation of concerns](https://en.wikipedia.org/wiki/Separation_of_concerns)**:
+  Keep shared logic out of step code, improving readability and maintainability.
+
+- **Clarity and consistency**: Applying a decorator makes the use of common patterns
+  explicit and uniform.
+
+- **Flexibility**: Easily enable, disable, or switch between behaviors without
+  touching step logic - great for implementing pluggable logic.
+
+- **Correctness by default**: Use mutators to apply the right patterns to all relevant steps automatically,
+  so that, for example, a centralized platform team can establish paved paths for everyone.
+
+- **Reusability and portability**: Distribute decorators as installable packages, whether private or public.
+  Metaflow packages them for remote execution automatically, even if they live outside
+  [your project directory structure](/scaling/dependencies/project-structure).
+
+:::note
+Custom decorators and mutators let you develop, share, and reuse components across
+flows - without modifying Metaflow’s core behavior. If you're looking to build a deeper infrastructure
+integration, such as support for a new [production
+orchestrator](/production/scheduling-metaflow-flows/introduction), you need to use
+[the Metaflow Extension mechanism](https://github.com/Netflix/metaflow-extensions-template) instead.
+For guidance, reach out on [Metaflow Slack](http://slack.outerbounds.co).
+:::
+
+## Overview
+
+The following walkthrough illustrates the features. For technical details and examples, refer
+to the feature-specific pages below.
+
+### Custom decorators
+
+**Custom decorators** allow you to lift common logic into custom step- and flow-level decorators
+which behave similarly to decorators provided by Metaflow. You can add logic to be executed before,
+after, or instead of the user-defined step code. [Read more about custom decorators](
+/metaflow/composing-flows/custom-decorators) and [advanced decorator
+patterns](/metaflow/composing-flows/advanced-custom-decorators).
+
+![](/assets/custom-decos-1.png)
+
+### Mutators
+
+**Mutators** allow you to add and remove decorators (including custom ones),
+[`Config`s](/metaflow/configuring-flows/introduction), and
+[`Parameter`s](/metaflow/basics#how-to-define-parameters-for-flows)
+in flows programmatically. [Read more about
+mutators](/metaflow/composing-flows/mutators).
+
+![](/assets/custom-decos-2.png)
+
+:::note
+Currently you are not able to alter the flow structure - add and remove steps - through a mutator,
+but this feature is on the roadmap.
+:::
+
+### The `BaseFlow` pattern
+
+**The `BaseFlow` pattern** allows you to apply mutators, `Config`s, and `Parameter`s automatically
+to all flows derived from the `BaseFlow`. This allows you to templatize flows according to your
+project’s best practices, ensuring that all relevant decorators are applied automatically -
+without requiring users to remember to add them manually. [Read more about
+the `BaseFlow` pattern](/metaflow/composing-flows/baseflow).
+
+![](/assets/custom-decos-3.png)
\ No newline at end of file
diff --git a/docs/metaflow/composing-flows/mutators.md b/docs/metaflow/composing-flows/mutators.md
new file mode 100644
index 00000000..80248365
--- /dev/null
+++ b/docs/metaflow/composing-flows/mutators.md
@@ -0,0 +1,304 @@
+
+# Applying Decorators with Mutators
+
+The previous pages showed how to create various kinds of custom decorators.
+**Mutators** operate at a higher level: they let you programmatically *control
+which decorators and parameters are applied to your flow*.
+
+For instance, you can use mutators to
+
+ - Apply stacks of decorators automatically, e.g. `@retry` and `@fallback`.
+
+ - Create template flows that apply the right decorators automatically,
+   maybe [based on configs](/metaflow/configuring-flows/introduction).
+
+ - Serve as a foundational element of [the `BaseFlow` pattern](/metaflow/composing-flows/baseflow),
+   which lets you define shared, domain-specific tooling for all flows in your project, ensuring
+   that everyone follows the same path consistently.
+
+Mutators look like decorators but instead of being functions annotated with `@user_step_decorator`,
+they are defined as classes derived from `FlowMutator` or `StepMutator`, using [the `Mutator` API](#).
+
+:::info
+Unlike decorators, mutators are applied at deploy time, before a run or deployment begins.
+As a result, they cannot modify the flow during execution.
+:::
+
+## Defining a flow mutator
+
+A flow mutator can manipulate the parameters of a flow and the decorators attached to its steps through
+a `MutableFlow` object, passed to the mutator's `mutate` method.
+
+The following example defines a `@robust_flow` mutator which applies
+[Metaflow's built-in `@retry` decorator](/scaling/failures#retrying-tasks-with-the-retry-decorator)
+and [the custom `@fallback` decorator](/metaflow/composing-flows/advanced-custom-decorators#replacing-the-user-code)
+to all steps of the flow:
+
+```python
+from metaflow import FlowMutator
+from fallback import fallback
+
+class robust_flow(FlowMutator):
+    def init(self, *args, **kwargs):
+        self.disable_fallback = bool(kwargs.get("disable_fallback"))
+        self.fallback_attributes = {}
+        fallback_indicator = kwargs.get("fallback_indicator")
+        if fallback_indicator:
+            self.fallback_attributes["indicator"] = fallback_indicator
+
+    def mutate(self, mutable_flow):
+        for step_name, step in mutable_flow.steps:
+            step.add_decorator("retry", duplicates=step.IGNORE)
+            if not self.disable_fallback:
+                step.add_decorator(
+                    fallback,
+                    deco_kwargs=self.fallback_attributes,
+                    duplicates=step.IGNORE,
+                )
+```
+
+Note the following details:
+
+ - A flow-level mutator is defined in a class derived from `FlowMutator`.
+ - You can capture and process attributes in the `init` method - not in Python's default `__init__` constructor.
+ - Use the `mutate` method to mutate the flow through the `mutable_flow` handle.
+ - When using mutators to add decorators, consider whether they should **override** or **defer to** the
+   same decorators added by the user. This behavior is controlled by the `duplicates` argument, which is
+   explained in detail in the next section.
+
+You can test the mutator with our previously defined `FailFlow`:
+
+```python
+import math
+from metaflow import FlowSpec, step
+
+from robust_flow import robust_flow
+
+@robust_flow(fallback_indicator='failed')
+class FailFlow(FlowSpec):
+
+    @step
+    def start(self):
+        x = 3
+        for i in range(5):
+            math.sqrt(x - i)
+        self.next(self.end)
+
+    @step
+    def end(self):
+        pass
+
+if __name__ == '__main__':
+    FailFlow()
+```
+
+Execute the flow without specifying any `--with` options. Thanks to the decorators added by `@robust_flow`, the
+run will behave exactly as if it were run with:
+
+```
+python failflow.py run --with retry --with fallback.fallback:indicator=failed
+```
+
+### How to handle duplicate decorators
+
+What should happen if you run the above flow, decorated with `@robust_flow(fallback_indicator='failed')`, as follows?
+```
+python failflow.py run --with fallback.fallback:indicator=step_failed
+```
+Should the `indicator` be `failed`, as defined in the mutator attributes, or `step_failed`, as defined
+on the command line?
+
+The choice depends on the policy you want to implement: sometimes the mutator should override the user's choice,
+sometimes the opposite. You can control the behavior through the `duplicates` argument, which takes one of
+three options:
+
+ - `IGNORE` - the decorator added by the mutator is ignored if a user-defined decorator exists.
+ - `OVERRIDE` - the decorator added by the mutator overrides a user-defined decorator.
+ - `ERROR` - adding duplicate decorators raises an error.
+
+You can test the effect of the options with `@robust_flow` above. You can see the artifacts produced with
+```
+python failflow.py dump RUN_ID/start
+```
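+
+To summarize, the three policies look like this inside a mutator's `mutate` method - a sketch
+using `@retry` as the decorator being added:
+
+```python
+    def mutate(self, mutable_flow):
+        for step_name, step in mutable_flow.steps:
+            # the user's own @retry wins; the mutator's version is dropped
+            step.add_decorator("retry", duplicates=step.IGNORE)
+            # alternatively, the mutator's @retry replaces the user's:
+            # step.add_decorator("retry", duplicates=step.OVERRIDE)
+            # or a duplicate is treated as an error:
+            # step.add_decorator("retry", duplicates=step.ERROR)
+```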
+
+## Introspecting a flow and applying configs
+
+Let's walk through a more advanced mutator that shows how you can
+utilize [`Config`s](/metaflow/configuring-flows/introduction) and flow introspection in mutators. We
+develop a flow [linter](https://en.wikipedia.org/wiki/Lint_(software)) that ensures that `@resources`
+defined in a flow adhere to limits set in a config.
+
+First, let's define a configuration that specifies limits for compute resources, `limits.json`:
+```javascript
+{
+    "cpu": 2,
+    "memory": 16000,
+    "disk": 10000
+}
+```
+
+This mutator reads the limits through a `Config`, `mutable_flow.limits`,
+iterates through all decorators of each step using `step.decorator_specs`, finds the ones
+where resource limits apply, and enforces the limits by overwriting offending decorators:
+
+```python
+from metaflow import FlowMutator
+
+class flow_linter(FlowMutator):
+    def mutate(self, mutable_flow):
+        limits = mutable_flow.limits
+        for step_name, step in mutable_flow.steps:
+            for deco_name, _module, _args, attributes in step.decorator_specs:
+                if deco_name in ("kubernetes", "batch", "resources"):
+                    for key, limit in limits.items():
+                        val = attributes.get(key)
+                        if val and float(val) > limit:
+                            print(
+                                f"⚠️ Step[{step_name}] @{deco_name}({key}={val}) "
+                                f"is higher than the limit of {limit} - fixed"
+                            )
+                            attributes[key] = limit
+                    step.add_decorator(
+                        deco_name,
+                        deco_kwargs=attributes,
+                        duplicates=step.OVERRIDE,
+                    )
+```
+
+Try it with this flow:
+
+```python
+from metaflow import FlowSpec, step, resources, Config
+
+from flow_linter import flow_linter
+
+@flow_linter
+class HungryFlow(FlowSpec):
+
+    limits = Config('limits', default='limits.json')
+
+    @resources(cpu=16)
+    @step
+    def start(self):
+        print(self._graph_info)
+        self.next(self.end)
+
+    @step
+    def end(self):
+        pass
+
+if __name__ == '__main__':
+    HungryFlow()
+```
+
+Run the flow, for example, as
+```
+python hungryflow.py run --with resources:memory=64000
+```
+and notice the warnings.
+
+## Applying multiple decorators with a step mutator
+
+Imagine you’ve built a custom decorator that depends on third-party libraries. You could
+use Metaflow’s built-in dependency management, e.g. [a `@pypi`
+decorator](/scaling/dependencies/libraries), to install those libraries. However, this
+requires users to remember to apply **both** your custom decorator **and** the appropriate
+`@pypi` decorator, which is error-prone.
+
+A better solution is to create a step mutator that adds the decorators automatically. As an example,
+let's create a custom data access decorator that fetches a dataset, preprocesses it, and returns a
+dataframe to the user step - making sure that all necessary dependencies are installed automatically.
+
+We can define a step mutator, `@dataset`, and a decorator, `@process_dataset`, in the same module, as they
+are tightly coupled:
+
+```python
+from metaflow import StepMutator, user_step_decorator
+
+DEPS = {"duckdb": "1.3.2", "pyarrow": "20.0.0"}
+
+@user_step_decorator
+def process_dataset(step_name, flow, inputs=None, attributes=None):
+    import duckdb
+
+    sql = f"""SELECT * FROM '{attributes["url"]}'"""
+    fltr = attributes.get("filter")
+    if fltr:
+        sql += f" WHERE {fltr}"
+    con = duckdb.connect()
+    print("🔄 Preparing data")
+    flow.table = con.execute(sql).fetch_arrow_table()
+    print("✅ Data prepared")
+    yield
+    del flow.table
+
+class dataset(StepMutator):
+    def init(self, *args, **kwargs):
+        self.url = kwargs["url"]
+        self.filter = kwargs.get("filter")
+
+    def mutate(self, mutable_step):
+        mutable_step.add_decorator(
+            "pypi", deco_kwargs={"packages": DEPS}, duplicates=mutable_step.ERROR
+        )
+        mutable_step.add_decorator(
+            process_dataset,
+            deco_kwargs={"filter": self.filter, "url": self.url},
+            duplicates=mutable_step.ERROR,
+        )
+```
+
+From the user’s perspective, the step mutator `@dataset` behaves like a regular decorator. Its
+role is to capture attributes such as `url` and `filter`, and automatically apply two additional
+decorators - `@pypi` and `@process_dataset` - to the step where it is used.
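+
+In other words, applying `@dataset(...)` to a step expands to roughly the following decorator
+stack - a sketch of what the mutator produces for you:
+
+```python
+    @pypi(packages={"duckdb": "1.3.2", "pyarrow": "20.0.0"})
+    @process_dataset(url="...", filter="...")
+    @step
+    def start(self):
+        ...
+```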
+
+After this, the `@process_dataset` decorator can `import duckdb` knowing that the library is
+available. Note that we use [the temporary artifact
+pattern](/metaflow/composing-flows/custom-decorators#exposing-an-api-to-the-user-code) to expose
+an Arrow table, `flow.table`, to the user code, but we don't persist it as an artifact.
+
+Let's try `@dataset` in a flow. To demonstrate another useful pattern, we load attributes from a
+config file, `dataset.json`, which can look like this:
+```javascript
+{
+    "url": "https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2020-01.parquet",
+    "filter": "tip_amount > fare_amount"
+}
+```
+
+...and pass them to the `@dataset` mutator:
+
+```python
+from metaflow import FlowSpec, step, Config
+
+from dataset import dataset
+
+class DatasetFlow(FlowSpec):
+
+    data_config = Config('dataset', default='dataset.json')
+
+    @dataset(url=data_config.url, filter=data_config.filter)
+    @step
+    def start(self):
+        print(self.table)
+        self.next(self.end)
+
+    @step
+    def end(self):
+        pass
+
+if __name__ == '__main__':
+    DatasetFlow()
+```
+
+Note that you can apply step mutators with `--with`, similar to decorators:
+```
+python datasetflow.py --environment=pypi run --with 'dataset.dataset:url=SOME_URL'
+```
+
+The internals of `@dataset` are fully encapsulated - users don’t need to worry about
+installing `duckdb` or `pyarrow` themselves, or even know that `duckdb` is used.
diff --git a/sidebars.js b/sidebars.js
index 906127dc..b3995dc6 100644
--- a/sidebars.js
+++ b/sidebars.js
@@ -123,6 +123,21 @@ const sidebars = {
         "metaflow/configuring-flows/config-driven-experimentation"
       ],
     },
+    {
+      type: "category",
+      label: "Composing Flows",
+      link: {
+        type: "doc",
+        id: "metaflow/composing-flows/introduction",
+      },
+      items: [
+        "metaflow/composing-flows/custom-decorators",
+        "metaflow/composing-flows/advanced-custom-decorators",
+        "metaflow/composing-flows/mutators",
+        "metaflow/composing-flows/baseflow"
+      ]
+    },
   ],
 },
diff --git a/static/assets/ai_debug.mp4 b/static/assets/ai_debug.mp4
new file mode 100644
index 00000000..bfff04d7
Binary files /dev/null and b/static/assets/ai_debug.mp4 differ
diff --git a/static/assets/custom-decos-1.png b/static/assets/custom-decos-1.png
new file mode 100644
index 00000000..c810705b
Binary files /dev/null and b/static/assets/custom-decos-1.png differ
diff --git a/static/assets/custom-decos-2.png b/static/assets/custom-decos-2.png
new file mode 100644
index 00000000..e15b08a7
Binary files /dev/null and b/static/assets/custom-decos-2.png differ
diff --git a/static/assets/custom-decos-3.png b/static/assets/custom-decos-3.png
new file mode 100644
index 00000000..395710eb
Binary files /dev/null and b/static/assets/custom-decos-3.png differ
diff --git a/static/assets/statsprofiler.png b/static/assets/statsprofiler.png
new file mode 100644
index 00000000..bce80b48
Binary files /dev/null and b/static/assets/statsprofiler.png differ