Op that runs a Kubernetes job (#8161)
* WIP: Op that runs a Kubernetes job

Summary:
This introduces a more direct analogue to a 'pod that manages a k8s container', to handle cases where you want to stitch together a mixture of ops or orchestrate non-Python languages.

A nice thing is that we can still use container_context to pass config through to ops like this (so you can, for example, set up a default set of secrets to include in all ops even when you're not launching the run in k8s). We can also expose raw job_config on the op to give you access to the full k8s API for the job.
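
For example, an op like this can be configured and wired into a job directly (a sketch mirroring the integration tests added below; the image, namespace, and names are illustrative):

from dagster import job
from dagster_k8s import k8s_job_op

echo_op = k8s_job_op.configured(
    {
        "image": "busybox",
        "command": ["/bin/sh", "-c"],
        "args": ["echo HELLO"],
        "namespace": "default",
    },
    name="echo_op",
)

@job
def echo_k8s_job():
    echo_op()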

* Make wait_for_pod disable the timeout when it is set to 0

Co-authored-by: Johann Miller <johann@elementl.com>
2 people authored and prha committed Jul 6, 2022
1 parent bdca087 commit 823a5d8
Showing 12 changed files with 404 additions and 15 deletions.
4 changes: 4 additions & 0 deletions docs/sphinx/sections/api/apidocs/libraries/dagster-k8s.rst
@@ -17,6 +17,10 @@ APIs
.. autoconfigurable:: k8s_job_executor
:annotation: ExecutorDefinition

Ops
===

.. autoconfigurable:: k8s_job_op

Python API
^^^^^^^^^^
@@ -1,4 +1,4 @@
-# pylint: disable=print-call, redefined-outer-name
+# pylint: disable=print-call, redefined-outer-name, unused-argument
import base64
import json
import os
@@ -69,7 +69,7 @@ def _create_namespace(should_cleanup, existing_helm_namespace=None, prefix="dags


@pytest.fixture(scope="session")
-def namespace(pytestconfig, should_cleanup):
+def namespace(cluster_provider, pytestconfig, should_cleanup):
"""If an existing Helm chart namespace is specified via pytest CLI with the argument
--existing-helm-namespace, we will use that chart.
@@ -84,7 +84,7 @@ def namespace(pytestconfig, should_cleanup):


@pytest.fixture(scope="session")
-def run_monitoring_namespace(pytestconfig, should_cleanup):
+def run_monitoring_namespace(cluster_provider, pytestconfig, should_cleanup):
"""If an existing Helm chart namespace is specified via pytest CLI with the argument
--existing-helm-namespace, we will use that chart.
6 changes: 5 additions & 1 deletion integration_tests/test_suites/k8s-test-suite/conftest.py
@@ -37,7 +37,11 @@ def dagster_home():


cluster_provider = define_cluster_provider_fixture(
additional_kind_images=["docker.io/bitnami/rabbitmq", "docker.io/bitnami/postgresql"]
additional_kind_images=[
"docker.io/busybox",
"docker.io/bitnami/rabbitmq",
"docker.io/bitnami/postgresql",
]
)
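
(docker.io/busybox is preloaded into the kind cluster here so the new k8s_job_op integration tests can run the busybox image without pulling it at test time.)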


@@ -0,0 +1,83 @@
import pytest
from dagster_k8s import k8s_job_op
from dagster_k8s.client import DagsterK8sError

from dagster import job


@pytest.mark.default
def test_k8s_job_op(namespace, cluster_provider):
first_op = k8s_job_op.configured(
{
"image": "busybox",
"command": ["/bin/sh", "-c"],
"args": ["echo HI"],
"namespace": namespace,
"load_incluster_config": False,
"kubeconfig_file": cluster_provider.kubeconfig_file,
},
name="first_op",
)
second_op = k8s_job_op.configured(
{
"image": "busybox",
"command": ["/bin/sh", "-c"],
"args": ["echo GOODBYE"],
"namespace": namespace,
"load_incluster_config": False,
"kubeconfig_file": cluster_provider.kubeconfig_file,
},
name="second_op",
)

@job
def my_full_job():
second_op(first_op())

my_full_job.execute_in_process()


@pytest.mark.default
def test_k8s_job_op_with_timeout(namespace, cluster_provider):
timeout_op = k8s_job_op.configured(
{
"image": "busybox",
"command": ["/bin/sh", "-c"],
"args": ["sleep 15 && echo HI"],
"namespace": namespace,
"load_incluster_config": False,
"kubeconfig_file": cluster_provider.kubeconfig_file,
"timeout": 5,
},
name="timeout_op",
)

@job
def timeout_job():
timeout_op()

with pytest.raises(DagsterK8sError, match="Timed out while waiting for pod to become ready"):
timeout_job.execute_in_process()


@pytest.mark.default
def test_k8s_job_op_with_failure(namespace, cluster_provider):
failure_op = k8s_job_op.configured(
{
"image": "busybox",
"command": ["/bin/sh", "-c"],
"args": ["sleep 10 && exit 1"],
"namespace": namespace,
"load_incluster_config": False,
"kubeconfig_file": cluster_provider.kubeconfig_file,
"timeout": 5,
},
name="failure_op",
)

@job
def failure_job():
failure_op()
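
    # With timeout=5 and a 10-second sleep before the non-zero exit, the wait
    # times out before the container ever fails, so the timeout error is expected.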

with pytest.raises(DagsterK8sError, match="Timed out while waiting for pod to become ready"):
failure_job.execute_in_process()
@@ -3,6 +3,7 @@
from .executor import k8s_job_executor
from .job import DagsterK8sJobConfig, construct_dagster_k8s_job
from .launcher import K8sRunLauncher
from .ops.k8s_job_op import k8s_job_op
from .version import __version__

check_dagster_package_version("dagster-k8s", __version__)
34 changes: 28 additions & 6 deletions python_modules/libraries/dagster-k8s/dagster_k8s/client.py
@@ -160,7 +160,7 @@ def wait_for_job(
job_name (str): Name of the job to wait for.
namespace (str): Namespace in which the job is located.
wait_timeout (numeric, optional): Timeout after which to give up and raise exception.
-                Defaults to DEFAULT_WAIT_TIMEOUT.
+                Defaults to DEFAULT_WAIT_TIMEOUT. Set to 0 to disable.
wait_time_between_attempts (numeric, optional): Wait time between polling attempts. Defaults
to DEFAULT_WAIT_BETWEEN_ATTEMPTS.
@@ -176,7 +176,7 @@ def wait_for_job(
start = start_time or self.timer()

while not job:
-            if self.timer() - start > wait_timeout:
+            if wait_timeout and (self.timer() - start > wait_timeout):
raise DagsterK8sTimeoutError(
"Timed out while waiting for job {job_name}"
" to launch".format(job_name=job_name)
@@ -222,7 +222,7 @@ def wait_for_job_success(
job_name (str): Name of the job to wait for.
namespace (str): Namespace in which the job is located.
wait_timeout (numeric, optional): Timeout after which to give up and raise exception.
-                Defaults to DEFAULT_WAIT_TIMEOUT.
+                Defaults to DEFAULT_WAIT_TIMEOUT. Set to 0 to disable.
wait_time_between_attempts (numeric, optional): Wait time between polling attempts. Defaults
to DEFAULT_WAIT_BETWEEN_ATTEMPTS.
@@ -248,10 +248,32 @@ def wait_for_job_success(
start_time=start,
)

self.wait_for_running_job_to_succeed(
job_name,
namespace,
instance,
run_id,
wait_timeout,
wait_time_between_attempts,
num_pods_to_wait_for,
start=start,
)

def wait_for_running_job_to_succeed(
self,
job_name,
namespace,
instance=None,
run_id=None,
wait_timeout=DEFAULT_WAIT_TIMEOUT,
wait_time_between_attempts=DEFAULT_WAIT_BETWEEN_ATTEMPTS,
num_pods_to_wait_for=DEFAULT_JOB_POD_COUNT,
start=None,
):
# Wait for the job status to be completed. We check the status every
# wait_time_between_attempts seconds
while True:
-            if self.timer() - start > wait_timeout:
+            if wait_timeout and (self.timer() - start > wait_timeout):
raise DagsterK8sTimeoutError(
"Timed out while waiting for job {job_name}"
" to complete".format(job_name=job_name)
@@ -393,7 +415,7 @@ def wait_for_pod(
wait_for_state (WaitForPodState, optional): Whether to wait for pod readiness or
termination. Defaults to waiting for readiness.
wait_timeout (numeric, optional): Timeout after which to give up and raise exception.
-                Defaults to DEFAULT_WAIT_TIMEOUT.
+                Defaults to DEFAULT_WAIT_TIMEOUT. Set to 0 to disable.
wait_time_between_attempts (numeric, optional): Wait time between polling attempts. Defaults
to DEFAULT_WAIT_BETWEEN_ATTEMPTS.
@@ -417,7 +439,7 @@ def wait_for_pod(
).items
pod = pods[0] if pods else None

-            if self.timer() - start > wait_timeout:
+            if wait_timeout and self.timer() - start > wait_timeout:
raise DagsterK8sError(
"Timed out while waiting for pod to become ready with pod info: %s" % str(pod)
)
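
Because the new op's waits flow through wait_for_pod, configuring it with a timeout of 0 now means "wait indefinitely" rather than timing out immediately. A sketch (values illustrative; the mapping of the op's timeout config onto wait_timeout is assumed from this commit):

patient_op = k8s_job_op.configured(
    {
        "image": "busybox",
        "command": ["/bin/sh", "-c"],
        "args": ["sleep 60 && echo DONE"],
        "namespace": "default",
        "timeout": 0,  # 0 disables the wait timeout entirely
    },
    name="patient_op",
)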
14 changes: 9 additions & 5 deletions python_modules/libraries/dagster-k8s/dagster_k8s/job.py
@@ -279,15 +279,13 @@ def __new__(
return super(DagsterK8sJobConfig, cls).__new__(
cls,
job_image=check.opt_str_param(job_image, "job_image"),
-            dagster_home=check.opt_str_param(
-                dagster_home, "dagster_home", default=DAGSTER_HOME_DEFAULT
-            ),
+            dagster_home=check.opt_str_param(dagster_home, "dagster_home"),
image_pull_policy=check.opt_str_param(image_pull_policy, "image_pull_policy", "Always"),
image_pull_secrets=check.opt_list_param(
image_pull_secrets, "image_pull_secrets", of_type=dict
),
service_account_name=check.opt_str_param(service_account_name, "service_account_name"),
-            instance_config_map=check.str_param(instance_config_map, "instance_config_map"),
+            instance_config_map=check.opt_str_param(instance_config_map, "instance_config_map"),
postgres_password_secret=check.opt_str_param(
postgres_password_secret, "postgres_password_secret"
),
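
With dagster_home and instance_config_map now optional, a config object for a plain Kubernetes job needs no Dagster-instance wiring. A minimal sketch:

from dagster_k8s import DagsterK8sJobConfig

# Only the image is supplied; the instance-related fields stay None.
config = DagsterK8sJobConfig(job_image="busybox")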
@@ -532,6 +530,7 @@ def construct_dagster_k8s_job(
component=None,
labels=None,
env_vars=None,
command=None,
):
"""Constructs a Kubernetes Job object for a dagster-graphql invocation.
@@ -593,7 +592,7 @@
additional_labels = {k: sanitize_k8s_label(v) for k, v in (labels or {}).items()}
dagster_labels = merge_dicts(k8s_common_labels, additional_labels)

env = [{"name": "DAGSTER_HOME", "value": job_config.dagster_home}]
env = (
[{"name": "DAGSTER_HOME", "value": job_config.dagster_home}]
if job_config.dagster_home
else []
)
if job_config.postgres_password_secret:
env.append(
{
@@ -638,6 +641,7 @@
{
"name": "dagster",
"image": job_image,
"command": command,
"args": args,
"image_pull_policy": job_config.image_pull_policy,
"env": env + job_config.env + additional_k8s_env_vars,
3 changes: 3 additions & 0 deletions python_modules/libraries/dagster-k8s/dagster_k8s/models.py
@@ -35,6 +35,9 @@ def _get_openapi_dict_value_type(attr_type):


def _k8s_parse_value(data, classname, attr_name):
    if data is None:
        # k8s API objects can have unset (None) fields; pass them through untouched.
        return None

if _is_openapi_list_type(classname):
sub_kls = _get_openapi_list_element_type(classname)
return [
@@ -0,0 +1 @@
from .k8s_job_op import k8s_job_op
