Skip to content

Commit

Permalink
Include retry count in the job created by k8s_job_op (#11753)
Browse files Browse the repository at this point in the history
Summary:
Right now if you have an op with a retry policy that calls
execute_k8s_job, it will fail because it tries to create teh same job
twice. Include the retry count in the job name so that it doesn't run
into duplicate namespace issues.

Separately, we should change these cryptic names to be less cryptic (but
not as part of this PR).

Test Plan: BK

### Summary & Motivation

### How I Tested These Changes
  • Loading branch information
gibsondan committed Jan 18, 2023
1 parent a4b4295 commit 9f98587
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 1 deletion.
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import kubernetes
import pytest
from dagster import job, op
from dagster import RetryRequested, job, op
from dagster_k8s import execute_k8s_job, k8s_job_op
from dagster_k8s.client import DagsterK8sError, DagsterKubernetesClient
from dagster_k8s.job import get_k8s_job_name
Expand Down Expand Up @@ -215,3 +215,31 @@ def with_config_job():
job_name = get_k8s_job_name(run_id, with_container_config.name)

assert "OVERRIDES_CONTAINER_CONFIG" in _get_pod_logs(cluster_provider, job_name, namespace)


@pytest.mark.default
def test_k8s_job_op_retries(namespace, cluster_provider):
@op
def fails_sometimes(context):
execute_k8s_job(
context,
image="busybox",
command=["/bin/sh", "-c"],
args=[f"echo HERE IS RETRY NUMBER {context.retry_number}"],
namespace=namespace,
load_incluster_config=False,
kubeconfig_file=cluster_provider.kubeconfig_file,
)
if context.retry_number == 0:
raise RetryRequested(max_retries=1, seconds_to_wait=1)

@job
def fails_sometimes_job():
fails_sometimes()

execute_result = fails_sometimes_job.execute_in_process()
run_id = execute_result.dagster_run.run_id
job_name = get_k8s_job_name(run_id, fails_sometimes.name)

assert "HERE IS RETRY NUMBER 0" in _get_pod_logs(cluster_provider, job_name, namespace)
assert "HERE IS RETRY NUMBER 1" in _get_pod_logs(cluster_provider, job_name + "-1", namespace)
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,10 @@ def execute_k8s_job(

job_name = get_k8s_job_name(context.run_id, context.op.name)

retry_number = context.retry_number
if retry_number > 0:
job_name = f"{job_name}-{retry_number}"

job = construct_dagster_k8s_job(
job_config=k8s_job_config,
args=args,
Expand Down

0 comments on commit 9f98587

Please sign in to comment.