Commit

migrate concept docs code snippet paths to graph/job/op (#6775)
sryza committed May 11, 2022
1 parent 15e9eb5 commit d099e43
Showing 57 changed files with 128 additions and 160 deletions.
12 changes: 6 additions & 6 deletions docs/content/concepts/ops-jobs-graphs/dynamic-graphs.mdx
@@ -26,7 +26,7 @@ Dynamic outputs are the tool Dagster provides to allow resolving the pieces of d

Here we start with a contrived example of a job containing a single expensive op:

-```python file=/concepts/solids_pipelines/dynamic.py startafter=non_dyn_start endbefore=non_dyn_end
+```python file=/concepts/ops_jobs_graphs/dynamic.py startafter=non_dyn_start endbefore=non_dyn_end
@op
def data_processing():
    large_data = load_big_data()
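    # Hypothetical continuation (the diff truncates the snippet here):
    # the entire dataset is processed serially inside this single op.
    return expensive_computation(large_data)
```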
@@ -45,7 +45,7 @@ While the implementation of `expensive_computation` can internally do something

With this motivation, we will break up the computation using Dynamic Outputs. First we define a new op that will use dynamic outputs: we use <PyObject module="dagster" object="DynamicOut" /> to declare that this op will return dynamic outputs. Then, in the function, we `yield` a number of <PyObject module="dagster" object="DynamicOutput" /> objects that each contain a value and a unique `mapping_key`.

-```python file=/concepts/solids_pipelines/dynamic.py startafter=dyn_out_start endbefore=dyn_out_end
+```python file=/concepts/ops_jobs_graphs/dynamic.py startafter=dyn_out_start endbefore=dyn_out_end
@op(out=DynamicOut())
def load_pieces():
    large_data = load_big_data()
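    # Hypothetical continuation (the diff truncates the snippet here):
    # yield one DynamicOutput per piece, each with a unique mapping_key.
    for idx, piece in enumerate(split_into_pieces(large_data)):  # helper assumed
        yield DynamicOutput(piece, mapping_key=str(idx))
```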
@@ -55,7 +55,7 @@ def load_pieces():

Then after creating ops for our downstream operations, we can put them all together in a job.

-```python file=/concepts/solids_pipelines/dynamic.py startafter=dyn_job_start endbefore=dyn_job_end
+```python file=/concepts/ops_jobs_graphs/dynamic.py startafter=dyn_job_start endbefore=dyn_job_end
@job
def dynamic_graph():
    pieces = load_pieces()
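    # Hypothetical continuation (the diff truncates the snippet here):
    # fan out over each piece, then fan back in downstream.
    results = pieces.map(compute_piece)  # `compute_piece` is an assumed op
    merge_and_analyze(results.collect())  # `merge_and_analyze` is an assumed op
```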
@@ -75,7 +75,7 @@ Within our `@job` decorated composition function, the object representing the dy

The following two examples are equivalent ways to establish a sequence of ops that occur for each dynamic output.

-```python file=/concepts/solids_pipelines/dynamic.py startafter=dyn_chain_start endbefore=dyn_chain_end
+```python file=/concepts/ops_jobs_graphs/dynamic.py startafter=dyn_chain_start endbefore=dyn_chain_end
@job
def chained():
    results = dynamic_values().map(echo).map(echo).map(echo)
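    # Hypothetical continuation (the diff truncates the snippet here):
    echo(results.collect())


# The original `chained_alt` body is elided by the diff; this sketch is one
# equivalent formulation that builds the same chain in a loop:
@job
def chained_alt():
    results = dynamic_values()
    for _ in range(3):
        results = results.map(echo)
    echo(results.collect())
```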
@@ -97,7 +97,7 @@ def chained_alt():

A lambda or scoped function can be used to pass non-dynamic outputs alongside dynamic ones in `map` downstream.

-```python file=/concepts/solids_pipelines/dynamic.py startafter=dyn_add_start endbefore=dyn_add_end
+```python file=/concepts/ops_jobs_graphs/dynamic.py startafter=dyn_add_start endbefore=dyn_add_end
@job
def other_arg():
    non_dynamic = one()
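    # Hypothetical continuation (the diff truncates the snippet here):
    # the lambda closes over the non-dynamic value for each mapped piece.
    dynamic_values().map(lambda val: add(val, non_dynamic))  # `add` is an assumed op
```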
@@ -108,7 +108,7 @@ def other_arg():

Multiple outputs are returned via a `namedtuple`, where each entry can be used via `map` or `collect`.

-```python file=/concepts/solids_pipelines/dynamic.py startafter=dyn_mult_start endbefore=dyn_mult_end
+```python file=/concepts/ops_jobs_graphs/dynamic.py startafter=dyn_mult_start endbefore=dyn_mult_end
@job
def multiple():
    # can unpack on assignment (order based)
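    # Hypothetical continuation (the diff truncates the snippet here):
    # each entry of the namedtuple supports `map` and `collect` independently.
    a, b = multiple_dynamic_outputs()  # assumed op with two DynamicOuts
    a.map(echo)
    echo(b.collect())
```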
10 changes: 5 additions & 5 deletions docs/content/concepts/ops-jobs-graphs/job-execution.mdx
@@ -26,7 +26,7 @@ You can also launch jobs in other ways:

## Executing a Job

-```python file=/concepts/solids_pipelines/pipeline_execution.py startafter=start_pipeline_marker endbefore=end_pipeline_marker
+```python file=/concepts/ops_jobs_graphs/job_execution.py startafter=start_pipeline_marker endbefore=end_pipeline_marker
from dagster import job, op


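# Hypothetical continuation (the diff truncates the snippet here; the op and
# job bodies are assumed):
@op
def return_one() -> int:
    return 1


@op
def add_two(num: int) -> int:
    return num + 2


@job
def my_job():
    add_two(return_one())
```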
@@ -99,7 +99,7 @@ Dagster includes Python APIs for execution that are useful when writing tests or
<PyObject object="JobDefinition" method="execute_in_process" /> executes a job and
returns an <PyObject object="ExecuteInProcessResult" />.

-```python file=/concepts/solids_pipelines/pipeline_execution.py startafter=start_execute_marker endbefore=end_execute_marker
+```python file=/concepts/ops_jobs_graphs/job_execution.py startafter=start_execute_marker endbefore=end_execute_marker
if __name__ == "__main__":
    result = my_job.execute_in_process()
```
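A minimal sketch of inspecting the returned <PyObject object="ExecuteInProcessResult" /> (the op name is taken from the hypothetical job sketched above):

```python
result = my_job.execute_in_process()
assert result.success  # overall run status
print(result.output_for_node("return_one"))  # output of a specific op
```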
Expand Down Expand Up @@ -136,7 +136,7 @@ It works as follows:

You can use this selection syntax in the `op_selection` argument to the <PyObject object="JobDefinition" method="execute_in_process" />:

-```python file=/concepts/solids_pipelines/pipeline_execution.py startafter=start_solid_selection_marker endbefore=end_solid_selection_marker
+```python file=/concepts/ops_jobs_graphs/job_execution.py startafter=start_solid_selection_marker endbefore=end_solid_selection_marker
my_job.execute_in_process(op_selection=["*add_two"])
```
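The same syntax supports other clauses; which side the `*` appears on determines whether upstream or downstream ops are included. A couple of hedged examples:

```python
my_job.execute_in_process(op_selection=["add_two"])   # just add_two
my_job.execute_in_process(op_selection=["add_two*"])  # add_two and everything downstream
```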

@@ -161,7 +161,7 @@ The default job executor definition defaults to multiprocess execution. It also

Here is an example of run config, as YAML, that you could provide in the Dagit playground to do an in-process execution.

-```python file=/concepts/solids_pipelines/pipeline_execution.py startafter=start_ip_yaml endbefore=end_ip_yaml
+```python file=/concepts/ops_jobs_graphs/job_execution.py startafter=start_ip_yaml endbefore=end_ip_yaml
execution:
  config:
    in_process:
```
@@ -171,7 +171,7 @@ Additional config options are available for multiprocess execution that can help

The example below sets the run config directly on the job to explicitly set the max concurrent subprocesses to 4 and to change the subprocess start method to use a forkserver.

-```python file=/concepts/solids_pipelines/pipeline_execution.py startafter=start_mp_cfg endbefore=end_mp_cfg
+```python file=/concepts/ops_jobs_graphs/job_execution.py startafter=start_mp_cfg endbefore=end_mp_cfg
@job(
    config={
        "execution": {
41 changes: 22 additions & 19 deletions docs/content/concepts/ops-jobs-graphs/jobs-graphs.mdx
@@ -34,7 +34,7 @@ The simplest way to create a job is to use the <PyObject object="job"/> decorato

Within the decorated function body, you can use function calls to indicate the dependency structure between the ops/graphs. In this example, the `add_one` op depends on the `return_five` op's output. Because this data dependency exists, the `add_one` op executes after `return_five` runs successfully and emits the required output.

-```python file=/concepts/solids_pipelines/simple_job.py
+```python file=/concepts/ops_jobs_graphs/simple_job.py
from dagster import job, op


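# Hypothetical continuation (the diff truncates the snippet here; op bodies
# and the job name are assumed):
@op
def return_five() -> int:
    return 5


@op
def add_one(number: int) -> int:
    return number + 1


@job
def arithmetic_job():
    add_one(return_five())
```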
@@ -63,7 +63,7 @@ You can model this by building multiple jobs that use the same underlying graph

To do this, you first define a graph with the <PyObject object="graph" decorator /> decorator.

-```python file=/concepts/solids_pipelines/jobs_from_graphs.py startafter=start_define_graph endbefore=end_define_graph
+```python file=/concepts/ops_jobs_graphs/jobs_from_graphs.py startafter=start_define_graph endbefore=end_define_graph
from dagster import graph, op


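# Hypothetical continuation (the diff truncates the snippet here; the op body
# and resource usage are assumed):
@op(required_resource_keys={"server"})
def update_service(context):
    context.resources.server.update()


@graph
def do_stuff():
    update_service()
```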
@@ -79,7 +79,7 @@ def do_stuff():

Then you build jobs from it using the <PyObject object="GraphDefinition" method="to_job" /> method:

-```python file=/concepts/solids_pipelines/jobs_from_graphs.py startafter=start_define_jobs endbefore=end_define_jobs
+```python file=/concepts/ops_jobs_graphs/jobs_from_graphs.py startafter=start_define_jobs endbefore=end_define_jobs
from dagster import ResourceDefinition

prod_server = ResourceDefinition.mock_resource()
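# Hypothetical continuation (the diff truncates the snippet here; names assumed):
local_server = ResourceDefinition.mock_resource()

prod_job = do_stuff.to_job(resource_defs={"server": prod_server}, name="do_stuff_prod")
local_job = do_stuff.to_job(resource_defs={"server": local_server}, name="do_stuff_local")
```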
@@ -97,10 +97,13 @@ local_job = do_stuff.to_job(

You make jobs available to Dagit, the GraphQL API, and the command line by including them inside [repositories](/concepts/repositories-workspaces/repositories). If you include schedules or sensors in a repository, the repository will automatically include jobs that those schedules or sensors target.

-```python file=/concepts/solids_pipelines/repo_with_job.py
-from dagster import repository
+```python file=/concepts/ops_jobs_graphs/repo_with_job.py
+from dagster import job, repository

-from .jobs import do_it_all
+
+@job
+def do_it_all():
+    ...


@repository
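# Hypothetical continuation (the diff truncates the snippet here):
def repo_with_job():  # repository name assumed
    return [do_it_all]
```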
@@ -118,7 +121,7 @@ When constructing a job, you can customize how that configuration will be satisf

You can supply a config dictionary. The supplied dictionary will be used to configure the job whenever the job is launched. It will show up in the Dagit Launchpad and can be overridden.

-```python file=/concepts/solids_pipelines/jobs_with_default_config.py
+```python file=/concepts/ops_jobs_graphs/jobs_with_default_config.py
from dagster import job, op


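# Hypothetical continuation (the diff truncates the snippet here; names and
# config values assumed):
@op(config_schema={"param": str})
def do_something(context):
    context.log.info(context.op_config["param"])


@job(config={"ops": {"do_something": {"config": {"param": "some_value"}}}})
def do_it_all_with_default_config():
    do_something()
```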
@@ -150,7 +153,7 @@ For more information on how this works, take a look at the [Partitions concept p

You can supply a <PyObject object="ConfigMapping" />. This allows you to expose a narrower config interface to your job. Instead of needing to configure every op and resource individually when launching the job, you can supply a smaller number of values to the outer config, and the <PyObject object="ConfigMapping" /> can translate it into config for all the job's ops and resources.

-```python file=/concepts/solids_pipelines/jobs_with_config_mapping.py
+```python file=/concepts/ops_jobs_graphs/jobs_with_config_mapping.py
from dagster import config_mapping, job, op


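# Hypothetical continuation (the diff truncates the snippet here; names assumed):
@op(config_schema={"config_param": str})
def do_something(context):
    context.log.info(context.op_config["config_param"])


@config_mapping(config_schema={"simplified_param": str})
def simplified_config(val):
    return {"ops": {"do_something": {"config": {"config_param": val["simplified_param"]}}}}


@job(config=simplified_config)
def do_it_all_with_simplified_config():
    do_something()
```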
@@ -193,7 +196,7 @@ width={1000}
height={250}
/>

-```python file=/concepts/solids_pipelines/linear_pipeline.py startafter=start_marker endbefore=end_marker
+```python file=/concepts/ops_jobs_graphs/linear_job.py startafter=start_marker endbefore=end_marker
from dagster import job, op


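# Hypothetical continuation (the diff truncates the snippet here; op bodies
# and the job name are assumed):
@op
def return_one() -> int:
    return 1


@op
def add_one(number: int) -> int:
    return number + 1


@job
def linear():
    add_one(add_one(add_one(return_one())))
```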
@@ -223,7 +226,7 @@

A single output can be passed to multiple inputs on downstream ops. In this example, the output from the first op is passed to two different ops. The outputs of those ops are combined and passed to the final op.

-```python file=/concepts/solids_pipelines/multiple_io_pipeline.py startafter=start_marker endbefore=end_marker
+```python file=/concepts/ops_jobs_graphs/multiple_io_job.py startafter=start_marker endbefore=end_marker
from dagster import job, op


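# Hypothetical continuation (the diff truncates the snippet here; op bodies
# and names are assumed):
@op
def return_one() -> int:
    return 1


@op
def add_one(number: int) -> int:
    return number + 1


@op
def adder(a: int, b: int) -> int:
    return a + b


@job
def inputs_and_outputs():
    value = return_one()
    a = add_one(value)
    b = add_one(value)
    adder(a, b)
```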
@@ -265,7 +268,7 @@ An op only starts to execute once all of its inputs have been resolved. We can u

In this example, the `branching_op` outputs either the `branch_1` result or `branch_2` result. Since op execution is skipped for ops that have unresolved inputs, only one of the downstream ops will execute.

-```python file=/concepts/solids_pipelines/branching_pipeline.py startafter=start_marker endbefore=end_marker
+```python file=/concepts/ops_jobs_graphs/branching_job.py startafter=start_marker endbefore=end_marker
import random

from dagster import Out, Output, job, op
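# Hypothetical continuation (the diff truncates the snippet here; bodies and
# names are assumed):
@op(out={"branch_1": Out(is_required=False), "branch_2": Out(is_required=False)})
def branching_op():
    num = random.randint(0, 1)
    if num == 0:
        yield Output(1, "branch_1")
    else:
        yield Output(2, "branch_2")


@op
def branch_1_op(_input):
    pass


@op
def branch_2_op(_input):
    pass


@job
def branching():
    branch_1, branch_2 = branching_op()
    branch_1_op(branch_1)
    branch_2_op(branch_2)
```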
@@ -310,7 +313,7 @@ If you have a fixed set of ops that all return the same output type, you can coll

The downstream op executes only if all of the outputs were successfully created by the upstream ops.

-```python file=/concepts/solids_pipelines/fan_in_pipeline.py startafter=start_marker endbefore=end_marker
+```python file=/concepts/ops_jobs_graphs/fan_in_job.py startafter=start_marker endbefore=end_marker
from typing import List

from dagster import job, op
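# Hypothetical continuation (the diff truncates the snippet here; bodies and
# names are assumed):
@op
def return_one() -> int:
    return 1


@op
def sum_fan_in(nums: List[int]) -> int:
    return sum(nums)


@job
def fan_in():
    fan_outs = []
    for i in range(0, 10):
        fan_outs.append(return_one.alias(f"return_one_{i}")())
    sum_fan_in(fan_outs)
```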
@@ -342,7 +345,7 @@ In most cases, the structure of a graph is pre-determined before execution. Dags

In this example, we have an op `files_in_directory` that defines a <PyObject object="DynamicOut" />. We `map` over the dynamic output which will cause the downstream dependencies to be cloned for each <PyObject object="DynamicOutput" /> that is yielded. The downstream copies can be identified by the `mapping_key` supplied to <PyObject object="DynamicOutput"/>. Once that's all complete, we `collect` over the results of `process_file` and pass that in to `summarize_directory`.

-```python file=/concepts/solids_pipelines/dynamic_pipeline/dynamic_pipeline.py startafter=start_marker endbefore=end_marker
+```python file=/concepts/ops_jobs_graphs/dynamic_pipeline/dynamic_job.py startafter=start_marker endbefore=end_marker
import os
from typing import List

@@ -393,7 +396,7 @@ If you have an op, say `Op A`, that does not depend on any outputs of another op

If you need to model an explicit ordering dependency, you can use the <PyObject object="Nothing"/> Dagster type on the input definition of the downstream op. This type specifies that you are passing "nothing" via Dagster between the ops, while still using inputs and outputs to model the dependency between the two ops.

-```python file=/concepts/solids_pipelines/order_based_dependency_pipeline.py startafter=start_marker endbefore=end_marker
+```python file=/concepts/ops_jobs_graphs/order_based_dependency_job.py startafter=start_marker endbefore=end_marker
from dagster import In, Nothing, job, op


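# Hypothetical continuation (the diff truncates the snippet here;
# `get_database_connection` is an assumed helper):
@op
def create_table_1():
    get_database_connection().execute("create table_1 as select * from some_source_table")


@op(ins={"start": In(Nothing)})
def create_table_2():
    get_database_connection().execute("create table_2 as select * from table_1")


@job
def nothing_dependency():
    create_table_2(start=create_table_1())
```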
@@ -426,7 +429,7 @@ Dagster also provides more advanced abstractions to handle dependencies and IO.

You can use the same op definition multiple times in the same graph/job.

-```python file=/concepts/solids_pipelines/pipelines.py startafter=start_multiple_usage_pipeline endbefore=end_multiple_usage_pipeline
+```python file=/concepts/ops_jobs_graphs/jobs.py startafter=start_multiple_usage_pipeline endbefore=end_multiple_usage_pipeline
@job
def multiple_usage():
    add_one(add_one(return_one()))
```
@@ -436,7 +439,7 @@ To differentiate between the two invocations of `add_one`, Dagster automatically

You can also manually define the alias by using the `.alias` method on the op invocation.

-```python file=/concepts/solids_pipelines/pipelines.py startafter=start_alias_pipeline endbefore=end_alias_pipeline
+```python file=/concepts/ops_jobs_graphs/jobs.py startafter=start_alias_pipeline endbefore=end_alias_pipeline
@job
def alias():
    add_one.alias("second_addition")(add_one(return_one()))
```
@@ -450,7 +453,7 @@ You may run into a situation where you need to programmatically construct the de

To construct a GraphDefinition, you need to pass the constructor a graph name, a list of op or graph definitions, and a dictionary defining the dependency structure. The dependency structure declares the dependencies of each op’s inputs on the outputs of other ops in the graph. The top-level keys of the dependency dictionary are the string names of ops or graphs. If you are using op aliases, be sure to use the aliased name. The values of the top-level keys are also dictionaries, which map input names to a <PyObject object="DependencyDefinition"/>.

-```python file=/concepts/solids_pipelines/pipelines.py startafter=start_pipeline_definition_marker endbefore=end_pipeline_definition_marker
+```python file=/concepts/ops_jobs_graphs/jobs.py startafter=start_pipeline_definition_marker endbefore=end_pipeline_definition_marker
one_plus_one_from_constructor = GraphDefinition(
    name="one_plus_one",
    node_defs=[return_one, add_one],
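    # Hypothetical continuation (the diff truncates the snippet here; the
    # input name "number" is assumed):
    dependencies={"add_one": {"number": DependencyDefinition("return_one")}},
)
```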
@@ -464,7 +467,7 @@ Sometimes you may want to construct the dependencies of a graph definition from

For example, you can have a YAML like this:

-```YAML file=/concepts/solids_pipelines/my_graph.yaml
+```YAML file=/concepts/ops_jobs_graphs/my_graph.yaml
name: some_example
description: blah blah blah
ops:
@@ -490,7 +493,7 @@

You can programmatically generate a GraphDefinition from this YAML:

-```python file=/concepts/solids_pipelines/dep_dsl.py
+```python file=/concepts/ops_jobs_graphs/dep_dsl.py
import os

from dagster import DependencyDefinition, GraphDefinition, NodeInvocation, op
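# Hypothetical continuation (the diff truncates the snippet; the YAML parsing
# scheme below is assumed, not the original):
import yaml


def construct_graph_from_yaml(path, op_defs) -> GraphDefinition:
    with open(path) as f:
        data = yaml.safe_load(f)

    dependencies = {}
    for op_data in data["ops"]:
        deps = {
            input_name: DependencyDefinition(dep["op"])
            for input_name, dep in op_data.get("deps", {}).items()
        }
        dependencies[NodeInvocation(op_data["def"], alias=op_data.get("alias"))] = deps

    return GraphDefinition(
        name=data["name"],
        description=data.get("description"),
        node_defs=op_defs,
        dependencies=dependencies,
    )
```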
16 changes: 8 additions & 8 deletions docs/content/concepts/ops-jobs-graphs/nesting-graphs.mdx
@@ -25,7 +25,7 @@ We use the term _node_ to refer to both ops and graphs, because both ops and gra

As a baseline, here's a job that does not use nesting. It starts with an op that returns a number, then uses two ops to convert it from Celsius to Fahrenheit, then logs the result:

-```python file=/concepts/solids_pipelines/unnested_ops.py
+```python file=/concepts/ops_jobs_graphs/unnested_ops.py
from dagster import job, op


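# Hypothetical continuation (the diff truncates the snippet here; the
# `return_fifty` and `log_number` ops are assumed):
@op
def return_fifty() -> float:
    return 50.0


@op
def multiply_by_one_point_eight(number: float) -> float:
    return number * 1.8


@op
def add_thirty_two(number: float) -> float:
    return number + 32.0


@op
def log_number(context, number: float):
    context.log.info(f"number: {number}")


@job
def all_together_unnested():
    log_number(add_thirty_two(multiply_by_one_point_eight(return_fifty())))
```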
@@ -58,7 +58,7 @@ def all_together_unnested():

We can put the ops that perform the Celsius-to-Fahrenheit conversion into their own sub-graph and invoke that sub-graph from our job's main graph:

-```python file=/concepts/solids_pipelines/composite_solids.py startafter=start_composite_solid_example_marker endbefore=end_composite_solid_example_marker
+```python file=/concepts/ops_jobs_graphs/nested_graphs.py startafter=start_composite_solid_example_marker endbefore=end_composite_solid_example_marker
@graph
def celsius_to_fahrenheit(number):
    return add_thirty_two(multiply_by_one_point_eight(number))
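# Hypothetical continuation (the diff truncates the snippet here; names
# follow the unnested example above):
@job
def all_together_nested():
    log_number(celsius_to_fahrenheit(return_fifty()))
```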
@@ -86,7 +86,7 @@ To provide configuration to ops inside a sub-graph when launching a run, you pro

In this example, two ops that both take config are wrapped by a graph, which is included inside a job.

-```python file=/concepts/solids_pipelines/composite_solids.py startafter=start_composite_solid_config_marker endbefore=end_composite_solid_config_marker
+```python file=/concepts/ops_jobs_graphs/nested_graphs.py startafter=start_composite_solid_config_marker endbefore=end_composite_solid_config_marker
@op(config_schema={"n": float})
def add_n(context, number):
    return number + context.op_config["n"]
@@ -109,7 +109,7 @@ def subgraph_config_job():

To kick off a run of this job, you will need to specify the config for both `add_n` and `multiply_by_m` through the sub-graph:

-```yaml file=/concepts/solids_pipelines/composite_config.yaml
+```yaml file=/concepts/ops_jobs_graphs/composite_config.yaml
ops:
  add_n_times_m_graph:
    ops:
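      # Hypothetical continuation (the diff truncates the snippet here;
      # values assumed):
      add_n:
        config:
          n: 3.0
      multiply_by_m:
        config:
          m: 2.0
```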
@@ -125,7 +125,7 @@ ops:

Sub-graphs can dictate config for the ops and sub-graphs inside them. If the full config is known at the time that you're defining the graph, you can pass a dictionary to the `config` argument of the <PyObject object="graph" decorator /> decorator.

-```python file=/concepts/solids_pipelines/graph_provides_config.py
+```python file=/concepts/ops_jobs_graphs/graph_provides_config.py
from dagster import graph, op


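# Hypothetical continuation (the diff truncates the snippet here; op bodies
# and config values are assumed):
@op(config_schema={"n": float})
def add_n(context, number):
    return number + context.op_config["n"]


@op(config_schema={"m": float})
def multiply_by_m(context, number):
    return number * context.op_config["m"]


@graph(
    config={
        "multiply_by_m": {"config": {"m": 1.8}},
        "add_n": {"config": {"n": 32.0}},
    }
)
def celsius_to_fahrenheit(number):
    return add_n(multiply_by_m(number))
```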
Expand All @@ -146,7 +146,7 @@ def celsius_to_fahrenheit(number):

Alternatively, you can use "config mapping": you can provide a function that accepts the config provided to the graph and generates config for the nodes inside the graph.

-```python file=/concepts/solids_pipelines/graph_provides_config_mapping.py
+```python file=/concepts/ops_jobs_graphs/graph_provides_config_mapping.py
from dagster import config_mapping, graph, op


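# Hypothetical continuation (the diff truncates the snippet here; the
# conversion scheme is assumed):
@op(config_schema={"n": float})
def add_n(context, number):
    return number + context.op_config["n"]


@op(config_schema={"m": float})
def multiply_by_m(context, number):
    return number * context.op_config["m"]


@config_mapping(config_schema={"from_unit": str})
def generate_config(val):
    # translate the single graph-level value into per-op config
    if val["from_unit"] == "celsius":
        n = 32.0
    elif val["from_unit"] == "kelvin":
        n = -459.67
    else:
        raise ValueError("invalid from_unit")
    return {"multiply_by_m": {"config": {"m": 1.8}}, "add_n": {"config": {"n": n}}}


@graph(config=generate_config)
def to_fahrenheit(number):
    return add_n(multiply_by_m(number))
```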
@@ -179,7 +179,7 @@ def to_fahrenheit(number):

To run a job that contains `to_fahrenheit` as a sub-graph, you need to provide a value for the `from_unit` config option:

-```yaml file=/concepts/solids_pipelines/composite_config_mapping.yaml
+```yaml file=/concepts/ops_jobs_graphs/composite_config_mapping.yaml
ops:
  to_fahrenheit:
    config:
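      # Hypothetical continuation (the diff truncates the snippet here):
      from_unit: celsius
```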
@@ -192,7 +192,7 @@ ops:

To have multiple outputs from a graph, you need to define the outputs it maps and return a dictionary, where the keys are the output names and the values are the output values.

-```python file=/concepts/solids_pipelines/composite_solids.py startafter=start_composite_multi_output_marker endbefore=end_composite_multi_output_marker
+```python file=/concepts/ops_jobs_graphs/nested_graphs.py startafter=start_composite_multi_output_marker endbefore=end_composite_multi_output_marker
from dagster import GraphOut


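# Hypothetical continuation (the diff truncates the snippet here; op and
# graph names are assumed):
@op
def return_one() -> int:
    return 1


@op
def return_two() -> int:
    return 2


@graph(out={"one": GraphOut(), "two": GraphOut()})
def return_one_and_two():
    return {"one": return_one(), "two": return_two()}
```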

1 comment on commit d099e43

@vercel vercel bot commented on d099e43 May 11, 2022