diff --git a/docs/sphinx/sections/api/apidocs/libraries/dagster-k8s.rst b/docs/sphinx/sections/api/apidocs/libraries/dagster-k8s.rst
index feb1d65f0809f..1ae084228ecf6 100644
--- a/docs/sphinx/sections/api/apidocs/libraries/dagster-k8s.rst
+++ b/docs/sphinx/sections/api/apidocs/libraries/dagster-k8s.rst
@@ -17,6 +17,10 @@ APIs
 .. autoconfigurable:: k8s_job_executor
   :annotation: ExecutorDefinition
 
+Ops
+===
+
+.. autoconfigurable:: k8s_job_op
 
 Python API
 ^^^^^^^^^^
diff --git a/integration_tests/python_modules/dagster-k8s-test-infra/dagster_k8s_test_infra/helm.py b/integration_tests/python_modules/dagster-k8s-test-infra/dagster_k8s_test_infra/helm.py
index 93689cd989b94..4c7f214695c57 100644
--- a/integration_tests/python_modules/dagster-k8s-test-infra/dagster_k8s_test_infra/helm.py
+++ b/integration_tests/python_modules/dagster-k8s-test-infra/dagster_k8s_test_infra/helm.py
@@ -1,4 +1,4 @@
-# pylint: disable=print-call, redefined-outer-name
+# pylint: disable=print-call, redefined-outer-name, unused-argument
 import base64
 import json
 import os
@@ -69,7 +69,7 @@ def _create_namespace(should_cleanup, existing_helm_namespace=None, prefix="dags
 
 
 @pytest.fixture(scope="session")
-def namespace(pytestconfig, should_cleanup):
+def namespace(cluster_provider, pytestconfig, should_cleanup):
     """If an existing Helm chart namespace is specified via pytest CLI with the argument
     --existing-helm-namespace, we will use that chart.
 
@@ -84,7 +84,7 @@ def namespace(pytestconfig, should_cleanup):
 
 
 @pytest.fixture(scope="session")
-def run_monitoring_namespace(pytestconfig, should_cleanup):
+def run_monitoring_namespace(cluster_provider, pytestconfig, should_cleanup):
     """If an existing Helm chart namespace is specified via pytest CLI with the argument
     --existing-helm-namespace, we will use that chart.
 
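Note on the fixture change above: `cluster_provider` is never referenced inside `namespace` or `run_monitoring_namespace` (hence the new `unused-argument` disable); naming it as a parameter only forces pytest to build the kind cluster before either namespace fixture runs. A minimal sketch of that ordering pattern, with hypothetical fixture names:

```python
import pytest


@pytest.fixture(scope="session")
def cluster():
    # Stand-in for define_cluster_provider_fixture: expensive setup that
    # must exist before any namespace is created.
    yield "kind-cluster"


@pytest.fixture(scope="session")
def namespace(cluster):  # pylint: disable=unused-argument
    # Depending on `cluster` guarantees setup order (and reverse teardown
    # order), even though the value itself is unused here.
    yield "test-namespace"


def test_namespace_exists(namespace):
    assert namespace == "test-namespace"
```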
diff --git a/integration_tests/test_suites/k8s-test-suite/conftest.py b/integration_tests/test_suites/k8s-test-suite/conftest.py
index 23894c01a053a..339bd5d8ef1f4 100644
--- a/integration_tests/test_suites/k8s-test-suite/conftest.py
+++ b/integration_tests/test_suites/k8s-test-suite/conftest.py
@@ -37,7 +37,11 @@ def dagster_home():
 
 
 cluster_provider = define_cluster_provider_fixture(
-    additional_kind_images=["docker.io/bitnami/rabbitmq", "docker.io/bitnami/postgresql"]
+    additional_kind_images=[
+        "docker.io/busybox",
+        "docker.io/bitnami/rabbitmq",
+        "docker.io/bitnami/postgresql",
+    ]
 )
 
 
diff --git a/integration_tests/test_suites/k8s-test-suite/tests/test_k8s_job_op.py b/integration_tests/test_suites/k8s-test-suite/tests/test_k8s_job_op.py
new file mode 100644
index 0000000000000..8e87639e629ca
--- /dev/null
+++ b/integration_tests/test_suites/k8s-test-suite/tests/test_k8s_job_op.py
@@ -0,0 +1,83 @@
+import pytest
+from dagster_k8s import k8s_job_op
+from dagster_k8s.client import DagsterK8sError
+
+from dagster import job
+
+
+@pytest.mark.default
+def test_k8s_job_op(namespace, cluster_provider):
+    first_op = k8s_job_op.configured(
+        {
+            "image": "busybox",
+            "command": ["/bin/sh", "-c"],
+            "args": ["echo HI"],
+            "namespace": namespace,
+            "load_incluster_config": False,
+            "kubeconfig_file": cluster_provider.kubeconfig_file,
+        },
+        name="first_op",
+    )
+    second_op = k8s_job_op.configured(
+        {
+            "image": "busybox",
+            "command": ["/bin/sh", "-c"],
+            "args": ["echo GOODBYE"],
+            "namespace": namespace,
+            "load_incluster_config": False,
+            "kubeconfig_file": cluster_provider.kubeconfig_file,
+        },
+        name="second_op",
+    )
+
+    @job
+    def my_full_job():
+        second_op(first_op())
+
+    my_full_job.execute_in_process()
+
+
+@pytest.mark.default
+def test_k8s_job_op_with_timeout(namespace, cluster_provider):
+    timeout_op = k8s_job_op.configured(
+        {
+            "image": "busybox",
+            "command": ["/bin/sh", "-c"],
+            "args": ["sleep 15 && echo HI"],
+            "namespace": namespace,
+            "load_incluster_config": False,
+            "kubeconfig_file": cluster_provider.kubeconfig_file,
+            "timeout": 5,
+        },
+        name="timeout_op",
+    )
+
+    @job
+    def timeout_job():
+        timeout_op()
+
+    with pytest.raises(DagsterK8sError, match="Timed out while waiting for pod to become ready"):
+        timeout_job.execute_in_process()
+
+
+@pytest.mark.default
+def test_k8s_job_op_with_failure(namespace, cluster_provider):
+    failure_op = k8s_job_op.configured(
+        {
+            "image": "busybox",
+            "command": ["/bin/sh", "-c"],
+            "args": ["sleep 10 && exit 1"],
+            "namespace": namespace,
+            "load_incluster_config": False,
+            "kubeconfig_file": cluster_provider.kubeconfig_file,
+            "timeout": 5,
+        },
+        name="failure_op",
+    )
+
+    @job
+    def failure_job():
+        failure_op()
+
+    with pytest.raises(DagsterK8sError, match="Timed out while waiting for pod to become ready"):
+        failure_job.execute_in_process()
diff --git a/python_modules/libraries/dagster-k8s/dagster_k8s/__init__.py b/python_modules/libraries/dagster-k8s/dagster_k8s/__init__.py
index 4a68f5c73dc48..95d6428ec610c 100644
--- a/python_modules/libraries/dagster-k8s/dagster_k8s/__init__.py
+++ b/python_modules/libraries/dagster-k8s/dagster_k8s/__init__.py
@@ -3,6 +3,7 @@
 from .executor import k8s_job_executor
 from .job import DagsterK8sJobConfig, construct_dagster_k8s_job
 from .launcher import K8sRunLauncher
+from .ops.k8s_job_op import k8s_job_op
 from .version import __version__
 
 check_dagster_package_version("dagster-k8s", __version__)
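The tests above build several ops from the single `k8s_job_op` definition via `.configured(...)`, which returns a new op with that config baked in; each copy needs a distinct `name`. A rough sketch of that mechanism with an ordinary op (hypothetical example, not part of the diff):

```python
from dagster import job, op


@op(config_schema={"greeting": str})
def say(context):
    context.log.info(context.op_config["greeting"])


# Each configured copy is a separate op definition, so names must be unique.
hello_op = say.configured({"greeting": "HI"}, name="hello_op")
goodbye_op = say.configured({"greeting": "GOODBYE"}, name="goodbye_op")


@job
def greet():
    hello_op()
    goodbye_op()
```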
diff --git a/python_modules/libraries/dagster-k8s/dagster_k8s/client.py b/python_modules/libraries/dagster-k8s/dagster_k8s/client.py
index d1487b078dd6c..f993e02801db3 100644
--- a/python_modules/libraries/dagster-k8s/dagster_k8s/client.py
+++ b/python_modules/libraries/dagster-k8s/dagster_k8s/client.py
@@ -160,7 +160,7 @@ def wait_for_job(
             job_name (str): Name of the job to wait for.
             namespace (str): Namespace in which the job is located.
             wait_timeout (numeric, optional): Timeout after which to give up and raise exception.
-                Defaults to DEFAULT_WAIT_TIMEOUT.
+                Defaults to DEFAULT_WAIT_TIMEOUT. Set to 0 to disable.
             wait_time_between_attempts (numeric, optional): Wait time between polling attempts.
                 Defaults to DEFAULT_WAIT_BETWEEN_ATTEMPTS.
 
@@ -176,7 +176,7 @@ def wait_for_job(
         start = start_time or self.timer()
         while not job:
-            if self.timer() - start > wait_timeout:
+            if wait_timeout and (self.timer() - start > wait_timeout):
                 raise DagsterK8sTimeoutError(
                     "Timed out while waiting for job {job_name}"
                     " to launch".format(job_name=job_name)
                 )
@@ -222,7 +222,7 @@ def wait_for_job_success(
             job_name (str): Name of the job to wait for.
             namespace (str): Namespace in which the job is located.
             wait_timeout (numeric, optional): Timeout after which to give up and raise exception.
-                Defaults to DEFAULT_WAIT_TIMEOUT.
+                Defaults to DEFAULT_WAIT_TIMEOUT. Set to 0 to disable.
             wait_time_between_attempts (numeric, optional): Wait time between polling attempts.
                 Defaults to DEFAULT_WAIT_BETWEEN_ATTEMPTS.
 
@@ -248,10 +248,33 @@ def wait_for_job_success(
             start_time=start,
         )
 
+        self.wait_for_running_job_to_succeed(
+            job_name,
+            namespace,
+            instance,
+            run_id,
+            wait_timeout,
+            wait_time_between_attempts,
+            num_pods_to_wait_for,
+            start=start,
+        )
+
+    def wait_for_running_job_to_succeed(
+        self,
+        job_name,
+        namespace,
+        instance=None,
+        run_id=None,
+        wait_timeout=DEFAULT_WAIT_TIMEOUT,
+        wait_time_between_attempts=DEFAULT_WAIT_BETWEEN_ATTEMPTS,
+        num_pods_to_wait_for=DEFAULT_JOB_POD_COUNT,
+        start=None,
+    ):
+        start = start or self.timer()
         # Wait for the job status to be completed. We check the status every
         # wait_time_between_attempts seconds
         while True:
-            if self.timer() - start > wait_timeout:
+            if wait_timeout and (self.timer() - start > wait_timeout):
                 raise DagsterK8sTimeoutError(
                     "Timed out while waiting for job {job_name}"
                     " to complete".format(job_name=job_name)
                 )
@@ -393,7 +416,7 @@ def wait_for_pod(
             wait_for_state (WaitForPodState, optional): Whether to wait for pod readiness or
                 termination. Defaults to waiting for readiness.
             wait_timeout (numeric, optional): Timeout after which to give up and raise exception.
-                Defaults to DEFAULT_WAIT_TIMEOUT.
+                Defaults to DEFAULT_WAIT_TIMEOUT. Set to 0 to disable.
             wait_time_between_attempts (numeric, optional): Wait time between polling attempts.
                 Defaults to DEFAULT_WAIT_BETWEEN_ATTEMPTS.
 
@@ -417,7 +440,7 @@ def wait_for_pod(
                 ).items
             pod = pods[0] if pods else None
 
-            if self.timer() - start > wait_timeout:
+            if wait_timeout and (self.timer() - start > wait_timeout):
                 raise DagsterK8sError(
                     "Timed out while waiting for pod to become ready with pod info: %s" % str(pod)
                 )
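The recurring `wait_timeout and (...)` guard above is what makes a `wait_timeout` of `0` (or `None`) mean "never time out." The shape of the pattern, as a standalone sketch with placeholder names:

```python
import time


def poll_until(condition, wait_timeout=0, wait_time_between_attempts=1):
    # A falsy wait_timeout disables the deadline check entirely, mirroring
    # the guards added to wait_for_job, wait_for_running_job_to_succeed,
    # and wait_for_pod.
    start = time.time()
    while not condition():
        if wait_timeout and (time.time() - start > wait_timeout):
            raise TimeoutError("timed out while polling")
        time.sleep(wait_time_between_attempts)
    return True
```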
diff --git a/python_modules/libraries/dagster-k8s/dagster_k8s/job.py b/python_modules/libraries/dagster-k8s/dagster_k8s/job.py
index 6a79d21967614..e549d25d74ac0 100644
--- a/python_modules/libraries/dagster-k8s/dagster_k8s/job.py
+++ b/python_modules/libraries/dagster-k8s/dagster_k8s/job.py
@@ -279,15 +279,13 @@ def __new__(
         return super(DagsterK8sJobConfig, cls).__new__(
             cls,
             job_image=check.opt_str_param(job_image, "job_image"),
-            dagster_home=check.opt_str_param(
-                dagster_home, "dagster_home", default=DAGSTER_HOME_DEFAULT
-            ),
+            dagster_home=check.opt_str_param(dagster_home, "dagster_home"),
             image_pull_policy=check.opt_str_param(image_pull_policy, "image_pull_policy", "Always"),
             image_pull_secrets=check.opt_list_param(
                 image_pull_secrets, "image_pull_secrets", of_type=dict
             ),
             service_account_name=check.opt_str_param(service_account_name, "service_account_name"),
-            instance_config_map=check.str_param(instance_config_map, "instance_config_map"),
+            instance_config_map=check.opt_str_param(instance_config_map, "instance_config_map"),
             postgres_password_secret=check.opt_str_param(
                 postgres_password_secret, "postgres_password_secret"
             ),
@@ -532,6 +530,7 @@ def construct_dagster_k8s_job(
     component=None,
     labels=None,
     env_vars=None,
+    command=None,
 ):
     """Constructs a Kubernetes Job object for a dagster-graphql invocation.
 
@@ -593,7 +592,11 @@ def construct_dagster_k8s_job(
     additional_labels = {k: sanitize_k8s_label(v) for k, v in (labels or {}).items()}
     dagster_labels = merge_dicts(k8s_common_labels, additional_labels)
 
-    env = [{"name": "DAGSTER_HOME", "value": job_config.dagster_home}]
+    env = (
+        [{"name": "DAGSTER_HOME", "value": job_config.dagster_home}]
+        if job_config.dagster_home
+        else []
+    )
     if job_config.postgres_password_secret:
         env.append(
             {
@@ -638,6 +641,7 @@ def construct_dagster_k8s_job(
             {
                 "name": "dagster",
                 "image": job_image,
+                "command": command,
                 "args": args,
                 "image_pull_policy": job_config.image_pull_policy,
                 "env": env + job_config.env + additional_k8s_env_vars,
diff --git a/python_modules/libraries/dagster-k8s/dagster_k8s/models.py b/python_modules/libraries/dagster-k8s/dagster_k8s/models.py
index 6183b0ce50be0..af24354f453c2 100644
--- a/python_modules/libraries/dagster-k8s/dagster_k8s/models.py
+++ b/python_modules/libraries/dagster-k8s/dagster_k8s/models.py
@@ -35,6 +35,9 @@ def _get_openapi_dict_value_type(attr_type):
 
 
 def _k8s_parse_value(data, classname, attr_name):
+    if data is None:
+        return None
+
     if _is_openapi_list_type(classname):
         sub_kls = _get_openapi_list_element_type(classname)
         return [
diff --git a/python_modules/libraries/dagster-k8s/dagster_k8s/ops/__init__.py b/python_modules/libraries/dagster-k8s/dagster_k8s/ops/__init__.py
new file mode 100644
index 0000000000000..fc97427843b46
--- /dev/null
+++ b/python_modules/libraries/dagster-k8s/dagster_k8s/ops/__init__.py
@@ -0,0 +1 @@
+from .k8s_job_op import k8s_job_op
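The `command` parameter threaded through `construct_dagster_k8s_job` above maps onto the standard Kubernetes container fields: `command` overrides the image ENTRYPOINT, `args` overrides CMD, and leaving `command` as `None` keeps the image's own entrypoint. Illustrative values in a container spec (a sketch, not output from this code):

```python
import kubernetes

# Equivalent of {"command": ["/bin/sh", "-c"], "args": ["echo HI"]} in the
# op config: the shell replaces the image ENTRYPOINT and the script is CMD.
container = kubernetes.client.V1Container(
    name="dagster",
    image="busybox",
    command=["/bin/sh", "-c"],
    args=["echo HI"],
)
```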
diff --git a/python_modules/libraries/dagster-k8s/dagster_k8s/ops/k8s_job_op.py b/python_modules/libraries/dagster-k8s/dagster_k8s/ops/k8s_job_op.py
new file mode 100644
index 0000000000000..fa75446901da8
--- /dev/null
+++ b/python_modules/libraries/dagster-k8s/dagster_k8s/ops/k8s_job_op.py
@@ -0,0 +1,215 @@
+import time
+
+import kubernetes
+
+from dagster import Field, In, Noneable, Nothing, Permissive, StringSource, op
+from dagster.utils import merge_dicts
+from dagster.utils.backcompat import experimental
+
+from ..container_context import K8sContainerContext
+from ..job import (
+    DagsterK8sJobConfig,
+    UserDefinedDagsterK8sConfig,
+    construct_dagster_k8s_job,
+    get_k8s_job_name,
+)
+from ..launcher import K8sRunLauncher
+from ..utils import (
+    get_pod_names_in_job,
+    wait_for_job,
+    wait_for_pod,
+    wait_for_running_job_to_succeed,
+)
+
+
+@op(
+    ins={"start_after": In(Nothing)},
+    config_schema=merge_dicts(
+        DagsterK8sJobConfig.config_type_container(),
+        {
+            "image": Field(
+                StringSource,
+                is_required=True,
+                description="The image in which to launch the k8s job.",
+            ),
+            "command": Field(
+                [str],
+                is_required=False,
+                description="The command to run in the container within the launched k8s job.",
+            ),
+            "args": Field(
+                [str],
+                is_required=False,
+                description="The args for the command for the container.",
+            ),
+            "namespace": Field(StringSource, is_required=False),
+            "load_incluster_config": Field(
+                bool,
+                is_required=False,
+                default_value=True,
+                description="""Set this value if you are running the op within a k8s cluster. If
+            ``True``, we assume the op is running within the target cluster and load config using
+            ``kubernetes.config.load_incluster_config``. Otherwise, we will use the k8s config
+            specified in ``kubeconfig_file`` (using ``kubernetes.config.load_kube_config``) or
+            fall back to the default kubeconfig.""",
+            ),
+            "kubeconfig_file": Field(
+                Noneable(str),
+                is_required=False,
+                default_value=None,
+                description="The kubeconfig file from which to load config. Defaults to using the default kubeconfig.",
+            ),
+            "job_config": Field(
+                Permissive(),
+                is_required=False,
+                description="Raw k8s config for the v1Job. Keys can be either snake_case or camelCase.",
+            ),
+            "timeout": Field(
+                int,
+                is_required=False,
+                description="How long to wait for the job to succeed before raising an exception",
+            ),
+        },
+    ),
+)
+@experimental
+def k8s_job_op(context):
+    """
+    An op that runs a Kubernetes job using the k8s API.
+
+    Contrast with the `k8s_job_executor`, which runs each Dagster op in a Dagster job in its
+    own k8s job.
+
+    This op may be useful when:
+      - You need to orchestrate a command that isn't a Dagster op (or isn't written in Python)
+      - You want to run the rest of a Dagster job using a specific executor, and only a single
+        op in k8s.
+
+    For example:
+
+    .. literalinclude:: ../../../../../../python_modules/libraries/dagster-k8s/dagster_k8s_tests/unit_tests/test_example_k8s_job_op.py
+       :start-after: start_marker
+       :end-before: end_marker
+       :language: python
+
+    """
+
+    config = context.op_config
+
+    run_container_context = K8sContainerContext.create_for_run(
+        context.pipeline_run,
+        context.instance.run_launcher
+        if isinstance(context.instance.run_launcher, K8sRunLauncher)
+        else None,
+    )
+
+    op_container_context = K8sContainerContext(
+        image_pull_policy=config.get("image_pull_policy"),  # type: ignore
+        image_pull_secrets=config.get("image_pull_secrets"),  # type: ignore
+        service_account_name=config.get("service_account_name"),  # type: ignore
+        env_config_maps=config.get("env_config_maps"),  # type: ignore
+        env_secrets=config.get("env_secrets"),  # type: ignore
+        env_vars=config.get("env_vars"),  # type: ignore
+        volume_mounts=config.get("volume_mounts"),  # type: ignore
+        volumes=config.get("volumes"),  # type: ignore
+        labels=config.get("labels"),  # type: ignore
+        namespace=config.get("namespace"),  # type: ignore
+        resources=config.get("resources"),  # type: ignore
+    )
+
+    container_context = run_container_context.merge(op_container_context)
+
+    namespace = container_context.namespace
+
+    user_defined_k8s_config = UserDefinedDagsterK8sConfig(
+        job_config=config.get("job_config"),
+    )
+
+    k8s_job_config = DagsterK8sJobConfig(
+        job_image=config["image"],
+        dagster_home=None,
+        image_pull_policy=container_context.image_pull_policy,
+        image_pull_secrets=container_context.image_pull_secrets,
+        service_account_name=container_context.service_account_name,
+        instance_config_map=None,
+        postgres_password_secret=None,
+        env_config_maps=container_context.env_config_maps,
+        env_secrets=container_context.env_secrets,
+        env_vars=container_context.env_vars,
+        volume_mounts=container_context.volume_mounts,
+        volumes=container_context.volumes,
+        labels=container_context.labels,
+        resources=container_context.resources,
+    )
+
+    job_name = get_k8s_job_name(context.run_id, context.op.name)
+
+    job = construct_dagster_k8s_job(
+        job_config=k8s_job_config,
+        command=config.get("command"),
+        args=config.get("args"),
+        job_name=job_name,
+        pod_name=job_name,
+        component="k8s_job_op",
+        user_defined_k8s_config=user_defined_k8s_config,
+        labels={
+            "dagster/job": context.pipeline_run.pipeline_name,
+            "dagster/op": context.op.name,
+            "dagster/run-id": context.pipeline_run.run_id,
+        },
+    )
+
+    if config["load_incluster_config"]:
+        kubernetes.config.load_incluster_config()
+    else:
+        kubernetes.config.load_kube_config(config.get("kubeconfig_file"))
+
+    context.log.info(f"Creating Kubernetes job {job_name} in namespace {namespace}...")
+
+    start_time = time.time()
+
+    kubernetes.client.BatchV1Api().create_namespaced_job(namespace, job)
+
+    core_api = kubernetes.client.CoreV1Api()
+
+    context.log.info("Waiting for Kubernetes job to finish...")
+
+    timeout = config.get("timeout", 0)
+
+    wait_for_job(
+        job_name=job_name,
+        namespace=namespace,
+        wait_timeout=timeout,
+    )
+
+    pod_names = get_pod_names_in_job(job_name, namespace=namespace)
+
+    if not pod_names:
+        raise Exception("No pod names in job after it started")
+
+    pod_to_watch = pod_names[0]
+    watch = kubernetes.watch.Watch()
+
+    wait_for_pod(pod_to_watch, namespace, wait_timeout=timeout)
+
+    log_stream = watch.stream(
+        core_api.read_namespaced_pod_log, name=pod_to_watch, namespace=namespace
+    )
+
+    while True:
+        if timeout and time.time() - start_time > timeout:
+            watch.stop()
+            raise Exception("Timed out waiting for pod to finish")
+
+        try:
+            log_entry = next(log_stream)
+            print(log_entry)  # pylint: disable=print-call
+        except StopIteration:
+            break
+
+    wait_for_running_job_to_succeed(
+        job_name=job_name,
+        namespace=namespace,
+        wait_timeout=timeout,
+    )
diff --git a/python_modules/libraries/dagster-k8s/dagster_k8s/utils.py b/python_modules/libraries/dagster-k8s/dagster_k8s/utils.py
index 0c7b373382d1b..65ded2b10a352 100644
--- a/python_modules/libraries/dagster-k8s/dagster_k8s/utils.py
+++ b/python_modules/libraries/dagster-k8s/dagster_k8s/utils.py
@@ -66,6 +66,26 @@ def wait_for_job(
     )
 
 
+def wait_for_running_job_to_succeed(
+    job_name,
+    namespace,
+    instance=None,
+    run_id=None,
+    wait_timeout=DEFAULT_WAIT_TIMEOUT,
+    wait_time_between_attempts=DEFAULT_WAIT_BETWEEN_ATTEMPTS,
+    num_pods_to_wait_for=DEFAULT_JOB_POD_COUNT,
+):
+    return DagsterKubernetesClient.production_client().wait_for_running_job_to_succeed(
+        job_name,
+        namespace,
+        instance,
+        run_id,
+        wait_timeout,
+        wait_time_between_attempts,
+        num_pods_to_wait_for,
+    )
+
+
 def wait_for_pod(
     pod_name,
     namespace,
diff --git a/python_modules/libraries/dagster-k8s/dagster_k8s_tests/unit_tests/test_example_k8s_job_op.py b/python_modules/libraries/dagster-k8s/dagster_k8s_tests/unit_tests/test_example_k8s_job_op.py
new file mode 100644
index 0000000000000..bc0d4f3c0e90a
--- /dev/null
+++ b/python_modules/libraries/dagster-k8s/dagster_k8s_tests/unit_tests/test_example_k8s_job_op.py
@@ -0,0 +1,32 @@
+# isort: skip_file
+# fmt: off
+# start_marker
+from dagster import job
+from dagster_k8s import k8s_job_op
+
+first_op = k8s_job_op.configured(
+    {
+        "image": "busybox",
+        "command": ["/bin/sh", "-c"],
+        "args": ["echo HELLO"],
+    },
+    name="first_op",
+)
+second_op = k8s_job_op.configured(
+    {
+        "image": "busybox",
+        "command": ["/bin/sh", "-c"],
+        "args": ["echo GOODBYE"],
+    },
+    name="second_op",
+)
+
+@job
+def full_job():
+    second_op(first_op())
+# end_marker
+# fmt: on
+
+
+def test_k8s_job_op():
+    assert full_job