class Agent:
    """
    Base class for Agents.

    This Agent class is a standard point for executing Flows in Prefect Cloud. It is
    meant to have subclasses which inherit functionality from this class. The only
    piece that the subclasses should implement is the `deploy_flows` function, which
    specifies how to run a Flow on the given platform. It is built in this way to keep
    Prefect Cloud logic standard but allows for platform specific customizability.

    In order for this to operate `PREFECT__CLOUD__AGENT__AUTH_TOKEN` must be set as an
    environment variable or in your user configuration file.
    """

    def __init__(self) -> None:
        self.loop_interval = config.cloud.agent.get("loop_interval")

        self.client = Client(token=config.cloud.agent.get("auth_token"))

        self.logger = self._configure_logger("agent")

    @staticmethod
    def _configure_logger(name: str) -> logging.Logger:
        """
        Return the named logger, configured for DEBUG output to a stream handler.

        `logging.getLogger` hands back the same object for the same name, so the
        handler is only attached when the logger has none yet; otherwise every new
        agent instance would add another handler and duplicate each log line.

        Args:
            - name (str): The name of the logger to configure

        Returns:
            - logging.Logger: The configured logger
        """
        logger = logging.getLogger(name)
        logger.setLevel(logging.DEBUG)

        if not logger.handlers:
            handler = logging.StreamHandler()
            handler.setLevel(logging.DEBUG)
            handler.setFormatter(
                logging.Formatter(
                    "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
                )
            )
            logger.addHandler(handler)

        return logger

    def start(self) -> None:
        """
        The main entrypoint to the agent. This function loops and constantly polls for
        new flow runs to deploy
        """
        self.logger.info("Starting {}".format(type(self).__name__))
        tenant_id = self.query_tenant_id()

        while True:
            try:
                flow_runs = self.query_flow_runs(tenant_id=tenant_id)
                self.logger.info(
                    "Found {} flow run(s) to submit for execution.".format(
                        len(flow_runs)
                    )
                )

                self.update_states(flow_runs)
                self.deploy_flows(flow_runs)
                self.logger.info(
                    "Submitted {} flow run(s) for execution.".format(len(flow_runs))
                )
            except Exception as exc:
                # Keep polling after a failure, but record the traceback so the
                # failure can be diagnosed from the agent's output.
                self.logger.exception(exc)
            time.sleep(self.loop_interval)

    def query_tenant_id(self) -> str:
        """
        Query Prefect Cloud for the tenant id that corresponds to the agent's auth token

        Returns:
            - str: The current tenant id
        """
        query = {"query": {"tenant": {"id"}}}
        result = self.client.graphql(query)
        return result.data.tenant[0].id  # type: ignore

    def query_flow_runs(self, tenant_id: str) -> list:
        """
        Query Prefect Cloud for flow runs which need to be deployed and executed

        Args:
            - tenant_id (str): The tenant id to use in the query

        Returns:
            - list: A list of GraphQLResult flow run objects; empty when the queue
                returned no flow run ids
        """

        # Get scheduled flow runs from queue
        mutation = {
            "mutation($input: getRunsInQueueInput!)": {
                "getRunsInQueue(input: $input)": {"flow_run_ids"}
            }
        }

        result = self.client.graphql(
            mutation, variables={"input": {"tenantId": tenant_id}}
        )
        flow_run_ids = result.data.getRunsInQueue.flow_run_ids  # type: ignore

        # Nothing queued: the metadata query filters on `id in flow_run_ids`, so it
        # could only come back empty. Skip the extra round trip.
        if not flow_run_ids:
            return []

        # Task runs must have been due to start at least 3 seconds ago
        cutoff = str(pendulum.now("UTC").subtract(seconds=3))

        # Query metadata for flow runs found in queue
        query = {
            "query": {
                with_args(
                    "flow_run",
                    {
                        # match flow runs in the flow_run_ids list
                        "where": {
                            "id": {"_in": flow_run_ids},
                            "_or": [
                                # who are EITHER scheduled...
                                {"state": {"_eq": "Scheduled"}},
                                # OR running with task runs scheduled to start more
                                # than 3 seconds ago
                                {
                                    "state": {"_eq": "Running"},
                                    "task_runs": {
                                        "state_start_time": {"_lte": cutoff}
                                    },
                                },
                            ],
                        }
                    },
                ): {
                    "id": True,
                    "version": True,
                    "tenant_id": True,
                    "state": True,
                    "serialized_state": True,
                    "parameters": True,
                    "flow": {"id", "name", "environment", "storage"},
                    with_args(
                        "task_runs", {"where": {"state_start_time": {"_lte": cutoff}}}
                    ): {"id", "version", "task_id", "serialized_state"},
                }
            }
        }

        result = self.client.graphql(query)
        return result.data.flow_run  # type: ignore

    def update_states(self, flow_runs: list) -> None:
        """
        After a flow run is grabbed this function sets the state to Submitted so it
        won't be picked up by any other processes

        Args:
            - flow_runs (list): A list of GraphQLResult flow run objects
        """
        for flow_run in flow_runs:

            # Set flow run state to `Submitted` if it is currently `Scheduled`
            flow_run_state = state.StateSchema().load(flow_run.serialized_state)
            if flow_run_state.is_scheduled():
                self.client.set_flow_run_state(
                    flow_run_id=flow_run.id,
                    version=flow_run.version,
                    state=Submitted(
                        message="Submitted for execution", state=flow_run_state
                    ),
                )

            # Set task run states to `Submitted` if they are currently `Scheduled`
            for task_run in flow_run.task_runs:
                task_run_state = state.StateSchema().load(task_run.serialized_state)
                if task_run_state.is_scheduled():
                    self.client.set_task_run_state(
                        task_run_id=task_run.id,
                        version=task_run.version,
                        state=Submitted(
                            message="Submitted for execution", state=task_run_state
                        ),
                    )

    def deploy_flows(self, flow_runs: list) -> None:
        """
        Meant to be overridden by a platform specific deployment option

        Args:
            - flow_runs (list): A list of GraphQLResult flow run objects
        """
        pass


if __name__ == "__main__":
    Agent().start()
class KubernetesAgent(Agent):
    """
    Agent which deploys flow runs as Kubernetes jobs. Currently this is required to either
    run on a k8s cluster or on a local machine where the kube_config is pointing at the
    desired cluster.
    """

    def __init__(self) -> None:
        super().__init__()

        from kubernetes import client, config

        try:
            config.load_incluster_config()
        except config.config_exception.ConfigException as exc:
            self.logger.warning(
                "{} Using out of cluster configuration option.".format(exc)
            )
            config.load_kube_config()

        self.batch_client = client.BatchV1Api()

    def deploy_flows(self, flow_runs: list) -> None:
        """
        Deploy flow runs on to a k8s cluster as jobs

        Args:
            - flow_runs (list): A list of GraphQLResult flow run objects
        """
        namespace = os.getenv("NAMESPACE", "default")

        for flow_run in flow_runs:

            # Require Docker storage
            if not isinstance(StorageSchema().load(flow_run.flow.storage), Docker):
                self.logger.error(
                    "Storage for flow run {} is not of type Docker.".format(flow_run.id)
                )
                continue

            job_spec = self.replace_job_spec_yaml(flow_run)

            self.batch_client.create_namespaced_job(namespace=namespace, body=job_spec)

    def replace_job_spec_yaml(self, flow_run: GraphQLResult) -> dict:
        """
        Populate metadata and variables in the job_spec.yaml file for flow runs

        Args:
            - flow_run (GraphQLResult): A flow run object

        Returns:
            - dict: a dictionary representing the populated yaml object
        """
        # Imported here under an alias: the token may come from `--token` or the
        # user config file rather than the process environment, and the base Agent
        # reads it from the same config key.
        from prefect import config as prefect_config

        with open(path.join(path.dirname(__file__), "job_spec.yaml"), "r") as job_file:
            job = yaml.safe_load(job_file)

        identifier = str(uuid.uuid4())[:8]
        job_name = "prefect-job-{}".format(identifier)

        # Populate job metadata for identification
        job_labels = job["metadata"]["labels"]
        job["metadata"]["name"] = job_name
        job_labels["app"] = job_name
        job_labels["identifier"] = identifier
        job_labels["flow_run_id"] = flow_run.id  # type: ignore
        job_labels["flow_id"] = flow_run.flow.id  # type: ignore

        # Pods only inherit the labels of the pod template, so the run ids are
        # repeated there to keep a pod traceable to its flow run.
        pod_labels = job["spec"]["template"]["metadata"]["labels"]
        pod_labels["app"] = job_name
        pod_labels["identifier"] = identifier
        pod_labels["flow_run_id"] = flow_run.id  # type: ignore
        pod_labels["flow_id"] = flow_run.flow.id  # type: ignore

        container = job["spec"]["template"]["spec"]["containers"][0]

        # Use flow storage image for job
        container["image"] = (
            StorageSchema().load(flow_run.flow.storage).name  # type: ignore
        )

        # Populate environment variables for flow run execution. Entries are matched
        # by name so reordering the template cannot send a value to the wrong variable.
        overrides = {
            "PREFECT__CLOUD__API": os.getenv(
                "PREFECT__CLOUD__API", "https://api.prefect.io"
            ),
            "PREFECT__CLOUD__AUTH_TOKEN": prefect_config.cloud.agent.get("auth_token"),
            "PREFECT__CONTEXT__FLOW_RUN_ID": flow_run.id,  # type: ignore
            "PREFECT__CONTEXT__NAMESPACE": os.getenv("NAMESPACE", "default"),
        }

        env = container["env"]
        populated = set()
        for entry in env:
            name = entry.get("name")
            if name in overrides:
                entry["value"] = overrides[name]
                populated.add(name)

        # Add any variable the template did not declare
        env.extend(
            {"name": name, "value": value}
            for name, value in overrides.items()
            if name not in populated
        )

        return job


if __name__ == "__main__":
    KubernetesAgent().start()
class ResourceManager:
    """
    The resource manager is responsible for cleaning up old completed/failed k8s jobs
    and pods from the cluster. This is optional and does not need to be used for the
    agent to work.
    """

    # Container `waiting` reasons which Kubernetes reports for image pull failures
    _IMAGE_PULL_ERROR_REASONS = frozenset({"ImagePullBackOff", "ErrImagePull"})

    def __init__(self) -> None:
        # The kubernetes config module is imported under an alias. Importing it as
        # `config` would make that name local to this function and the Prefect
        # `config` lookups below would raise UnboundLocalError.
        from kubernetes import client, config as k8s_config

        self.loop_interval = config.cloud.agent.resource_manager.get("loop_interval")
        self.client = Client(token=config.cloud.agent.get("auth_token"))
        self.namespace = os.getenv("NAMESPACE", "default")

        self.logger = self._configure_logger("resource-manager")

        try:
            k8s_config.load_incluster_config()
        except k8s_config.config_exception.ConfigException as exc:
            self.logger.warning(
                "{} Using out of cluster configuration option.".format(exc)
            )
            k8s_config.load_kube_config()

        self.k8s_client = client

    @staticmethod
    def _configure_logger(name: str) -> logging.Logger:
        """
        Return the named logger, configured for DEBUG output to a stream handler.

        The handler is only attached when the logger has none yet, so creating
        several resource managers does not duplicate each log line.
        """
        logger = logging.getLogger(name)
        logger.setLevel(logging.DEBUG)

        if not logger.handlers:
            handler = logging.StreamHandler()
            handler.setLevel(logging.DEBUG)
            handler.setFormatter(
                logging.Formatter(
                    "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
                )
            )
            logger.addHandler(handler)

        return logger

    def start(self) -> None:
        """
        Main loop which waits on a `LOOP_INTERVAL` and looks for finished jobs to clean
        """
        self.logger.info("Starting {}".format(type(self).__name__))
        while True:
            try:
                self.clean_resources()
            except Exception as exc:
                # Keep looping, but record the traceback for diagnosis
                self.logger.exception(exc)
            time.sleep(self.loop_interval)

    # IDENTIFICATION

    def clean_resources(self) -> None:
        """
        Find jobs that are either completed or failed to delete from the cluster
        """
        batch_client = self.k8s_client.BatchV1Api()

        try:
            jobs = batch_client.list_namespaced_job(namespace=self.namespace)
        except self.k8s_client.rest.ApiException:
            self.logger.error(
                "Error attempting to list jobs in namespace {}".format(self.namespace)
            )
            return

        for job in jobs.items:
            if not (job.status.succeeded or job.status.failed):
                continue

            # `labels` is None for jobs created without any labels
            identifier = (job.metadata.labels or {}).get("identifier")
            name = job.metadata.name

            if job.status.failed:
                self.logger.info(
                    "Found failed job {} in namespace {}".format(name, self.namespace)
                )
                self.report_failed_job(identifier=identifier)

            self.delete_job(name=name)
            self.delete_pods(job_name=name, identifier=identifier)

        if not jobs.items:
            self.clean_extra_pods()

    def clean_extra_pods(self) -> None:
        """
        Any runaway pods which failed due to unexpected reasons will be cleaned up here.
        ImagePullBackoffs, Evictions, etc...
        """
        core_client = self.k8s_client.CoreV1Api()

        try:
            pods = core_client.list_namespaced_pod(namespace=self.namespace)
        except self.k8s_client.rest.ApiException:
            self.logger.error(
                "Error attempting to list pods in namespace {}".format(self.namespace)
            )
            return

        for pod in pods.items:
            phase = pod.status.phase
            if phase == "Running":
                continue

            if phase == "Failed":
                self.report_failed_pod(pod=pod)
            elif phase == "Unknown":
                self.report_unknown_pod(pod=pod)
            elif phase == "Pending" and pod.status.container_statuses:
                self.report_pod_image_pull_error(pod=pod)

            self.delete_extra_pod(name=pod.metadata.name)

    def _pods_for_identifier(self, identifier: str) -> list:
        """
        Return the pods labelled with `identifier`; an empty list when there is no
        identifier to select on, nothing matched, or the lookup failed (logged).
        """
        if not identifier:
            return []

        core_client = self.k8s_client.CoreV1Api()
        try:
            pods = core_client.list_namespaced_pod(
                namespace=self.namespace,
                label_selector="identifier={}".format(identifier),
            )
        except self.k8s_client.rest.ApiException:
            self.logger.error(
                "Error attempting to list pods in namespace {}".format(self.namespace)
            )
            return []

        return list(pods.items or [])

    # DELETION

    def delete_job(self, name: str) -> None:
        """
        Delete a job based on the name
        """
        batch_client = self.k8s_client.BatchV1Api()
        self.logger.info("Deleting job {} in namespace {}".format(name, self.namespace))

        try:
            batch_client.delete_namespaced_job(
                name=name,
                namespace=self.namespace,
                body=self.k8s_client.V1DeleteOptions(),
            )
        except self.k8s_client.rest.ApiException:
            self.logger.error(
                "Error attempting to delete job {} in namespace {}".format(
                    name, self.namespace
                )
            )

    def _delete_pod(self, name: str) -> None:
        """
        Delete a single pod by name, logging (not raising) on an API error
        """
        core_client = self.k8s_client.CoreV1Api()
        try:
            core_client.delete_namespaced_pod(
                name=name,
                namespace=self.namespace,
                body=self.k8s_client.V1DeleteOptions(),
            )
        except self.k8s_client.rest.ApiException:
            self.logger.error(
                "Error attempting to delete pod {} in namespace {}".format(
                    name, self.namespace
                )
            )

    def delete_pods(self, job_name: str, identifier: str) -> None:
        """
        Delete the pods of a job, found through the job's identifier label
        """
        pods = self._pods_for_identifier(identifier)
        if not pods:
            return

        self.logger.info(
            "Deleting {} pods for job {} in namespace {}".format(
                len(pods), job_name, self.namespace
            )
        )
        for pod in pods:
            self._delete_pod(name=pod.metadata.name)

    def delete_extra_pod(self, name: str) -> None:
        """
        Delete a pod based on the name
        """
        self.logger.info(
            "Deleting extra pod {} in namespace {}".format(name, self.namespace)
        )
        self._delete_pod(name=name)

    # REPORTING

    def report_failed_job(self, identifier: str) -> None:
        """
        Report jobs that failed for reasons outside of a flow run
        """
        for pod in self._pods_for_identifier(identifier):
            if pod.status.phase == "Failed":
                self.report_failed_pod(pod)

    def report_failed_pod(self, pod: "kubernetes.client.V1Pod") -> None:
        """
        Report pods that failed for reasons outside of a flow run. Write cloud log
        """
        core_client = self.k8s_client.CoreV1Api()
        name = pod.metadata.name

        if pod.status.reason == "Evicted":
            logs = "Pod was evicted due to cluster resource constraints / auto scaling."
        else:
            try:
                logs = core_client.read_namespaced_pod_log(
                    namespace=self.namespace, name=name
                )
            except self.k8s_client.rest.ApiException:
                self.logger.error(
                    "Error attempting to read pod logs for {} in namespace {}".format(
                        name, self.namespace
                    )
                )
                return

        self.logger.info(
            "Reporting failed pod {} in namespace {}".format(name, self.namespace)
        )

        self.client.write_run_log(
            flow_run_id=(pod.metadata.labels or {}).get("flow_run_id"),
            task_run_id="",
            timestamp=pendulum.now(),
            name="resource-manager",
            message=logs,
            level="ERROR",
            info={},
        )

    def report_unknown_pod(self, pod: "kubernetes.client.V1Pod") -> None:
        """
        Write cloud log of pods that entered unknown states
        """
        name = pod.metadata.name
        self.logger.info(
            "Reporting unknown pod {} in namespace {}".format(name, self.namespace)
        )

        self.client.write_run_log(
            flow_run_id=(pod.metadata.labels or {}).get("flow_run_id"),
            task_run_id="",
            timestamp=pendulum.now(),
            name="resource-manager",
            message="Flow run pod {} entered an unknown state in namespace {}".format(
                name, self.namespace
            ),
            level="ERROR",
            info={},
        )

    def report_pod_image_pull_error(self, pod: "kubernetes.client.V1Pod") -> None:
        """
        Write cloud log of pods that had image pull errors
        """
        for status in pod.status.container_statuses:
            waiting = status.state.waiting

            if waiting and waiting.reason in self._IMAGE_PULL_ERROR_REASONS:
                self.logger.info(
                    "Reporting image pull error for pod {} in namespace {}".format(
                        pod.metadata.name, self.namespace
                    )
                )

                self.client.write_run_log(
                    flow_run_id=(pod.metadata.labels or {}).get("flow_run_id"),
                    task_run_id="",
                    timestamp=pendulum.now(),
                    name="resource-manager",
                    message="Flow run image pull error for pod {} in namespace {}".format(
                        pod.metadata.name, self.namespace
                    ),
                    level="ERROR",
                    info={},
                )


if __name__ == "__main__":
    ResourceManager().start()
class LocalAgent(Agent):
    """
    Agent which deploys flow runs locally as Docker containers.

    Args:
        - base_url (str, optional): URL for a Docker daemon server. Defaults to
            `unix://var/run/docker.sock` however other hosts such as
            `tcp://0.0.0.0:2375` can be provided
    """

    def __init__(self, base_url: str = None) -> None:
        super().__init__()

        base_url = base_url or "unix://var/run/docker.sock"
        self.docker_client = docker.APIClient(base_url=base_url, version="auto")

        # Ping Docker daemon for connection issues
        try:
            self.docker_client.ping()
        except Exception:
            self.logger.error(
                "Issue connecting to the Docker daemon. Make sure it is running."
            )
            # Bare raise keeps the original traceback intact
            raise

    def deploy_flows(self, flow_runs: list) -> None:
        """
        Deploy flow runs on your local machine as Docker containers

        Args:
            - flow_runs (list): A list of GraphQLResult flow run objects
        """
        for flow_run in flow_runs:

            # Require Docker storage; deserialize once and reuse the result
            storage = StorageSchema().load(flow_run.flow.storage)
            if not isinstance(storage, Docker):
                self.logger.error(
                    "Storage for flow run {} is not of type Docker.".format(flow_run.id)
                )
                continue

            env_vars = self.populate_env_vars(flow_run=flow_run)

            # Pull the flow's image (issued on every deployment)
            self.docker_client.pull(storage.name)

            # Create a container
            container = self.docker_client.create_container(
                storage.name, command="prefect execute cloud-flow", environment=env_vars
            )

            # Start the container
            self.docker_client.start(container=container.get("Id"))

    def populate_env_vars(self, flow_run: GraphQLResult) -> dict:
        """
        Populate metadata and variables in the environment variables for a flow run

        Args:
            - flow_run (GraphQLResult): A flow run object

        Returns:
            - dict: a dictionary representing the populated environment variables
        """
        return {
            "PREFECT__CLOUD__API": config.cloud.api,
            "PREFECT__CLOUD__AUTH_TOKEN": config.cloud.agent.auth_token,
            "PREFECT__CONTEXT__FLOW_RUN_ID": flow_run.id,  # type: ignore
            "PREFECT__CLOUD__USE_LOCAL_SECRETS": "false",
            "PREFECT__LOGGING__LOG_TO_CLOUD": "true",
            "PREFECT__LOGGING__LEVEL": "DEBUG",
            "PREFECT__ENGINE__FLOW_RUNNER__DEFAULT_CLASS": "prefect.engine.cloud.CloudFlowRunner",
            "PREFECT__ENGINE__TASK_RUNNER__DEFAULT_CLASS": "prefect.engine.cloud.CloudTaskRunner",
        }


if __name__ == "__main__":
    LocalAgent().start()
class NomadAgent(Agent):
    """
    Agent which deploys flow runs as Nomad jobs to a Nomad cluster based on the
    `NOMAD_HOST` environment variable.
    """

    def __init__(self) -> None:
        super().__init__()

    def deploy_flows(self, flow_runs: list) -> None:
        """
        Deploy flow runs on to a Nomad cluster as jobs

        Args:
            - flow_runs (list): A list of GraphQLResult flow run objects
        """
        nomad_host = os.getenv("NOMAD_HOST", "http://127.0.0.1:4646")
        # Built as a URL, not with os.path.join, which uses the platform's path
        # separator and is not meant for URLs.
        jobs_url = "{}/v1/jobs".format(nomad_host.rstrip("/"))

        for flow_run in flow_runs:

            # Require Docker storage
            if not isinstance(StorageSchema().load(flow_run.flow.storage), Docker):
                self.logger.error(
                    "Storage for flow run {} is not of type Docker.".format(flow_run.id)
                )
                continue

            job_spec = self.replace_job_spec_json(flow_run)
            response = requests.post(jobs_url, json=job_spec)

            # Surface a rejected submission instead of dropping it silently
            if not response.ok:
                self.logger.error(
                    "Nomad did not accept the job for flow run {}: {} {}".format(
                        flow_run.id, response.status_code, response.text
                    )
                )

    def replace_job_spec_json(self, flow_run: GraphQLResult) -> dict:
        """
        Populate metadata and variables in the job_spec.nomad file for flow runs

        Args:
            - flow_run (GraphQLResult): A flow run object

        Returns:
            - dict: a dictionary representing the populated json object
        """
        # Imported here under an alias: the token may come from `--token` or the
        # user config file rather than the process environment, and the base Agent
        # reads it from the same config key.
        from prefect import config as prefect_config

        with open(path.join(path.dirname(__file__), "job_spec.nomad"), "r") as job_file:
            job = json.load(job_file)

        job["Job"]["ID"] = flow_run.id  # type: ignore
        job["Job"]["Name"] = "prefect-job-{}".format(str(uuid.uuid4())[:8])

        task_group = job["Job"]["TaskGroups"][0]
        task_group["Name"] = "prefect-job-{}".format(flow_run.id)  # type: ignore

        task = task_group["Tasks"][0]
        task["Name"] = flow_run.id  # type: ignore

        # Use flow storage image for job
        task["Config"]["image"] = (
            StorageSchema().load(flow_run.flow.storage).name  # type: ignore
        )

        env = task["Env"]
        env["PREFECT__CLOUD__API"] = os.getenv(
            "PREFECT__CLOUD__API", "https://api.prefect.io"
        )
        # The job template declares PREFECT__CLOUD__AUTH_TOKEN with a placeholder
        # value, so that key must be overwritten with the real token.
        env["PREFECT__CLOUD__AUTH_TOKEN"] = prefect_config.cloud.agent.get("auth_token")
        env["PREFECT__CONTEXT__FLOW_RUN_ID"] = flow_run.id  # type: ignore
        env["PREFECT__CONTEXT__NAMESPACE"] = os.getenv("NAMESPACE", "default")

        return job


if __name__ == "__main__":
    NomadAgent().start()
+ driver = "docker" + + # config for docker + config { + image = "IMAGE" + command = "/bin/bash" + args = ["-c", "prefect agent start nomad-agent"] + force_pull = true + } + + env { + NOMAD_HOST = "http://127.0.0.1:4646" + PREFECT__CLOUD__AGENT__AUTH_TOKEN = "TOKEN" + PREFECT__CLOUD__API = "https://api.prefect.io" + LOOP_INTERVAL = "5" + NAMESPACE = "default" + } + + resources { + cpu = 200 + memory = 128 + } + + } + } +} diff --git a/src/prefect/agent/nomad/job_spec.nomad b/src/prefect/agent/nomad/job_spec.nomad new file mode 100644 index 000000000000..ea1b1a83be8c --- /dev/null +++ b/src/prefect/agent/nomad/job_spec.nomad @@ -0,0 +1,40 @@ +{ + "Job": { + "ID": "prefect-job-UUID", + "Name": "prefect-job-UUID", + "Type": "batch", + "Datacenters": [ + "dc1" + ], + "TaskGroups": [{ + "Name": "flow", + "Count": 1, + "Tasks": [{ + "Name": "flow", + "Driver": "docker", + "Env": { + "PREFECT__CLOUD__API": "XX", + "PREFECT__CLOUD__AUTH_TOKEN": "XX", + "PREFECT__CONTEXT__FLOW_RUN_ID": "XX", + "PREFECT__CONTEXT__NAMESPACE": "XX", + "PREFECT__CLOUD__USE_LOCAL_SECRETS": "false", + "PREFECT__LOGGING__LOG_TO_CLOUD": "true", + "PREFECT__LOGGING__LEVEL": "DEBUG", + "PREFECT__ENGINE__FLOW_RUNNER__DEFAULT_CLASS": "prefect.engine.cloud.CloudFlowRunner", + "PREFECT__ENGINE__TASK_RUNNER__DEFAULT_CLASS": "prefect.engine.cloud.CloudTaskRunner" + }, + "Config": { + "image": "prefecthq/prefect:latest", + "command": "/bin/bash", + "args": ["-c", "prefect execute cloud-flow"] + }, + "Resources": { + "CPU": 100 + } + }], + "RestartPolicy": { + "Attempts": 0 + } + }] + } +} diff --git a/src/prefect/cli/__init__.py b/src/prefect/cli/__init__.py index 620a5b90866b..a91b24bdd06c 100644 --- a/src/prefect/cli/__init__.py +++ b/src/prefect/cli/__init__.py @@ -5,6 +5,7 @@ import prefect +from .agent import agent as _agent from .auth import auth as _auth from .describe import describe as _describe from .execute import execute as _execute @@ -33,6 +34,7 @@ def cli(): Execution Commands: execute 
# Maps the CLI agent name to the qualified name of the class that implements it
_agents = {
    "local": "prefect.agent.local.LocalAgent",
    "kubernetes": "prefect.agent.kubernetes.KubernetesAgent",
    "nomad": "prefect.agent.nomad.NomadAgent",
}


@click.group(hidden=True)
def agent():
    """
    Manage Prefect agents.

    \b
    Usage:
        $ prefect agent [COMMAND]

    \b
    Arguments:
        start       Start a Prefect agent

    \b
    Examples:
        $ prefect agent start

    \b
        $ prefect agent start kubernetes --token MY_TOKEN
    """
    pass


@agent.command(hidden=True)
@click.argument("name", default="local")
@click.option(
    "--token", "-t", required=False, help="A Prefect Cloud API token.", hidden=True
)
def start(name, token):
    """
    Start an agent.

    \b
    Arguments:
        name        TEXT    The name of an agent to start (e.g. `local`, `kubernetes`, `nomad`)
                            Defaults to `local`

    \b
    Options:
        --token, -t     TEXT    A Prefect Cloud api token
    """
    # Reject unknown names before touching any configuration
    retrieved_agent = _agents.get(name)

    if not retrieved_agent:
        # `fg` sets the text colour; `color` is click's flag for forcing or
        # disabling ANSI output and does not take a colour name
        click.secho("{} is not a valid agent".format(name), fg="red")
        return

    # A token given on the command line takes precedence over the configured one
    with set_temporary_config(
        {"cloud.agent.auth_token": token or config.cloud.agent.auth_token}
    ):
        from_qualified_name(retrieved_agent)().start()
from unittest.mock import MagicMock

from click.testing import CliRunner

from prefect.cli.agent import agent


def _stub_local_agent(monkeypatch):
    # Swap out LocalAgent.start and the docker API client so that invoking
    # the CLI neither blocks in the agent loop nor talks to a docker daemon.
    monkeypatch.setattr("prefect.agent.local.LocalAgent.start", MagicMock())
    monkeypatch.setattr("prefect.agent.local.agent.docker.APIClient", MagicMock())


def test_agent_init():
    # Invoking the bare group prints its help text.
    outcome = CliRunner().invoke(agent)
    assert outcome.exit_code == 0
    assert "Manage Prefect agents." in outcome.output


def test_agent_help():
    # An explicit --help prints the same help text.
    outcome = CliRunner().invoke(agent, ["--help"])
    assert outcome.exit_code == 0
    assert "Manage Prefect agents." in outcome.output


def test_agent_start(monkeypatch):
    # `start` with no arguments uses the default (local) agent.
    _stub_local_agent(monkeypatch)

    outcome = CliRunner().invoke(agent, ["start"])
    assert outcome.exit_code == 0


def test_agent_start_token(monkeypatch):
    # `start` accepts a token through the short option.
    _stub_local_agent(monkeypatch)

    outcome = CliRunner().invoke(agent, ["start", "-t", "test"])
    assert outcome.exit_code == 0


def test_agent_start_fails(monkeypatch):
    # An unknown agent name is reported in the output; exit code stays 0.
    _stub_local_agent(monkeypatch)

    outcome = CliRunner().invoke(agent, ["start", "TEST"])
    assert outcome.exit_code == 0
    assert "TEST is not a valid agent" in outcome.output