Op that runs a job in Kubernetes #6201

Closed

RMHogervorst opened this issue Jan 13, 2022 · 9 comments

@RMHogervorst

Use Case

A preconfigured op that can create a job inside a Kubernetes cluster.
(This issue is to collect requirements and the expected interface for a k8s-job-step: an op or job that creates a Kubernetes job with a custom image NOT containing Dagster, but rather another application.)

Ideas of Implementation

Additional Info


Message from the maintainers:

Excited about this feature? Give it a 👍. We factor engagement into prioritization.

@RMHogervorst
Author

Some overlap with #6034.

@RMHogervorst
Author

@AndreaGiardini's example op:

import os
import time

from dagster import DagsterError, In, op


@op(
    ins={
        "el": In(str),
        "output_folder": In(str)
    }
)
def create_k8s_job(context, el, output_folder):

    from kubernetes import client, config

    if os.getenv('KUBERNETES_SERVICE_HOST'):
        # If I am running in Kubernetes, use the "in cluster" configuration
        config.load_incluster_config()
    else:
        config.load_kube_config()

    v1_batch = client.BatchV1Api()
    v1_core = client.CoreV1Api()
    # Create Job
    job = client.V1Job(
        api_version="batch/v1",
        kind="Job",
        metadata=client.V1ObjectMeta(generate_name="sen2cor-", namespace="dagster"),
        spec=client.V1JobSpec(
            ttl_seconds_after_finished=86400,
            template=client.V1PodTemplateSpec(
                metadata=client.V1ObjectMeta(labels={"app": "sen2cor"}),
                spec=client.V1PodSpec(
                    containers=[
                        client.V1Container(
                            name="sen2cor",
                            image="dymaxionlabs/sen2cor",
                            args=[
                                el,
                                "--resolution",
                                "10",
                                "--output_dir",
                                output_folder
                            ],
                            volume_mounts=[
                                client.V1VolumeMount(
                                    name="gfs-data",
                                    mount_path="/gfs"
                                )
                            ]
                        )
                    ],
                    restart_policy="Never",
                    volumes=[
                        client.V1Volume(
                            name="gfs-data",
                            persistent_volume_claim=client.V1PersistentVolumeClaimVolumeSource(
                                claim_name="dagster-shared-volume"
                            )
                        )
                    ]
                ),
            )
        )
    )

    # Create Job
    api_response = v1_batch.create_namespaced_job("dagster", job)
    context.log.info("'%s' - Job created." % str(api_response.metadata.name))

    while True:
        job_status = v1_batch.read_namespaced_job_status(name=api_response.metadata.name, namespace="dagster")

        pod_list = v1_core.list_namespaced_pod(namespace="dagster", label_selector="app=sen2cor").items
        pod_list = [pod for pod in pod_list if pod.metadata.name.startswith(api_response.metadata.name)]
        for pod in pod_list:
            # Forward available logs to dagster
            try:
                for line in v1_core.read_namespaced_pod_log(name=pod.metadata.name, namespace='dagster', since_seconds=5).splitlines():
                    context.log.info(pod.metadata.name + " - " + line)
            except Exception:
                # pod logs may not be available yet
                pass

        if job_status.status.succeeded:
            context.log.info("'%s' - Job completed." % api_response.metadata.name)
            v1_batch.delete_namespaced_job(name=api_response.metadata.name, namespace="dagster")
            break
        elif job_status.status.failed:
            context.log.error("'%s' - Job failed." % api_response.metadata.name)
            v1_batch.delete_namespaced_job(name=api_response.metadata.name, namespace="dagster")
            raise DagsterError("Job failed")
        time.sleep(5)
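
For illustration, a minimal sketch of how this op could be wired into a job; the upstream ops and their return values are hypothetical placeholders:

from dagster import job, op

@op
def scene_to_process() -> str:
    # hypothetical upstream op producing the element passed as `el`
    return "S2B_MSIL1C_example.SAFE"

@op
def target_folder() -> str:
    # hypothetical output folder on the shared volume
    return "/gfs/output"

@job
def sen2cor_pipeline():
    create_k8s_job(el=scene_to_process(), output_folder=target_folder())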

@RMHogervorst
Author

@RMHogervorst's version of the op and job (note that using ins like this is not very nice; it would be better to create the config inside the op and keep the surface cleaner; see the config_schema sketch after the example):

import os
import time
from typing import List

from dagster import In, op, DagsterError
from kubernetes import client, config


@op(
    ins={
        "jobname": In(str, description="becomes the name of the container and the first part of the job name"),
        "image_name": In(str, description="For instance …"),
        "cmdlineargs": In(List[str], description="The commands that need to run, e.g. ['python runthis.py']"),
        "max_cpu": In(str, default_value="750m"),
        "max_mem": In(str, default_value="256Mi"),
        "env": In(default_value=None, description="use client.V1EnvVar(name='', value='')"),
        "env_from": In(default_value=None, description="use client.V1EnvFromSource(config_map_ref, secret_ref)"),
    }
)
def create_k8s_job(context, jobname, image_name, cmdlineargs, max_cpu, max_mem, env, env_from):
    """Create and run a kubernetes job.

    For this operator to work we need: A name, a link to the image location and commands that 
    run the container. This operator will create the job on the cluster and
    stream the logs to dagster logging.
    You are not able to add volume mounts to this job.

    For more info about jobs see [kubernetes job docs](https://kubernetes.io/docs/concepts/workloads/controllers/job/)
    For more (sparse) info about the python library see [the library docs](https://github.com/kubernetes-client/python/blob/master/kubernetes/README.md)
    """
    if os.getenv('KUBERNETES_SERVICE_HOST'):
        context.log.debug("execution happening in kubernetes, so using `load_incluster_config`")
        config.load_incluster_config()
    else:
        context.log.debug("execution happening outside cluster, using `load_kube_config()`")
        config.load_kube_config()

    # Since the bucket and cluster namespace are the same in our case
    # we can use the bucketname as namespace variable too
    namespace = os.getenv('AWS_BUCKET','no-bucket')
    v1_batch = client.BatchV1Api()
    v1_core = client.CoreV1Api()
    # create jobdefinition
    job = client.V1Job(
        api_version="batch/v1",
        kind="Job",
        metadata=client.V1ObjectMeta(generate_name=f"{jobname}-", namespace=namespace),
        spec=client.V1JobSpec(
            # time to live (ttl) is 24 hours, but we delete it earlier when everything is alright
            ttl_seconds_after_finished=86400, 
            template=client.V1PodTemplateSpec(
                metadata=client.V1ObjectMeta(
                    annotations={"job.generated-by":"create_k8s_job"},
                    labels={
                    "app": jobname,
                    "app.kubernetes.io/managed-by":"dagster"
                    }),
                spec=client.V1PodSpec(
                    containers=[
                        client.V1Container(
                            name=jobname,
                            image=image_name,
                            command=["/bin/sh", "-c"],
                            args=cmdlineargs,
                            resources=client.V1ResourceRequirements(
                                limits={"memory":max_mem, "cpu": max_cpu}, 
                                requests={"memory": "256Mi","cpu": "50m"}
                            ),
                            env=env,
                            env_from=env_from
                        )
                    ],
                    
                    restart_policy="Never",
                    service_account="default-editor"
                ),
            )
        )
    )
    # submit job to cluster
    api_response = v1_batch.create_namespaced_job(namespace, job)
    context.log.info("'%s' - k8sJob created." % str(api_response.metadata.name))
    # TODO: kill the job when not ready
    # poll and stream logs
    while True:
        job_status = v1_batch.read_namespaced_job_status(name=api_response.metadata.name, namespace=namespace)
        pod_list = v1_core.list_namespaced_pod(namespace=namespace, label_selector=f"app={jobname}").items
        context.log.debug(f'There are {len(pod_list)} pods with label "app={jobname}"')
        pod_list = [pod for pod in pod_list if pod.metadata.name.startswith(api_response.metadata.name)]
        context.log.debug(f'After filtering there are {len(pod_list)} pods belonging to this job')
        for pod in pod_list:
            # Forward available logs to dagster
            try:
                for line in v1_core.read_namespaced_pod_log(name=pod.metadata.name, namespace=namespace, since_seconds=5, container = jobname).splitlines():
                    context.log.info(pod.metadata.name + " - " + line)
                if job_status.status.conditions:
                    context.log.debug(pod.metadata.name + " - " + str(job_status.status.conditions))
            except Exception:
                # pod logs may not be available yet
                pass

        if job_status.status.succeeded:
            context.log.info("'%s' - k8sJob completed." % api_response.metadata.name)
            v1_batch.delete_namespaced_job(name=api_response.metadata.name, namespace=namespace)
            break
        elif job_status.status.failed:
            context.log.error("'%s' - k8sJob failed." % api_response.metadata.name)
            v1_batch.delete_namespaced_job(name=api_response.metadata.name, namespace=namespace)
            raise DagsterError("k8sJob failed")
        time.sleep(5) 

from dagster import job
from ops.kubernetes import create_k8s_job

@job(config={"ops": {"create_k8s_job": {"inputs": {
    "jobname": "aname",
    "image_name": "server/group/images/image:version",
    "cmdlineargs": ["sleep 3600 & echo 'job done'"],
}}}})
def run_k8s_job():
    create_k8s_job()
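
To illustrate the note above about keeping the op surface cleaner, a minimal sketch (assumed, not an official Dagster API for this) of moving these parameters from ins into op config via config_schema and reading them from context.op_config; the Kubernetes job-creation logic would stay the same:

from dagster import Field, job, op

@op(
    config_schema={
        "jobname": Field(str, description="name of the container and first part of the job name"),
        "image_name": Field(str),
        "cmdlineargs": Field([str], description="commands to run, e.g. ['python runthis.py']"),
        "max_cpu": Field(str, default_value="750m"),
        "max_mem": Field(str, default_value="256Mi"),
    }
)
def create_k8s_job_from_config(context):
    cfg = context.op_config
    context.log.info("would launch image %s as job %s" % (cfg["image_name"], cfg["jobname"]))
    # ... same Kubernetes job creation and log polling as above, using cfg[...] instead of inputs ...

@job
def run_k8s_job_from_config():
    create_k8s_job_from_config()

Run config would then be supplied at launch time under ops.create_k8s_job_from_config.config instead of as inputs.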

@simonvanderveldt
Contributor

Did you also consider running Pods instead of Jobs? (I couldn't find an issue for a Pod step so figured I'd ask here).

@RMHogervorst
Author

I did not, because I use Dagster to run batch jobs. That is a Job definition which includes a Pod. I'm not even sure you can run a Pod alone; I thought you always had to wrap it in a Deployment, Job, or StatefulSet. And I don't think I want Dagster to start and stop Deployments. But my use case is not your use case, so how would you want to use a Pod?

@gibsondan self-assigned this Jun 2, 2022
@gibsondan
Member

Starting to take a look at what an 'official' Dagster op for this could look like (heavily inspired by the examples here, thank you!) in https://github.com/dagster-io/dagster/pull/8161/files

@kervel

kervel commented Jun 2, 2022

Hello, the example above streams the pod logs to Dagster, allowing one to follow the logs in real time. I guess it's racy since it uses since_seconds; I guess the proper way to stream pod logs is to pass follow=True to read_namespaced_pod_log.

I think there would be an advantage over only reading the logs on pod completion, like in the PR.
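
A minimal sketch of that approach, using the client's Watch helper around read_namespaced_pod_log so lines are yielded as they arrive instead of being re-polled with since_seconds (pod name and namespace are placeholders; this assumes the pod's container has already started):

from kubernetes import client, config, watch

config.load_kube_config()  # or config.load_incluster_config() inside the cluster
v1_core = client.CoreV1Api()

# follow the log stream until the container exits
for line in watch.Watch().stream(
    v1_core.read_namespaced_pod_log,
    name="my-pod-name",      # placeholder pod name
    namespace="dagster",
):
    print(line)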

@gibsondan changed the title from "k8s-job-step requirements and expected interface" to "Op that runs a job in Kubernetes" on Jun 2, 2022
@gibsondan
Member

Good call - making it streaming makes sense, I'll add that (but I'd like it to stream to the Dagster stdout/stderr compute logs rather than context.log by default, since currently context.log writes to the event log DB and for some pods that could be pretty high-volume)
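
For reference, a minimal, purely illustrative sketch of what that would mean inside the op: since Dagster captures a step's stdout/stderr as compute logs, forwarded pod log lines can be written to sys.stdout rather than context.log so they stay out of the event log database (this is not the implementation in the linked PR):

import sys

def forward_pod_log_line(pod_name: str, line: str) -> None:
    # ends up in the captured stdout compute log, not the event log DB
    sys.stdout.write(f"{pod_name} - {line}\n")
    sys.stdout.flush()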

@AndreaGiardini
Contributor

@gibsondan Should we close this issue now that #8161 has been merged?
