Rename step jobs 'dagster-step' in k8s and docker (#6982)
Post JOG, `dagster-job` is a confusing name for step jobs.
johannkm committed Mar 14, 2022
1 parent 1327a6d commit a88e658
Showing 7 changed files with 17 additions and 17 deletions.
@@ -172,11 +172,11 @@ If you do not use a Kubernetes distribution that supports the [TTL Controller](h

Delete dagster [Jobs](https://kubernetes.io/docs/concepts/workloads/controllers/job/) older than one day

-kubectl get job | grep -e dagster-run -e dagster-job | awk 'match($4,/[0-9]+d/) {print $1}' | xargs kubectl delete job
+kubectl get job | grep -e dagster-run -e dagster-step | awk 'match($4,/[0-9]+d/) {print $1}' | xargs kubectl delete job

Delete completed [Pods](https://kubernetes.io/docs/concepts/workloads/pods/) older than one day

-kubectl get pod | grep -e dagster-run -e dagster-job | awk 'match($3,/Completed/) {print $0}' | awk 'match($5,/[0-9]+d/) {print $1}' | xargs kubectl delete pod
+kubectl get pod | grep -e dagster-run -e dagster-step | awk 'match($3,/Completed/) {print $0}' | awk 'match($5,/[0-9]+d/) {print $1}' | xargs kubectl delete pod
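
As a rough illustration of what the job cleanup pipeline selects after the rename, here is a minimal Python sketch of the same filter. It assumes the default `kubectl get job` column layout (NAME, COMPLETIONS, DURATION, AGE); the function name `select_old_dagster_jobs` and both constants are hypothetical and are not part of Dagster or kubectl. Unlike `grep`, which matches the pattern anywhere in the line, this sketch only checks the start of the job name.

```python
import re
from typing import List

# Name prefixes for run workers and step jobs after this commit (assumed from the diff).
DAGSTER_JOB_PREFIXES = ("dagster-run", "dagster-step")

# Same pattern as the awk filter: an AGE field that contains a day count, e.g. "3d" or "2d4h".
AGE_IN_DAYS = re.compile(r"[0-9]+d")


def select_old_dagster_jobs(kubectl_output: str) -> List[str]:
    """Return names of Dagster jobs whose AGE column contains a day count."""
    names = []
    for line in kubectl_output.splitlines():
        fields = line.split()
        if len(fields) < 4:
            # Skip blank or malformed lines.
            continue
        name, age = fields[0], fields[3]
        if name.startswith(DAGSTER_JOB_PREFIXES) and AGE_IN_DAYS.search(age):
            names.append(name)
    return names
```

Jobs that still carry the old `dagster-job` prefix would not be selected by this filter, which is also true of the updated shell commands.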

## Conclusion

@@ -161,7 +161,7 @@ The Helm chart can be configured to use this architecture by configuring the `ru

Users can configure multiple Celery queues (for example, one celery queue for each resource the user would like to limit) and multiple Celery workers per queue via the `runLauncher.config.celeryK8sRunLauncher.workerQueues` section of `values.yaml`.

-The Celery workers poll for new Celery tasks and execute each task in order of receipt or priority. The Celery task largely consists of launching an ephemeral Kubernetes step job to execute that step.
+The Celery workers poll for new Celery tasks and execute each task in order of receipt or priority. The Celery task largely consists of launching an ephemeral Kubernetes step worker to execute that step.

### Daemon

@@ -173,9 +173,9 @@ The run worker is still responsible for traversing the execution plan, but now u

All jobs being executed on an instance that uses the `CeleryK8sRunLauncher` must have the `celery_k8s_job_executor` set in the `executor_def` field.

-### Step Job
+### Step Worker

-The step job is responsible for executing a single step, writing the structured events to the database. The Celery worker polls for the step job completion.
+The step worker is responsible for executing a single step, writing the structured events to the database. The Celery worker polls for the step worker completion.

### Flower

@@ -225,11 +225,11 @@ You can introspect the jobs that were launched with `kubectl`:

$ kubectl get jobs
 NAME                                               COMPLETIONS   DURATION   AGE
-dagster-job-9f5c92d1216f636e0d33877560818840       1/1           5s         12s
-dagster-job-a1063317b9aac91f42ca9eacec551b6f       1/1           12s        34s
+dagster-step-9f5c92d1216f636e0d33877560818840      1/1           5s         12s
+dagster-step-a1063317b9aac91f42ca9eacec551b6f      1/1           12s        34s
 dagster-run-fb6822e5-bf43-476f-9e6c-6f9896cf3fb8   1/1           37s        37s

-`dagster-job-` entries correspond to step jobs and `dagster-run-` entries correspond to run workers.
+`dagster-step-` entries correspond to step workers and `dagster-run-` entries correspond to run workers.

Within Dagit, you can watch the job as it executes.

@@ -169,7 +169,7 @@ def test_k8s_executor_combine_configs(
     )

     step_job_key = get_k8s_job_name(run_id, "count_letters")
-    step_job_name = f"dagster-job-{step_job_key}"
+    step_job_name = f"dagster-step-{step_job_key}"

     step_pods = get_pods_in_job(
         job_name=step_job_name, namespace=helm_namespace_for_k8s_run_launcher
@@ -360,11 +360,11 @@ def _execute_step_k8s_job(

     if retry_state.get_attempt_count(step_key):
         attempt_number = retry_state.get_attempt_count(step_key)
-        job_name = "dagster-job-%s-%d" % (k8s_name_key, attempt_number)
-        pod_name = "dagster-job-%s-%d" % (k8s_name_key, attempt_number)
+        job_name = "dagster-step-%s-%d" % (k8s_name_key, attempt_number)
+        pod_name = "dagster-step-%s-%d" % (k8s_name_key, attempt_number)
     else:
-        job_name = "dagster-job-%s" % (k8s_name_key)
-        pod_name = "dagster-job-%s" % (k8s_name_key)
+        job_name = "dagster-step-%s" % (k8s_name_key)
+        pod_name = "dagster-step-%s" % (k8s_name_key)

     args = execute_step_args.get_command_args()

@@ -128,7 +128,7 @@ def _get_client(self):
         return client

     def _get_container_name(self, run_id, step_key):
-        return f"dagster-job-{hash_str(run_id + step_key)}"
+        return f"dagster-step-{hash_str(run_id + step_key)}"

     def _create_step_container(self, client, step_image, execute_step_args):
         return client.containers.create(
4 changes: 2 additions & 2 deletions python_modules/libraries/dagster-k8s/dagster_k8s/executor.py
@@ -156,9 +156,9 @@ def _get_k8s_step_job_name(self, step_handler_context):
         if step_handler_context.execute_step_args.known_state:
             retry_state = step_handler_context.execute_step_args.known_state.get_retry_state()
             if retry_state.get_attempt_count(step_key):
-                return "dagster-job-%s-%d" % (name_key, retry_state.get_attempt_count(step_key))
+                return "dagster-step-%s-%d" % (name_key, retry_state.get_attempt_count(step_key))

-        return "dagster-job-%s" % (name_key)
+        return "dagster-step-%s" % (name_key)

     def launch_step(self, step_handler_context: StepHandlerContext):
         events = []
2 changes: 1 addition & 1 deletion python_modules/libraries/dagster-k8s/dagster_k8s/job.py
@@ -21,7 +21,7 @@
from .models import k8s_model_from_dict, k8s_snake_case_dict
from .utils import sanitize_k8s_label

-# To retry step job, users should raise RetryRequested() so that the dagster system is aware of the
+# To retry step worker, users should raise RetryRequested() so that the dagster system is aware of the
# retry. As an example, see retry_pipeline in dagster_test.test_project.test_pipelines.repo
# To override this config, user can specify UserDefinedDagsterK8sConfig.
DEFAULT_K8S_JOB_BACKOFF_LIMIT = 0
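
The naming scheme that results from this rename can be sketched as a standalone function. This is a simplified illustration, not the code in `executor.py`: `step_job_name` and its parameters are hypothetical, and the real methods derive the name key and attempt count from Dagster's step handler context and retry state.

```python
from typing import Optional


def step_job_name(name_key: str, attempt_count: Optional[int] = None) -> str:
    """Build a step job name using the `dagster-step` prefix introduced in this commit.

    Hypothetical helper: a retry (non-zero attempt count) appends the attempt
    number as a suffix, mirroring the branches shown in the diff.
    """
    if attempt_count:
        return "dagster-step-%s-%d" % (name_key, attempt_count)
    return "dagster-step-%s" % name_key


if __name__ == "__main__":
    print(step_job_name("9f5c92d1216f636e0d33877560818840"))
    print(step_job_name("9f5c92d1216f636e0d33877560818840", attempt_count=2))
```

Anything that matches on the old `dagster-job-` prefix, such as cleanup scripts or monitoring queries, would need the same update.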