-
Notifications
You must be signed in to change notification settings - Fork 1.3k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[docs] details on mapping dynamic outputs (#6140)
resolves #6928 ## Test Plan read the docs
- Loading branch information
1 parent
8b5a4ca
commit 9168e73
Showing
4 changed files
with
290 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
121 changes: 121 additions & 0 deletions
121
docs/content/concepts/ops-jobs-graphs/dynamic-graphs.mdx
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,121 @@ | ||
--- | ||
title: "Dynamic Graphs | Dagster" | ||
description: Dagster APIs for runtime determined graph structures. | ||
--- | ||
|
||
# Dynamic Graphs | ||
|
||
The ability for portions of a [graph](/concepts/ops-jobs-graphs/jobs-graphs) to be duplicated at runtime. | ||
|
||
## Relevant APIs | ||
|
||
| Name | Description | | ||
| ---------------------------------------------------- | --------------------------------------------------------------------------------------------- | | ||
| <PyObject module="dagster" object="DynamicOut" /> | Declare that an op will return dynamic outputs | | ||
| <PyObject module="dagster" object="DynamicOutput" /> | The object that an op will yield repeatedly, each containing a value and a unique mapping_key | | ||
|
||
## Overview | ||
|
||
The basic unit of computation in Dagster is the [op](/concepts/ops-jobs-graphs/ops). In certain cases it is desirable to run the same op multiple times on different pieces of similar data. | ||
|
||
Dynamic outputs are the tool Dagster provides to allow resolving the pieces of data at runtime and having downstream copies of the ops created for each piece. | ||
|
||
## Using Dynamic Outputs | ||
|
||
### A Static Job | ||
|
||
Here we start with a contrived example of a job containing a single expensive op: | ||
|
||
```python file=/concepts/solids_pipelines/dynamic.py startafter=non_dyn_start endbefore=non_dyn_end | ||
@op | ||
def data_processing(): | ||
large_data = load_big_data() | ||
interesting_result = expensive_processing(large_data) | ||
return analyze(interesting_result) | ||
|
||
|
||
@job | ||
def naive(): | ||
data_processing() | ||
``` | ||
|
||
While, the implementation of `expensive_computation` can internally do something to parallelize the compute, if anything goes wrong with any part we have to restart the whole computation. | ||
|
||
### A Dynamic Job | ||
|
||
With this motivation we will break up the computation using Dynamic Outputs. First we will define our new op that will use dynamic outputs. First we use <PyObject module="dagster" object="DynamicOut" /> to declare that this op will return dynamic outputs. Then in the function we `yield` a number of <PyObject module="dagster" object="DynamicOutput" /> objects that each contain a value and a unique `mapping_key`. | ||
|
||
```python file=/concepts/solids_pipelines/dynamic.py startafter=dyn_out_start endbefore=dyn_out_end | ||
@op(out=DynamicOut()) | ||
def load_pieces(): | ||
large_data = load_big_data() | ||
for idx, piece in large_data.chunk(): | ||
yield DynamicOutput(piece, mapping_key=idx) | ||
``` | ||
|
||
Then after creating ops for our downstream operations, we can put them all together in a job. | ||
|
||
```python file=/concepts/solids_pipelines/dynamic.py startafter=dyn_out_start endbefore=dyn_out_end | ||
@op(out=DynamicOut()) | ||
def load_pieces(): | ||
large_data = load_big_data() | ||
for idx, piece in large_data.chunk(): | ||
yield DynamicOutput(piece, mapping_key=idx) | ||
``` | ||
|
||
Within our `@job` decorated composition function, the object representing the dynamic output can not be passed directly to another op. Either `map` or `collect` must be invoked on it. | ||
|
||
`map` takes a `Callable` which receives a single argument. This callable is evaluated once, and any invoked op that is passed the input argument will establish dependencies. The ops downstream of a dynamic output will be cloned for each dynamic output, and identified using the associated `mapping_key`. The return value from the callable is captured and wrapped in an object that allows for subsequent `map` or `collect` calls. | ||
|
||
`collect` creates a fan-in dependency over all the dynamic copies. The dependent op will receive a list containing all the values. | ||
|
||
## Advanced Mapping Examples | ||
|
||
### Chaining | ||
|
||
The following two examples are equivalent ways to establish a sequence of ops that occur for each dynamic output. | ||
|
||
```python file=/concepts/solids_pipelines/dynamic.py startafter=dyn_chain_start endbefore=dyn_chain_end | ||
@job | ||
def chained(): | ||
results = dynamic_values().map(echo).map(echo).map(echo) | ||
process(results.collect()) | ||
|
||
|
||
@job | ||
def chained_alt(): | ||
def _for_each(val): | ||
a = echo(val) | ||
b = echo(a) | ||
return echo(b) | ||
|
||
results = dynamic_values().map(_for_each) | ||
process(results.collect()) | ||
``` | ||
|
||
### Additional Arguments | ||
|
||
A lambda or scoped function can be used to pass non-dynamic outputs along side dynamic ones in `map` downstream. | ||
|
||
```python file=/concepts/solids_pipelines/dynamic.py startafter=dyn_add_start endbefore=dyn_add_end | ||
@job | ||
def other_arg(): | ||
non_dynamic = one() | ||
dynamic_values().map(lambda val: add(val, non_dynamic)) | ||
``` | ||
|
||
### Multiple Outputs | ||
|
||
Multiple outputs are returned via a `namedtuple`, where each entry can be used via `map` or `collect`. | ||
|
||
```python file=/concepts/solids_pipelines/dynamic.py startafter=dyn_mult_start endbefore=dyn_mult_end | ||
@job | ||
def multiple(): | ||
# can unpack on assignment (order based) | ||
values, _ = multiple_dynamic_values() | ||
process(values.collect()) | ||
|
||
# or access by name | ||
outs = multiple_dynamic_values() | ||
process(outs.values.collect()) | ||
``` |
149 changes: 149 additions & 0 deletions
149
examples/docs_snippets/docs_snippets/concepts/solids_pipelines/dynamic.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,149 @@ | ||
from dagster import DynamicOut, DynamicOutput, job, op | ||
|
||
|
||
@op | ||
def one(): | ||
return 1 | ||
|
||
|
||
@op | ||
def add(a, b): | ||
return a + b | ||
|
||
|
||
@op | ||
def echo(x): | ||
return x | ||
|
||
|
||
@op | ||
def process(results): | ||
return sum(results) | ||
|
||
|
||
@op(out=DynamicOut()) | ||
def dynamic_values(): | ||
for i in range(2): | ||
yield DynamicOutput(i, mapping_key=f"num_{i}") | ||
|
||
|
||
@op( | ||
out={ | ||
"values": DynamicOut(), | ||
"negatives": DynamicOut(), | ||
}, | ||
) | ||
def multiple_dynamic_values(): | ||
for i in range(2): | ||
yield DynamicOutput(i, output_name="values", mapping_key=f"num_{i}") | ||
yield DynamicOutput(-i, output_name="negatives", mapping_key=f"neg_{i}") | ||
|
||
|
||
class BigData: | ||
def __init__(self): | ||
self._data = {} | ||
|
||
def chunk(self): | ||
return self._data.items() | ||
|
||
|
||
def load_big_data(): | ||
return BigData() | ||
|
||
|
||
def expensive_processing(x): | ||
return x | ||
|
||
|
||
def analyze(x): | ||
return x | ||
|
||
|
||
# non_dyn_start | ||
@op | ||
def data_processing(): | ||
large_data = load_big_data() | ||
interesting_result = expensive_processing(large_data) | ||
return analyze(interesting_result) | ||
|
||
|
||
@job | ||
def naive(): | ||
data_processing() | ||
|
||
|
||
# non_dyn_end | ||
|
||
# dyn_out_start | ||
@op(out=DynamicOut()) | ||
def load_pieces(): | ||
large_data = load_big_data() | ||
for idx, piece in large_data.chunk(): | ||
yield DynamicOutput(piece, mapping_key=idx) | ||
|
||
|
||
# dyn_out_end | ||
|
||
|
||
@op | ||
def compute_piece(piece): | ||
return expensive_processing(piece) | ||
|
||
|
||
@op | ||
def merge_and_analyze(results): | ||
return analyze(results) | ||
|
||
|
||
# dyn_job_start | ||
@job | ||
def dynamic_graph(): | ||
pieces = load_pieces() | ||
results = pieces.map(compute_piece) | ||
merge_and_analyze(results.collect()) | ||
|
||
|
||
# dyn_job_end | ||
|
||
|
||
# dyn_chain_start | ||
@job | ||
def chained(): | ||
results = dynamic_values().map(echo).map(echo).map(echo) | ||
process(results.collect()) | ||
|
||
|
||
@job | ||
def chained_alt(): | ||
def _for_each(val): | ||
a = echo(val) | ||
b = echo(a) | ||
return echo(b) | ||
|
||
results = dynamic_values().map(_for_each) | ||
process(results.collect()) | ||
|
||
|
||
# dyn_chain_end | ||
|
||
# dyn_add_start | ||
@job | ||
def other_arg(): | ||
non_dynamic = one() | ||
dynamic_values().map(lambda val: add(val, non_dynamic)) | ||
|
||
|
||
# dyn_add_end | ||
# dyn_mult_start | ||
@job | ||
def multiple(): | ||
# can unpack on assignment (order based) | ||
values, _ = multiple_dynamic_values() | ||
process(values.collect()) | ||
|
||
# or access by name | ||
outs = multiple_dynamic_values() | ||
process(outs.values.collect()) | ||
|
||
|
||
# dyn_mult_end |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
9168e73
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Successfully deployed to the following URLs:
dagster – ./docs/next
dagster.vercel.app
dagster-elementl.vercel.app
docs.dagster.io
dagster-git-master-elementl.vercel.app
new-docs.dagster.io