[docs] job execution (#7776)
alangenfeld committed May 10, 2022
1 parent 35a198b commit 0540339
Showing 3 changed files with 88 additions and 0 deletions.
43 changes: 43 additions & 0 deletions docs/content/concepts/ops-jobs-graphs/job-execution.mdx
@@ -148,3 +148,46 @@ src="/images/concepts/solids-pipelines/solid-selection.png"
width={3808}
height={2414}
/>

## Controlling Job Execution

Each <PyObject object="JobDefinition" /> contains an <PyObject object="ExecutorDefinition" /> that determines how it will be executed.

This `executor_def` property can be set to allow for different types of isolation and parallelism, ranging from executing all the ops in the same process to executing each op in its own Kubernetes pod. See [Executors](/deployment/executors) for more details.
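
For instance, here is a minimal sketch of pinning a job to single-process execution by passing one of the built-in executor definitions directly (the op and job names are hypothetical):

```python
from dagster import in_process_executor, job, op


@op
def say_hello():
    return "hello"


# Pin this job to the in-process executor instead of the
# default multiprocess executor.
@job(executor_def=in_process_executor)
def single_process_job():
    say_hello()
```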

### Default Job Executor

The default job executor uses multiprocess execution by default, and also allows you to toggle between in-process and multiprocess execution via run config.

Here is an example of run config, written as YAML, that you could provide in the Dagit playground to execute the job in a single process:

```python file=/concepts/solids_pipelines/pipeline_execution.py startafter=start_ip_yaml endbefore=end_ip_yaml
execution:
config:
in_process:
```
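
The same run config can also be supplied programmatically. Here is a sketch mirroring what this commit's tests do with the `execute_pipeline` API; the `my_job` and `emit_one` definitions are hypothetical stand-ins:

```python
import yaml

from dagster import execute_pipeline, job, op


@op
def emit_one():
    return 1


@job
def my_job():
    emit_one()


# Parse the YAML shown above and execute the job in a single process.
run_config = yaml.safe_load(
    """
execution:
  config:
    in_process:
"""
)
execute_pipeline(my_job, run_config=run_config)
```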

Additional config options are available for multiprocess execution that can help with performance, including limiting the maximum number of concurrent subprocesses and controlling how those subprocesses are spawned.

The example below sets the run config directly on the job, explicitly limiting the max concurrent subprocesses to 4 and changing the subprocess start method to use a forkserver:

```python file=/concepts/solids_pipelines/pipeline_execution.py startafter=start_mp_cfg endbefore=end_mp_cfg
@job(
config={
"execution": {
"config": {
"multiprocess": {
"start_method": {
"forkserver": {},
},
"max_concurrent": 4,
},
}
}
}
)
def forkserver_job():
multi_three(add_two(return_one()))
```

Using forkserver is a great way to reduce per-process overhead during multiprocess execution, but it can cause issues with certain libraries. More details can be found in the [Python documentation on start methods](https://docs.python.org/3/library/multiprocessing.html#contexts-and-start-methods).
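
If a dependency misbehaves under forkserver, you can switch the start method back while keeping the concurrency cap. A sketch, under the assumption that the `start_method` selector also accepts the default `spawn` option (job and op names hypothetical):

```python
from dagster import job, op


@op
def ping():
    return "ping"


# Same shape as forkserver_job above, but using the assumed
# "spawn" start method instead.
@job(
    config={
        "execution": {
            "config": {
                "multiprocess": {
                    "start_method": {
                        "spawn": {},
                    },
                    "max_concurrent": 4,
                },
            }
        }
    }
)
def spawn_job():
    ping()
```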
32 changes: 32 additions & 0 deletions examples/docs_snippets/docs_snippets/concepts/solids_pipelines/pipeline_execution.py
@@ -43,3 +43,35 @@ def execute_subset():
@op
def total(in_1: int, in_2: int, in_3: int, in_4: int):
return in_1 + in_2 + in_3 + in_4


ip_yaml = """
# start_ip_yaml
execution:
config:
in_process:
# end_ip_yaml
"""

# start_mp_cfg
@job(
config={
"execution": {
"config": {
"multiprocess": {
"start_method": {
"forkserver": {},
},
"max_concurrent": 4,
},
}
}
}
)
def forkserver_job():
multi_three(add_two(return_one()))


# end_mp_cfg
13 changes: 13 additions & 0 deletions examples/docs_snippets/docs_snippets_tests/concepts_tests/solids_pipelines_tests/test_pipeline_execution.py
@@ -1,5 +1,10 @@
import yaml

from dagster import execute_pipeline
from docs_snippets.concepts.solids_pipelines.pipeline_execution import (
execute_subset,
forkserver_job,
ip_yaml,
my_job,
)

@@ -11,3 +16,11 @@ def test_execute_my_job():

def test_solid_selection():
execute_subset()


def test_yaml():
execute_pipeline(my_job, run_config=yaml.safe_load(ip_yaml))


def test_forkserver():
assert forkserver_job # just assert definition created
