Commit

remove dynamic mapping and collect section from jobs concepts page (#…
sryza committed May 25, 2022
1 parent 71a85ba commit f8ff6f6
Showing 3 changed files with 0 additions and 110 deletions.
49 changes: 0 additions & 49 deletions docs/content/concepts/ops-jobs-graphs/jobs-graphs.mdx
@@ -339,55 +339,6 @@ def fan_in():

In this example, we have 10 ops that all output the number `1`. The `sum_fan_in` op takes all of these outputs as a list and sums them.

### Dynamic Mapping & Collect

In most cases, the structure of a graph is determined before execution. Dagster also supports graphs whose final structure is not determined until run time. This is useful when you want to execute a separate instance of an op for each entry in a certain output.

In this example, the op `files_in_directory` defines a <PyObject object="DynamicOut" />. Calling `map` on the dynamic output clones the downstream dependencies once for each <PyObject object="DynamicOutput" /> that is yielded. Each downstream copy is identified by the `mapping_key` supplied to its <PyObject object="DynamicOutput"/>. Once all copies have completed, we `collect` the results of `process_file` and pass them to `summarize_directory`.

```python file=/concepts/ops_jobs_graphs/dynamic_pipeline/dynamic_job.py startafter=start_marker endbefore=end_marker
import os
from typing import List

from dagster import DynamicOut, DynamicOutput, Field, job, op
from dagster.utils import file_relative_path


@op(
    config_schema={
        "path": Field(str, default_value=file_relative_path(__file__, "sample"))
    },
    out=DynamicOut(str),
)
def files_in_directory(context):
    path = context.op_config["path"]
    dirname, _, filenames = next(os.walk(path))
    for file in filenames:
        yield DynamicOutput(
            value=os.path.join(dirname, file),
            # create a mapping key from the file name
            mapping_key=file.replace(".", "_").replace("-", "_"),
        )


@op
def process_file(path: str) -> int:
    # simple example of calculating size
    return os.path.getsize(path)


@op
def summarize_directory(sizes: List[int]) -> int:
    # simple example of totalling sizes
    return sum(sizes)


@job
def process_directory():
    file_results = files_in_directory().map(process_file)
    summarize_directory(file_results.collect())
```
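
To make the data flow of `map` and `collect` concrete, here is a minimal plain-Python sketch of the same shape, without Dagster. It only illustrates the idea: it is not how Dagster executes dynamic outputs. The helpers `mapping_key_for` and `run_sketch`, and the sample file names and sizes written to a temporary directory, are assumptions made for illustration.

```python
import os
import tempfile
from typing import Dict, Iterator, List, Tuple


def mapping_key_for(file_name: str) -> str:
    # Same key derivation as the op above: replace "." and "-" with "_".
    return file_name.replace(".", "_").replace("-", "_")


def files_in_directory(path: str) -> Iterator[Tuple[str, str]]:
    # Plain-Python stand-in for the dynamic output: yield (mapping_key, value) pairs.
    dirname, _, filenames = next(os.walk(path))
    for file_name in sorted(filenames):
        yield mapping_key_for(file_name), os.path.join(dirname, file_name)


def process_file(path: str) -> int:
    return os.path.getsize(path)


def summarize_directory(sizes: List[int]) -> int:
    return sum(sizes)


def run_sketch(path: str) -> Tuple[Dict[str, int], int]:
    # "map": run process_file once per yielded entry, keyed by mapping key.
    mapped = {key: process_file(p) for key, p in files_in_directory(path)}
    # "collect": gather the per-key results into one list for the downstream step.
    total = summarize_directory(list(mapped.values()))
    return mapped, total


# Hypothetical sample files; names and sizes are chosen for illustration only.
with tempfile.TemporaryDirectory() as tmp:
    for name, size in {"words.txt": 40, "program.py": 34, "empty-stuff.bin": 0}.items():
        with open(os.path.join(tmp, name), "wb") as f:
            f.write(b"x" * size)
    mapped, total = run_sketch(tmp)
```

The per-key dictionary mirrors what the mapped step produces (one result per `mapping_key`), while the final sum corresponds to the collected list passed to `summarize_directory`.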

### Order-based Dependencies (Nothing dependencies)

Dependencies in Dagster are primarily _data dependencies_. Using data dependencies means each input of an op depends on the output of an upstream op.

This file was deleted.

@@ -7,9 +7,6 @@
    naive,
    other_arg,
)
from docs_snippets.concepts.ops_jobs_graphs.dynamic_pipeline.dynamic_job import (
    process_directory,
)
from docs_snippets.concepts.ops_jobs_graphs.fan_in_job import fan_in
from docs_snippets.concepts.ops_jobs_graphs.jobs import (
    alias,
@@ -63,18 +60,6 @@ def test_fan_in():
    assert result.output_for_node("sum_fan_in") == 10


def test_dynamic():
    result = process_directory.execute_in_process()
    assert result.success

    assert result.output_for_node("process_file") == {
        "empty_stuff_bin": 0,
        "program_py": 34,
        "words_txt": 40,
    }
    assert result.output_for_node("summarize_directory") == 74


def test_dep_dsl():
    result = define_dep_dsl_graph().execute_in_process(
        run_config={"ops": {"A": {"inputs": {"num": 0}}}}

1 comment on commit f8ff6f6

@vercel vercel bot commented on f8ff6f6 May 25, 2022
