Skip to content

Commit

Permalink
Run retries docs (#8367)
Browse files Browse the repository at this point in the history
### Summary & Motivation

### How I Tested These Changes
  • Loading branch information
johannkm authored and clairelin135 committed Jun 14, 2022
1 parent 2a2653e commit 14696ae
Show file tree
Hide file tree
Showing 6 changed files with 90 additions and 1 deletion.
6 changes: 5 additions & 1 deletion docs/content/_navigation.json
Original file line number Diff line number Diff line change
Expand Up @@ -300,8 +300,12 @@
"path": "/deployment/run-coordinator"
},
{
"title": "Run Monitoring - Handling Failures",
"title": "Run Monitoring - Detecting Failures",
"path": "/deployment/run-monitoring"
},
{
"title": "Run Retries",
"path": "/deployment/run-retries"
}
]
},
Expand Down
42 changes: 42 additions & 0 deletions docs/content/deployment/run-retries.mdx
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
---
title: Run retries | Dagster
description: Automatically retry Dagster runs
---

# Run Retries

If you configure retries at the [Job](/\_apidocs/jobs#jobs) level, a new run will be kicked off when a run for that job fails. Compared to [Op retries](/concepts/ops-jobs-graphs/op-retries), the max retry limit for run retries applies to the whole run instead of each individual Op. Run retries also handle the cases where a run worker crashed.

## Configuration

To enable run retries, add the following to your `dagster.yaml`. This will start a new daemon which polls to the event log for run failure events.

```yaml file=/deploying/dagster_instance/dagster.yaml startafter=start_run_retries endbefore=end_run_retries
run_retries:
enabled: true
max_retries: 3 # Sets a default for all jobs. 0 if not set
```

You can also configure retries using tags either on Job definitions or in the Dagit [Launchpad](/concepts/dagit/dagit#launchpad).

```python file=/deploying/job_retries.py
from dagster import job


@job(tags={"dagster/max_retries": 3})
def sample_job():
pass


@job(tags={"dagster/max_retries": 3, "dagster/retry_strategy": "ALL_STEPS"})
def other_sample_sample_job():
pass
```

### Retry Strategy

The `dagster/retry_strategy` tag controls which Ops the retry will run.

By default, retries will re-execute from failure (tag value `FROM_FAILURE`). This means that any successful Ops will be skipped, but their output will be used for downstream Ops. If the `dagster/retry_strategy` tag is set to `ALL_STEPS`, all the Ops will run again.

NOTE: `FROM_FAILURE` requires an IOManager that can access outputs from other runs. For example, on Kubernetes the [s3\_pickle_io_manager](/\_apidocs/libraries/dagster-aws#dagster_aws.s3.s3\_pickle_io_manager) would work but the [fs_io_manager](https://docs.dagster.io/\_apidocs/io-managers#dagster.fs_io_manager) would not, since the new run is in a new Kubernetes Job with a separate filesystem.
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,12 @@ run_monitoring:

# end_run_monitoring

# start_run_retries
run_retries:
enabled: true
max_retries: 3 # Sets a default for all jobs. 0 if not set
# end_run_retries


# start_marker_telemetry

Expand Down
11 changes: 11 additions & 0 deletions examples/docs_snippets/docs_snippets/deploying/job_retries.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
from dagster import job


@job(tags={"dagster/max_retries": 3})
def sample_job():
pass


@job(tags={"dagster/max_retries": 3, "dagster/retry_strategy": "ALL_STEPS"})
def other_sample_sample_job():
pass
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
"run_coordinator",
"run_launcher",
"run_monitoring",
"run_retries",
"storage",
"telemetry",
]
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import os

from dagster import DagsterInstance
from dagster.core.execution.plan.resume_retry import ReexecutionStrategy
from dagster.core.instance.ref import InstanceRef
from dagster.core.storage.tags import MAX_RETRIES_TAG, RETRY_STRATEGY_TAG
from docs_snippets.deploying.job_retries import other_sample_sample_job, sample_job


def test_tags():
assert sample_job.tags[MAX_RETRIES_TAG] == "3"
assert other_sample_sample_job.tags[MAX_RETRIES_TAG] == "3"
assert (
other_sample_sample_job.tags[RETRY_STRATEGY_TAG]
== ReexecutionStrategy.ALL_STEPS.value
)


def test_instance(docs_snippets_folder):
ref = InstanceRef.from_dir(
os.path.join(docs_snippets_folder, "deploying", "dagster_instance")
)

assert ref.settings["run_retries"]["enabled"] == True
assert ref.settings["run_retries"]["max_retries"] == 3

0 comments on commit 14696ae

Please sign in to comment.