K8s check run health test (#7070)
johannkm committed Mar 15, 2022
1 parent 75b48ea commit ee146ef
Showing 5 changed files with 68 additions and 3 deletions.
4 changes: 3 additions & 1 deletion docs/content/concepts/ops-jobs-graphs/ops.mdx
@@ -80,7 +80,9 @@ An op only starts to execute once all of its inputs have been resolved. Inputs c
You can use a [Dagster Type](/concepts/types) to provide a function that validates an op's input every time the op runs. In this case, you use a dictionary of <PyObject object="In" pluralize /> corresponding to the decorated function arguments.

```python file=/concepts/solids_pipelines/solids.py startafter=start_typed_input_op_marker endbefore=end_typed_input_op_marker
MyDagsterType = DagsterType(type_check_fn=lambda _, value: value % 2 == 0, name="MyDagsterType")
MyDagsterType = DagsterType(
    type_check_fn=lambda _, value: value % 2 == 0, name="MyDagsterType"
)


@op(ins={"abc": In(dagster_type=MyDagsterType)})
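
# A minimal runnable sketch (not part of this commit) of how the type check above
# behaves at run time. MyDagsterType and the typed op mirror the snippet; the
# emit_even op and even_job names are illustrative additions.
from dagster import DagsterType, In, job, op


MyDagsterType = DagsterType(
    type_check_fn=lambda _, value: value % 2 == 0, name="MyDagsterType"
)


@op(ins={"abc": In(dagster_type=MyDagsterType)})
def my_typed_input_op(abc):
    return abc


@op
def emit_even():
    return 4


@job
def even_job():
    my_typed_input_op(emit_even())


# An even value passes the lambda check; an odd value would fail the run with a
# DagsterTypeCheckDidNotPass error before my_typed_input_op executes.
assert even_job.execute_in_process().success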
@@ -51,7 +51,9 @@ def my_input_op(abc, xyz):

# start_typed_input_op_marker

MyDagsterType = DagsterType(type_check_fn=lambda _, value: value % 2 == 0, name="MyDagsterType")
MyDagsterType = DagsterType(
    type_check_fn=lambda _, value: value % 2 == 0, name="MyDagsterType"
)


@op(ins={"abc": In(dagster_type=MyDagsterType)})
@@ -5,9 +5,9 @@
    my_input_op,
    my_multi_output_op,
    my_op,
    my_op_factory,
    my_output_op,
    my_typed_input_op,
    my_op_factory,
)


2 changes: 2 additions & 0 deletions python_modules/libraries/dagster-k8s/dagster_k8s/launcher.py
@@ -397,4 +397,6 @@ def check_run_worker_health(self, run: PipelineRun):
            )
        if job.status.failed:
            return CheckRunHealthResult(WorkerStatus.FAILED, "K8s job failed")
        if job.status.succeeded:
            return CheckRunHealthResult(WorkerStatus.SUCCESS)
        return CheckRunHealthResult(WorkerStatus.RUNNING)
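
With this change, `check_run_worker_health` reports success as well as failure, so a caller can distinguish all three worker states. A rough sketch of how a monitor might poll it (the `wait_for_run_worker` helper and its loop are illustrative, not part of the launcher API):

```python
import time

from dagster.core.launcher.base import WorkerStatus


def wait_for_run_worker(launcher, run, poll_interval=5.0):
    """Poll the run launcher until the run worker's K8s job succeeds or fails."""
    while True:
        health = launcher.check_run_worker_health(run)
        if health.status == WorkerStatus.SUCCESS:
            return health
        if health.status == WorkerStatus.FAILED:
            raise Exception(f"Run worker failed: {health}")
        # WorkerStatus.RUNNING: the job has neither succeeded nor failed yet.
        time.sleep(poll_interval)
```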
@@ -3,10 +3,13 @@

from dagster_k8s import K8sRunLauncher
from dagster_k8s.job import DAGSTER_PG_PASSWORD_ENV_VAR, UserDefinedDagsterK8sConfig
from kubernetes.client.models.v1_job import V1Job
from kubernetes.client.models.v1_job_status import V1JobStatus

from dagster import pipeline, reconstructable
from dagster.core.host_representation import RepositoryHandle
from dagster.core.launcher import LaunchRunContext
from dagster.core.launcher.base import WorkerStatus
from dagster.core.storage.tags import DOCKER_IMAGE_TAG
from dagster.core.test_utils import (
    create_run_for_test,
@@ -262,3 +265,59 @@ def test_no_postgres(kubeconfig_file):
@pipeline
def fake_pipeline():
    pass


def test_check_run_health(kubeconfig_file):

    labels = {"foo_label_key": "bar_label_value"}

    # Construct a K8s run launcher in a fake k8s environment.
    mock_k8s_client_batch_api = mock.Mock(spec_set=["read_namespaced_job"])
    mock_k8s_client_batch_api.read_namespaced_job.side_effect = [
        V1Job(status=V1JobStatus(failed=0, succeeded=0)),
        V1Job(status=V1JobStatus(failed=0, succeeded=1)),
        V1Job(status=V1JobStatus(failed=1, succeeded=0)),
    ]
    k8s_run_launcher = K8sRunLauncher(
        service_account_name="dagit-admin",
        instance_config_map="dagster-instance",
        postgres_password_secret="dagster-postgresql-secret",
        dagster_home="/opt/dagster/dagster_home",
        job_image="fake_job_image",
        load_incluster_config=False,
        kubeconfig_file=kubeconfig_file,
        k8s_client_batch_api=mock_k8s_client_batch_api,
        labels=labels,
    )

    # Create fake external pipeline.
    recon_pipeline = reconstructable(fake_pipeline)
    recon_repo = recon_pipeline.repository
    repo_def = recon_repo.get_definition()
    with instance_for_test() as instance:
        with in_process_test_workspace(instance, recon_repo) as workspace:
            location = workspace.get_repository_location(workspace.repository_location_names[0])
            repo_handle = RepositoryHandle(
                repository_name=repo_def.name,
                repository_location=location,
            )
            fake_external_pipeline = external_pipeline_from_recon_pipeline(
                recon_pipeline,
                solid_selection=None,
                repository_handle=repo_handle,
            )

            # Launch the run in a fake Dagster instance.
            pipeline_name = "demo_pipeline"
            run = create_run_for_test(
                instance,
                pipeline_name=pipeline_name,
                external_pipeline_origin=fake_external_pipeline.get_external_origin(),
                pipeline_code_origin=fake_external_pipeline.get_python_origin(),
            )
            k8s_run_launcher.register_instance(instance)

            # same order as side effects
            assert k8s_run_launcher.check_run_worker_health(run).status == WorkerStatus.RUNNING
            assert k8s_run_launcher.check_run_worker_health(run).status == WorkerStatus.SUCCESS
            assert k8s_run_launcher.check_run_worker_health(run).status == WorkerStatus.FAILED
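
The mock setup above relies on standard `unittest.mock` behavior: assigning a list to `side_effect` makes successive calls return successive elements, which is what lets the three assertions observe RUNNING, SUCCESS, and FAILED in turn. A tiny standalone illustration (placeholder values):

```python
from unittest import mock

# spec_set restricts the mock to the one attribute the launcher actually calls.
api = mock.Mock(spec_set=["read_namespaced_job"])
api.read_namespaced_job.side_effect = ["first", "second"]

assert api.read_namespaced_job("job-1", namespace="ns") == "first"
assert api.read_namespaced_job("job-2", namespace="ns") == "second"
# A third call would raise StopIteration once the side_effect list is exhausted.
```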
